]> git.sesse.net Git - ultimatescore/blob - client/acmp_client.cpp
Initial checkin (sans Nageru theme).
[ultimatescore] / client / acmp_client.cpp
1 #include "acmp_client.h"
2
3 #include <functional>
4
5 #include <sys/types.h>
6 #include <sys/socket.h>
7 #include <netdb.h>
8 #include <unistd.h>
9 #include <errno.h>
10 #include <stdio.h>
11 #include <string.h>
12 #include <sys/ioctl.h>
13
14 using namespace std;
15
16 ACMPClient::ACMPClient(const string &host, int port)
17         : host(host), port(port) {}
18
19 void ACMPClient::add_init_command(const string &cmd)
20 {
21         init_commands.push_back(cmd + "\r\n");
22 }
23
24 void ACMPClient::start()
25 {
26         t = thread(&ACMPClient::thread_func, this);
27 }
28
29 void ACMPClient::end()
30 {
31         t.join();
32 }
33
34 void ACMPClient::send_command(const string &cmd)
35 {
36         lock_guard<mutex> lock(mu);
37         queued_commands.push_back(cmd + "\r\n");
38 }
39
40 void ACMPClient::change_server(const string &host, int port)
41 {
42         lock_guard<mutex> lock(mu);
43         queued_commands.push_back("");  // Marker for disconnect.
44         this->host = host;
45         this->port = port;
46 }
47
48 void ACMPClient::set_connection_callback(const std::function<void(bool)> &callback)
49 {
50         connection_callback = callback; 
51 }
52
53 namespace {
54
55 int lookup_and_connect(const char *host, int port)
56 {
57         addrinfo hints;
58         memset(&hints, 0, sizeof(hints));
59         hints.ai_family = AF_UNSPEC;
60         hints.ai_socktype = SOCK_STREAM;
61
62         addrinfo *res;
63         char portstr[16];
64         snprintf(portstr, sizeof(portstr), "%d", port);
65         int err = getaddrinfo(host, portstr, &hints, &res);
66         if (err != 0) {
67                 fprintf(stderr, "Lookup of %s:%d failed: %s\n", host, port, strerror(errno));
68                 return -1;
69         }
70
71         for (addrinfo *p = res; p != NULL; p = p->ai_next) {
72                 int sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
73                 if (sock == -1) {
74                         perror("socket");
75                         continue;
76                 }
77
78                 if (connect(sock, p->ai_addr, p->ai_addrlen) == -1) {
79                         perror("connect");
80                         close(sock);
81                         continue;
82                 }
83
84                 // Success!
85                 freeaddrinfo(res);
86                 return sock;
87         }
88
89         freeaddrinfo(res);
90         return -1;
91 }
92
93 }  // namespace
94
95 void ACMPClient::thread_func()
96 {
97         if (connection_callback) {
98                 connection_callback(false);
99         }
100         for ( ;; ) {
101                 int sock, port_copy;
102                 string host_copy;
103                 {
104                         lock_guard<mutex> lock(mu);
105                         host_copy = host;
106                         port_copy = port;
107                 }
108                 sock = lookup_and_connect(host_copy.c_str(), port_copy);
109                 if (sock == -1) {
110                         sleep(1);
111                         continue;
112                 }
113
114                 int one = 1;
115                 if (ioctl(sock, FIONBIO, &one) == 1) {
116                         perror("ioctl(FIONBIO)");
117                         close(sock);
118                         sleep(1);
119                         continue;
120                 }
121
122                 printf("Connected to CasparCG.\n");
123                 if (connection_callback) {
124                         connection_callback(true);
125                 }
126
127                 bool first = true;
128
129                 for ( ;; ) {
130                         vector<string> commands;
131                         if (first) {
132                                 commands = init_commands;
133                                 first = false;
134                         } else {
135                                 lock_guard<mutex> lock(mu);
136                                 swap(commands, queued_commands);
137                         }
138
139                         bool broken = false;
140                         string buf;
141                         for (const string &cmd : commands) {
142                                 buf += cmd;
143                                 if (cmd.empty()) {
144                                         printf("Closing CasparCG socket for reconnection.\n");
145                                         broken = true;
146                                         break;
147                                 }
148                         }
149
150                         if (broken) {
151                                 break;
152                         }
153                         if (!buf.empty()) {
154                                 printf("Writing: '%s'\n", buf.c_str());
155                         }
156
157                         size_t pos = 0;
158                         do {
159                                 // Consume until there is no more.
160                                 char junk[1024];
161                                 int err = read(sock, junk, sizeof(junk));
162                                 if (err == -1) {
163                                         if (err == EAGAIN) {
164                                                 perror("read");
165                                                 broken = true;
166                                                 break;
167                                         }
168                                 }
169                                 if (err == 0) {
170                                         // Closed.
171                                         printf("Server closed connection.\n");
172                                         broken = true;
173                                         break;
174                                 }
175                                 if (err > 0) {
176                                         // Try again.
177                                         junk[err] = 0;
178                                         printf("From server: '%s'\n", junk);
179                                         continue;
180                                 }
181
182                                 if (pos < buf.size()) {
183                                         // Now write as much as we can.
184                                         err = write(sock, buf.data() + pos, buf.size() - pos);
185                                         if (err == -1) {
186                                                 perror("write");
187                                                 broken = true;
188                                                 break;
189                                         }
190                                         if (err == 0) {
191                                                 // Uh-oh. Buffer full for some reason?
192                                                 usleep(10000);
193                                         }
194                                         pos += err;
195                                 }
196                         } while (pos < buf.size());
197
198                         if (broken) {
199                                 break;
200                         }
201                         if (buf.empty()) {
202                                 usleep(100000);
203                                 continue;
204                         }
205                 }
206
207                 close(sock);
208                 if (connection_callback) {
209                         connection_callback(false);
210                 }
211                 sleep(1);
212         }
213 }