]> git.sesse.net Git - cubemap/blobdiff - input.cpp
Add suppor for raw (non-Metacube) inputs over HTTP. Only really useful for TS.
[cubemap] / input.cpp
index fd71d4072fa64fbd30eb0d584b9b0f0217f01b2a..25012a6bbad930f63a53914ed53fd0e9d8cc4fcf 100644 (file)
--- a/input.cpp
+++ b/input.cpp
-#include <stdio.h>
-#include <string.h>
-#include <stdint.h>
-#include <assert.h>
-#include <arpa/inet.h>
-#include <curl/curl.h>
-#include <sys/socket.h>
-#include <pthread.h>
-#include <sys/types.h>
-#include <sys/ioctl.h>
-#include <sys/epoll.h>
-#include <errno.h>
-#include <vector>
+#include <stddef.h>
 #include <string>
-#include <map>
 
-#include "metacube.h"
-#include "mutexlock.h"
+#include "httpinput.h"
 #include "input.h"
-#include "server.h"
+#include "state.pb.h"
+#include "udpinput.h"
 
 using namespace std;
 
-extern Server *servers;
+namespace {
 
-Input::Input(const string &stream_id, const string &url)
-       : stream_id(stream_id),
-         url(url),
-         has_metacube_header(false)
+// Does not support passwords, only user:host, since this is really only used
+// to parse VLC's udp://source@multicastgroup:1234/ syntax (we do not support
+// even basic auth).
+void split_user_host(const string &user_host, string *user, string *host)
 {
-}
-
-void Input::run()
-{
-       should_stop = false;
-       
-       // Joinable is already the default, but it's good to be certain.
-       pthread_attr_t attr;
-       pthread_attr_init(&attr);
-       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-       pthread_create(&worker_thread, &attr, Input::do_work_thunk, this);
-}
-
-void Input::stop()
-{
-       should_stop = true;
-
-       if (pthread_join(worker_thread, NULL) == -1) {
-               perror("pthread_join");
-               exit(1);
+       size_t split = user_host.find("@");
+       if (split == string::npos) {
+               user->clear();
+               *host = user_host;
+       } else {
+               *user = string(user_host.begin(), user_host.begin() + split);
+               *host = string(user_host.begin() + split + 1, user_host.end());
        }
 }
 
-void *Input::do_work_thunk(void *arg)
-{
-       Input *input = static_cast<Input *>(arg);
-       input->do_work();
-       return NULL;
-}
-
-void Input::do_work()
-{
-       CURL *curl = curl_easy_init();
-
-       for ( ;; ) {
-               curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
-               curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Input::curl_callback_thunk);
-               curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
-               curl_easy_perform(curl);
-               printf("Transfer ended, waiting 0.2 seconds and restarting...\n");
-               usleep(200000);
-       }
-}
+}  // namespace
 
