X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=input.cpp;h=7c776e124f0cfa2627b20a4760964ad9f4af246a;hp=401aa6a366f8e555a8adeb3bf2e7761551e50762;hb=9bb20b7dd0bcea9de1daf4cac29263d74924ce5a;hpb=c2c9f6441f9ae8091a39aea0340417d5915e1ac9 diff --git a/input.cpp b/input.cpp index 401aa6a..7c776e1 100644 --- a/input.cpp +++ b/input.cpp @@ -1,346 +1,87 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include #include -#include -#include "metacube.h" -#include "mutexlock.h" +#include "httpinput.h" #include "input.h" -#include "server.h" -#include "serverpool.h" -#include "parse.h" +#include "state.pb.h" +#include "udpinput.h" using namespace std; -extern ServerPool *servers; - -Input::Input(const string &stream_id, const string &url) - : state(NOT_CONNECTED), - stream_id(stream_id), - url(url), - // TODO - host("gruessi.zrh.sesse.net"), - port("4013"), - path("/test.flv"), - has_metacube_header(false), - sock(-1) +// Extremely rudimentary URL parsing. +bool parse_url(const string &url, string *protocol, string *host, string *port, string *path) { -} - -void Input::run() -{ - should_stop = false; - - // Joinable is already the default, but it's good to be certain. - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_create(&worker_thread, &attr, Input::do_work_thunk, this); -} - -void Input::stop() -{ - should_stop = true; - - if (pthread_join(worker_thread, NULL) == -1) { - perror("pthread_join"); - exit(1); + size_t split = url.find("://"); + if (split == string::npos) { + return false; } -} - -void *Input::do_work_thunk(void *arg) -{ - Input *input = static_cast(arg); - input->do_work(); - return NULL; -} - -int Input::lookup_and_connect(const string &host, const string &port) -{ - addrinfo *ai; - int err = getaddrinfo(host.c_str(), port.c_str(), NULL, &ai); - if (err == -1) { - fprintf(stderr, "WARNING: Lookup of '%s' failed (%s).\n", - host.c_str(), gai_strerror(err)); - freeaddrinfo(ai); - return -1; + *protocol = string(url.begin(), url.begin() + split); + + string rest = string(url.begin() + split + 3, url.end()); + split = rest.find_first_of(":/"); + if (split == string::npos) { + // http://foo + *host = rest; + *port = *protocol; + *path = "/"; + return true; } - // Connect to everything in turn until we have a socket. - while (ai && !should_stop) { - int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP); - if (sock == -1) { - // Could be e.g. EPROTONOSUPPORT. The show must go on. - continue; + *host = string(rest.begin(), rest.begin() + split); + char ch = rest[split]; // Colon or slash. + rest = string(rest.begin() + split + 1, rest.end()); + + if (ch == ':') { + // Parse the port. + split = rest.find_first_of('/'); + if (split == string::npos) { + // http://foo:1234 + *port = rest; + *path = "/"; + return true; + } else { + // http://foo:1234/bar + *port = string(rest.begin(), rest.begin() + split); + *path = string(rest.begin() + split, rest.end()); + return true; } - - do { - err = connect(sock, ai->ai_addr, ai->ai_addrlen); - } while (err == -1 && errno == EINTR); - - if (err != -1) { - freeaddrinfo(ai); - return sock; - } - - ai = ai->ai_next; } - // Give the last one as error. - fprintf(stderr, "WARNING: Connect to '%s' failed (%s)\n", - host.c_str(), strerror(errno)); - freeaddrinfo(ai); - return -1; + // http://foo/bar + *port = *protocol; + *path = rest; + return true; } -void Input::do_work() +Input *create_input(const std::string &stream_id, const std::string &url) { - while (!should_stop) { - if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) { - // Since we are non-blocking, we need to wait for the right state first. - // Wait up to 50 ms, then check should_stop. - pollfd pfd; - pfd.fd = sock; - pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN; - pfd.events |= POLLRDHUP; - - int nfds = poll(&pfd, 1, 50); - if (nfds == 0 || (nfds == -1 && errno == EAGAIN)) { - continue; - } - if (nfds == -1) { - perror("poll"); - state = CLOSING_SOCKET; - } - } - - switch (state) { - case NOT_CONNECTED: - request.clear(); - request_bytes_sent = 0; - response.clear(); - - sock = lookup_and_connect(host, port); - if (sock != -1) { - // Yay, successful connect. Try to set it as nonblocking. - int one = 1; - if (ioctl(sock, FIONBIO, &one) == -1) { - perror("ioctl(FIONBIO)"); - state = CLOSING_SOCKET; - } else { - state = SENDING_REQUEST; - request = "GET " + path + " HTTP/1.0\r\nUser-Agent: cubemap\r\n\r\n"; - request_bytes_sent = 0; - } - } - break; - case SENDING_REQUEST: { - size_t to_send = request.size() - request_bytes_sent; - int ret; - - do { - ret = write(sock, request.data() + request_bytes_sent, to_send); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - perror("write"); - state = CLOSING_SOCKET; - continue; - } - - assert(ret >= 0); - request_bytes_sent += ret; - - if (request_bytes_sent == request.size()) { - state = RECEIVING_HEADER; - } - break; - } - case RECEIVING_HEADER: { - char buf[4096]; - int ret; - - do { - ret = read(sock, buf, sizeof(buf)); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - perror("read"); - state = CLOSING_SOCKET; - continue; - } - - if (ret == 0) { - // This really shouldn't happen... - fprintf(stderr, "Socket unexpectedly closed while reading header\n"); - state = CLOSING_SOCKET; - continue; - } - - RequestParseStatus status = wait_for_double_newline(&response, buf, ret); - - if (status == RP_OUT_OF_SPACE) { - fprintf(stderr, "WARNING: fd %d sent overlong response!\n", sock); - state = CLOSING_SOCKET; - continue; - } else if (status == RP_NOT_FINISHED_YET) { - continue; - } - - // OK, so we're fine, but there might be some of the actual data after the response. - // We'll need to deal with that separately. - string extra_data; - if (status == RP_EXTRA_DATA) { - char *ptr = static_cast( - memmem(response.data(), response.size(), "\r\n\r\n", 4)); - assert(ptr != NULL); - extra_data = string(ptr, &response[0] + response.size()); - response.resize(ptr - response.data()); - } - - // TODO: Check that the response is 200, save the headers, etc. - - if (!extra_data.empty()) { - process_data(&extra_data[0], extra_data.size()); - } - - state = RECEIVING_DATA; - break; - } - case RECEIVING_DATA: { - char buf[4096]; - int ret; - - do { - ret = read(sock, buf, sizeof(buf)); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - perror("read"); - state = CLOSING_SOCKET; - continue; - } - - if (ret == 0) { - // This really shouldn't happen... - fprintf(stderr, "Socket unexpectedly closed while reading header\n"); - state = CLOSING_SOCKET; - continue; - } - - process_data(buf, ret); - break; - } - case CLOSING_SOCKET: { - int err; - do { - err = close(sock); - } while (err == -1 && errno == EINTR); - - if (err == -1) { - perror("close"); - } - - state = NOT_CONNECTED; - break; - } - default: - assert(false); - } - - // If we are still in NOT_CONNECTED, either something went wrong, - // or the connection just got closed. - // The earlier steps have already given the error message, if any. - if (state == NOT_CONNECTED && !should_stop) { - fprintf(stderr, "Waiting 0.2 second and restarting...\n"); - usleep(200000); - } + string protocol, host, port, path; + if (!parse_url(url, &protocol, &host, &port, &path)) { + return NULL; } -} - -void Input::process_data(char *ptr, size_t bytes) -{ - pending_data.insert(pending_data.end(), ptr, ptr + bytes); - - for ( ;; ) { - // If we don't have enough data (yet) for even the Metacube header, just return. - if (pending_data.size() < sizeof(metacube_block_header)) { - return; - } - - // Make sure we have the Metacube sync header at the start. - // We may need to skip over junk data (it _should_ not happen, though). - if (!has_metacube_header) { - char *ptr = static_cast( - memmem(pending_data.data(), pending_data.size(), - METACUBE_SYNC, strlen(METACUBE_SYNC))); - if (ptr == NULL) { - // OK, so we didn't find the sync marker. We know then that - // we do not have the _full_ marker in the buffer, but we - // could have N-1 bytes. Drop everything before that, - // and then give up. - drop_pending_data(pending_data.size() - (strlen(METACUBE_SYNC) - 1)); - return; - } else { - // Yay, we found the header. Drop everything (if anything) before it. - drop_pending_data(ptr - pending_data.data()); - has_metacube_header = true; - - // Re-check that we have the entire header; we could have dropped data. - if (pending_data.size() < sizeof(metacube_block_header)) { - return; - } - } - } - - // Now it's safe to read the header. - metacube_block_header *hdr = reinterpret_cast(pending_data.data()); - assert(memcmp(hdr->sync, METACUBE_SYNC, sizeof(hdr->sync)) == 0); - uint32_t size = ntohl(hdr->size); - uint32_t flags = ntohl(hdr->flags); - - // See if we have the entire block. If not, wait for more data. - if (pending_data.size() < sizeof(metacube_block_header) + size) { - return; - } - - // Send this block on to the data. - char *inner_data = pending_data.data() + sizeof(metacube_block_header); - if (flags & METACUBE_FLAGS_HEADER) { - string header(inner_data, inner_data + size); - servers->set_header(stream_id, header); - } else { - servers->add_data(stream_id, inner_data, size); - } - - // Consume the block. This isn't the most efficient way of dealing with things - // should we have many blocks, but these routines don't need to be too efficient - // anyway. - pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube_block_header) + size); - has_metacube_header = false; + if (protocol == "http") { + return new HTTPInput(stream_id, url); } + if (protocol == "udp") { + return new UDPInput(stream_id, url); + } + return NULL; } -void Input::drop_pending_data(size_t num_bytes) +Input *create_input(const InputProto &serialized) { - if (num_bytes == 0) { - return; + string protocol, host, port, path; + if (!parse_url(serialized.url(), &protocol, &host, &port, &path)) { + return NULL; } - fprintf(stderr, "Warning: Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?\n", - (long long)num_bytes); - pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes); + if (protocol == "http") { + return new HTTPInput(serialized); + } + if (protocol == "udp") { + return new UDPInput(serialized); + } + return NULL; } +Input::~Input() {} +