-#include "acmp_client.h"
-
-#include <functional>
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <unistd.h>
-#include <errno.h>
-#include <stdio.h>
-#include <string.h>
-#include <sys/ioctl.h>
-
-using namespace std;
-
-ACMPClient::ACMPClient(const string &host, int port)
- : host(host), port(port) {}
-
-void ACMPClient::add_init_command(const string &cmd)
-{
- init_commands.push_back(cmd + "\r\n");
-}
-
-void ACMPClient::start()
-{
- t = thread(&ACMPClient::thread_func, this);
-}
-
-void ACMPClient::end()
-{
- t.join();
-}
-
-void ACMPClient::send_command(const string &cmd)
-{
- lock_guard<mutex> lock(mu);
- queued_commands.push_back(cmd + "\r\n");
-}
-
-void ACMPClient::change_server(const string &host, int port)
-{
- lock_guard<mutex> lock(mu);
- queued_commands.push_back(""); // Marker for disconnect.
- this->host = host;
- this->port = port;
-}
-
-void ACMPClient::set_connection_callback(const std::function<void(bool)> &callback)
-{
- connection_callback = callback;
-}
-
-namespace {
-
-int lookup_and_connect(const char *host, int port)
-{
- addrinfo hints;
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
-
- addrinfo *res;
- char portstr[16];
- snprintf(portstr, sizeof(portstr), "%d", port);
- int err = getaddrinfo(host, portstr, &hints, &res);
- if (err != 0) {
- fprintf(stderr, "Lookup of %s:%d failed: %s\n", host, port, strerror(errno));
- return -1;
- }
-
- for (addrinfo *p = res; p != NULL; p = p->ai_next) {
- int sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
- if (sock == -1) {
- perror("socket");
- continue;
- }
-
- if (connect(sock, p->ai_addr, p->ai_addrlen) == -1) {
- perror("connect");
- close(sock);
- continue;
- }
-
- // Success!
- freeaddrinfo(res);
- return sock;
- }
-
- freeaddrinfo(res);
- return -1;
-}
-
-} // namespace
-
-void ACMPClient::thread_func()
-{
- if (connection_callback) {
- connection_callback(false);
- }
- for ( ;; ) {
- int sock, port_copy;
- string host_copy;
- {
- lock_guard<mutex> lock(mu);
- host_copy = host;
- port_copy = port;
- }
- sock = lookup_and_connect(host_copy.c_str(), port_copy);
- if (sock == -1) {
- sleep(1);
- continue;
- }
-
- int one = 1;
- if (ioctl(sock, FIONBIO, &one) == 1) {
- perror("ioctl(FIONBIO)");
- close(sock);
- sleep(1);
- continue;
- }
-
- printf("Connected to CasparCG.\n");
- if (connection_callback) {
- connection_callback(true);
- }
-
- bool first = true;
-
- for ( ;; ) {
- vector<string> commands;
- if (first) {
- commands = init_commands;
- first = false;
- } else {
- lock_guard<mutex> lock(mu);
- swap(commands, queued_commands);
- }
-
- bool broken = false;
- string buf;
- for (const string &cmd : commands) {
- buf += cmd;
- if (cmd.empty()) {
- printf("Closing CasparCG socket for reconnection.\n");
- broken = true;
- break;
- }
- }
-
- if (broken) {
- break;
- }
- if (!buf.empty()) {
- printf("Writing: '%s'\n", buf.c_str());
- }
-
- size_t pos = 0;
- do {
- // Consume until there is no more.
- char junk[1024];
- int err = read(sock, junk, sizeof(junk));
- if (err == -1) {
- if (err == EAGAIN) {
- perror("read");
- broken = true;
- break;
- }
- }
- if (err == 0) {
- // Closed.
- printf("Server closed connection.\n");
- broken = true;
- break;
- }
- if (err > 0) {
- // Try again.
- junk[err] = 0;
- printf("From server: '%s'\n", junk);
- continue;
- }
-
- if (pos < buf.size()) {
- // Now write as much as we can.
- err = write(sock, buf.data() + pos, buf.size() - pos);
- if (err == -1) {
- perror("write");
- broken = true;
- break;
- }
- if (err == 0) {
- // Uh-oh. Buffer full for some reason?
- usleep(10000);
- }
- pos += err;
- }
- } while (pos < buf.size());
-
- if (broken) {
- break;
- }
- if (buf.empty()) {
- usleep(100000);
- continue;
- }
- }
-
- close(sock);
- if (connection_callback) {
- connection_callback(false);
- }
- sleep(1);
- }
-}