X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=input.cpp;h=7c776e124f0cfa2627b20a4760964ad9f4af246a;hp=f9f879b6c2a393f6671b42fb25e4462854e04711;hb=0d72f384a1de672824298262ba5c427ec0aee2d6;hpb=6d062eab70fd6528219545846e6c19c1cad35a3d diff --git a/input.cpp b/input.cpp index f9f879b..7c776e1 100644 --- a/input.cpp +++ b/input.cpp @@ -1,160 +1,87 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include #include -#include -#include "metacube.h" -#include "mutexlock.h" +#include "httpinput.h" #include "input.h" -#include "server.h" +#include "state.pb.h" +#include "udpinput.h" using namespace std; -extern Server *servers; - -Input::Input(const string &stream_id, const string &url) - : stream_id(stream_id), - url(url), - has_metacube_header(false) -{ -} - -void Input::run() -{ - should_stop = false; - - // Joinable is already the default, but it's good to be certain. - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_create(&worker_thread, &attr, Input::do_work_thunk, this); -} - -void Input::stop() +// Extremely rudimentary URL parsing. +bool parse_url(const string &url, string *protocol, string *host, string *port, string *path) { - should_stop = true; - - if (pthread_join(worker_thread, NULL) == -1) { - perror("pthread_join"); - exit(1); + size_t split = url.find("://"); + if (split == string::npos) { + return false; + } + *protocol = string(url.begin(), url.begin() + split); + + string rest = string(url.begin() + split + 3, url.end()); + split = rest.find_first_of(":/"); + if (split == string::npos) { + // http://foo + *host = rest; + *port = *protocol; + *path = "/"; + return true; } -} -void *Input::do_work_thunk(void *arg) -{ - Input *input = static_cast(arg); - input->do_work(); - return NULL; -} + *host = string(rest.begin(), rest.begin() + split); + char ch = rest[split]; // Colon or slash. + rest = string(rest.begin() + split + 1, rest.end()); + + if (ch == ':') { + // Parse the port. + split = rest.find_first_of('/'); + if (split == string::npos) { + // http://foo:1234 + *port = rest; + *path = "/"; + return true; + } else { + // http://foo:1234/bar + *port = string(rest.begin(), rest.begin() + split); + *path = string(rest.begin() + split, rest.end()); + return true; + } + } -void Input::do_work() -{ - CURL *curl = curl_easy_init(); - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Input::curl_callback_thunk); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, this); - curl_easy_perform(curl); + // http://foo/bar + *port = *protocol; + *path = rest; + return true; } -size_t Input::curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata) +Input *create_input(const std::string &stream_id, const std::string &url) { - Input *input = static_cast(userdata); - if (input->should_stop) { - return 0; + string protocol, host, port, path; + if (!parse_url(url, &protocol, &host, &port, &path)) { + return NULL; } - - size_t bytes = size * nmemb; - input->curl_callback(ptr, bytes); - return bytes; -} - -void Input::curl_callback(char *ptr, size_t bytes) -{ - pending_data.insert(pending_data.end(), ptr, ptr + bytes); - - for ( ;; ) { - // If we don't have enough data (yet) for even the Metacube header, just return. - if (pending_data.size() < sizeof(metacube_block_header)) { - return; - } - - // Make sure we have the Metacube sync header at the start. - // We may need to skip over junk data (it _should_ not happen, though). - if (!has_metacube_header) { - char *ptr = static_cast( - memmem(pending_data.data(), pending_data.size(), - METACUBE_SYNC, strlen(METACUBE_SYNC))); - if (ptr == NULL) { - // OK, so we didn't find the sync marker. We know then that - // we do not have the _full_ marker in the buffer, but we - // could have N-1 bytes. Drop everything before that, - // and then give up. - drop_pending_data(pending_data.size() - (strlen(METACUBE_SYNC) - 1)); - return; - } else { - // Yay, we found the header. Drop everything (if anything) before it. - drop_pending_data(ptr - pending_data.data()); - has_metacube_header = true; - - // Re-check that we have the entire header; we could have dropped data. - if (pending_data.size() < sizeof(metacube_block_header)) { - return; - } - } - } - - // Now it's safe to read the header. - metacube_block_header *hdr = reinterpret_cast(pending_data.data()); - assert(memcmp(hdr->sync, METACUBE_SYNC, sizeof(hdr->sync)) == 0); - uint32_t size = ntohl(hdr->size); - uint32_t flags = ntohl(hdr->flags); - - // See if we have the entire block. If not, wait for more data. - if (pending_data.size() < sizeof(metacube_block_header) + size) { - return; - } - - process_block(pending_data.data() + sizeof(metacube_block_header), size, flags); - - // Consume this block. This isn't the most efficient way of dealing with things - // should we have many blocks, but these routines don't need to be too efficient - // anyway. - pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube_block_header) + size); + if (protocol == "http") { + return new HTTPInput(stream_id, url); } -} - -void Input::process_block(const char *data, uint32_t size, uint32_t flags) -{ - if (flags & METACUBE_FLAGS_HEADER) { - string header(data, data + size); - for (int i = 0; i < NUM_SERVERS; ++i) { - servers[i].set_header(stream_id, header); - } - } else { - for (int i = 0; i < NUM_SERVERS; ++i) { - servers[i].add_data(stream_id, data, size); - } + if (protocol == "udp") { + return new UDPInput(stream_id, url); } + return NULL; } -void Input::drop_pending_data(size_t num_bytes) +Input *create_input(const InputProto &serialized) { - if (num_bytes == 0) { - return; + string protocol, host, port, path; + if (!parse_url(serialized.url(), &protocol, &host, &port, &path)) { + return NULL; } - fprintf(stderr, "Warning: Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?\n", - (long long)num_bytes); - pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes); + if (protocol == "http") { + return new HTTPInput(serialized); + } + if (protocol == "udp") { + return new UDPInput(serialized); + } + return NULL; } +Input::~Input() {} +