-size_t Input::curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata)
+// Extremely rudimentary URL parsing.
+bool parse_url(const string &url, string *protocol, string *user, string *host, string *port, string *path)
 {
-       Input *input = static_cast<Input *>(userdata);
-       if (input->should_stop) {
-               return 0;
+       size_t split = url.find("://");
+       if (split == string::npos) {
+               return false;
        }
+       *protocol = string(url.begin(), url.begin() + split);
 
-       size_t bytes = size * nmemb;
-       input->curl_callback(ptr, bytes);       
-       return bytes;
-}
-       
-void Input::curl_callback(char *ptr, size_t bytes)
-{
-       pending_data.insert(pending_data.end(), ptr, ptr + bytes);
-
-       for ( ;; ) {
-               // If we don't have enough data (yet) for even the Metacube header, just return.
-               if (pending_data.size() < sizeof(metacube_block_header)) {
-                       return;
-               }
+       string rest = string(url.begin() + split + 3, url.end());
 
-               // Make sure we have the Metacube sync header at the start.
-               // We may need to skip over junk data (it _should_ not happen, though).
-               if (!has_metacube_header) {
-                       char *ptr = static_cast<char *>(
-                               memmem(pending_data.data(), pending_data.size(),
-                                      METACUBE_SYNC, strlen(METACUBE_SYNC)));
-                       if (ptr == NULL) {
-                               // OK, so we didn't find the sync marker. We know then that
-                               // we do not have the _full_ marker in the buffer, but we
-                               // could have N-1 bytes. Drop everything before that,
-                               // and then give up.
-                               drop_pending_data(pending_data.size() - (strlen(METACUBE_SYNC) - 1));
-                               return;
-                       } else {
-                               // Yay, we found the header. Drop everything (if anything) before it.
-                               drop_pending_data(ptr - pending_data.data());
-                               has_metacube_header = true;
-
-                               // Re-check that we have the entire header; we could have dropped data.
-                               if (pending_data.size() < sizeof(metacube_block_header)) {
-                                       return;
-                               }
+       // Split at the first slash, or the first colon that's not within [].
+       bool within_brackets = false;
+       for (split = 0; split < rest.size(); ++split) {
+               if (rest[split] == '[') {
+                       if (within_brackets) {
+                               // Can't nest brackets.
+                               return false;
                        }
+                       within_brackets = true;
+               } else if (rest[split] == ']') {
+                       if (!within_brackets) {
+                               // ] without matching [.
+                               return false;
+                       }
+                       within_brackets = false;
+               } else if (rest[split] == '/') {
+                       break;
+               } else if (rest[split] == ':' && !within_brackets) {
+                       break;
                }
+       }
 
-               // Now it's safe to read the header.
-               metacube_block_header *hdr = reinterpret_cast<metacube_block_header *>(pending_data.data());    
-               assert(memcmp(hdr->sync, METACUBE_SYNC, sizeof(hdr->sync)) == 0);
-               uint32_t size = ntohl(hdr->size);
-               uint32_t flags = ntohl(hdr->flags);
+       if (split == rest.size()) {
+               // http://foo
+               split_user_host(rest, user, host);
+               *port = *protocol;
+               *path = "/";
+               return true;
+       }
 
-               // See if we have the entire block. If not, wait for more data.
-               if (pending_data.size() < sizeof(metacube_block_header) + size) {
-                       return;
+       split_user_host(string(rest.begin(), rest.begin() + split), user, host);
+       char ch = rest[split];  // Colon or slash.
+       rest = string(rest.begin() + split + 1, rest.end());
+
+       if (ch == ':') {
+               // Parse the port.
+               split = rest.find_first_of('/');
+               if (split == string::npos) {
+                       // http://foo:1234
+                       *port = rest;
+                       *path = "/";
+                       return true;
+               } else {
+                       // http://foo:1234/bar
+                       *port = string(rest.begin(), rest.begin() + split);
+                       *path = string(rest.begin() + split, rest.end());
+                       return true;
                }
+       }
 
-               process_block(pending_data.data() + sizeof(metacube_block_header), size, flags);
+       // http://foo/bar
+       *port = *protocol;
+       *path = "/" + rest;
+       return true;
+}
 
-               // Consume this block. This isn't the most efficient way of dealing with things
-               // should we have many blocks, but these routines don't need to be too efficient
-               // anyway.
-               pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube_block_header) + size);
+Input *create_input(const string &url, Input::Encoding encoding)
+{
+       string protocol, user, host, port, path;
+       if (!parse_url(url, &protocol, &user, &host, &port, &path)) {
+               return NULL;
        }
-}
-               
-void Input::process_block(const char *data, uint32_t size, uint32_t flags)
-{      
-       if (flags & METACUBE_FLAGS_HEADER) {
-               string header(data, data + size);
-               for (int i = 0; i < NUM_SERVERS; ++i) {
-                       servers[i].set_header(stream_id, header);
-               }
-       } else { 
-               for (int i = 0; i < NUM_SERVERS; ++i) {
-                       servers[i].add_data(stream_id, data, size);
+       if (protocol == "http") {
+               return new HTTPInput(url, encoding);
+       }
+       if (protocol == "udp") {
+               if (encoding == Input::INPUT_ENCODING_METACUBE) {
+                       return NULL;
                }
+               return new UDPInput(url);
        }
+       return NULL;
 }
 
-void Input::drop_pending_data(size_t num_bytes)
+Input *create_input(const InputProto &serialized)
 {
-       if (num_bytes == 0) {
-               return;
+       string protocol, user, host, port, path;
+       if (!parse_url(serialized.url(), &protocol, &user, &host, &port, &path)) {
+               return NULL;
        }
-       fprintf(stderr, "Warning: Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?\n",
-               (long long)num_bytes);
-       pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes);
+       if (protocol == "http") {
+               return new HTTPInput(serialized);
+       }
+       if (protocol == "udp") {
+               return new UDPInput(serialized);
+       }
+       return NULL;
 }
 
+Input::~Input() {}
+