X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=input.cpp;h=f5d8d5ef19ff8dced7f5d75a73b5a724b2942a82;hp=58d319f10428d74435bd8d79fb35817785f63eb1;hb=7aec1a69c2726488fcc6c9d6fc25f2e413329738;hpb=e20ad47985bdda71b7b58c26932dad9a3a50c066 diff --git a/input.cpp b/input.cpp index 58d319f..f5d8d5e 100644 --- a/input.cpp +++ b/input.cpp @@ -1,51 +1,74 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include #include -#include -#include "metacube.h" -#include "mutexlock.h" +#include "httpinput.h" #include "input.h" -#include "server.h" -#include "serverpool.h" -#include "parse.h" #include "state.pb.h" +#include "udpinput.h" using namespace std; -extern ServerPool *servers; - +namespace { + +// Does not support passwords, only user:host, since this is really only used +// to parse VLC's udp://source@multicastgroup:1234/ syntax (we do not support +// even basic auth). +void split_user_host(const string &user_host, string *user, string *host) +{ + size_t split = user_host.find("@"); + if (split == string::npos) { + user->clear(); + *host = user_host; + } else { + *user = string(user_host.begin(), user_host.begin() + split); + *host = string(user_host.begin() + split + 1, user_host.end()); + } +} + +} // namespace + // Extremely rudimentary URL parsing. -bool parse_url(const string &url, string *host, string *port, string *path) +bool parse_url(const string &url, string *protocol, string *user, string *host, string *port, string *path) { - if (url.find("http://") != 0) { + size_t split = url.find("://"); + if (split == string::npos) { return false; } - - string rest = url.substr(strlen("http://")); - size_t split = rest.find_first_of(":/"); - if (split == string::npos) { + *protocol = string(url.begin(), url.begin() + split); + + string rest = string(url.begin() + split + 3, url.end()); + + // Split at the first slash, or the first colon that's not within []. + bool within_brackets = false; + for (split = 0; split < rest.size(); ++split) { + if (rest[split] == '[') { + if (within_brackets) { + // Can't nest brackets. + return false; + } + within_brackets = true; + } else if (rest[split] == ']') { + if (!within_brackets) { + // ] without matching [. + return false; + } + within_brackets = false; + } else if (rest[split] == '/') { + break; + } else if (rest[split] == ':' && !within_brackets) { + break; + } + } + + if (split == rest.size()) { // http://foo - *host = rest; - *port = "http"; + split_user_host(rest, user, host); + *port = *protocol; *path = "/"; return true; } - *host = string(rest.begin(), rest.begin() + split); + split_user_host(string(rest.begin(), rest.begin() + split), user, host); char ch = rest[split]; // Colon or slash. rest = string(rest.begin() + split + 1, rest.end()); @@ -66,357 +89,40 @@ bool parse_url(const string &url, string *host, string *port, string *path) } // http://foo/bar - *port = "http"; - *path = rest; + *port = *protocol; + *path = "/" + rest; return true; } -Input::Input(const string &stream_id, const string &url) - : state(NOT_CONNECTED), - stream_id(stream_id), - url(url), - has_metacube_header(false), - sock(-1) -{ -} - -Input::Input(const InputProto &serialized) - : state(State(serialized.state())), - stream_id(serialized.stream_id()), - url(serialized.url()), - request(serialized.request()), - request_bytes_sent(serialized.request_bytes_sent()), - response(serialized.response()), - has_metacube_header(serialized.has_metacube_header()), - sock(serialized.sock()) -{ - pending_data.resize(serialized.pending_data().size()); - memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size()); - - parse_url(url, &host, &port, &path); // Don't care if it fails. -} - -InputProto Input::serialize() const +Input *create_input(const std::string &url) { - InputProto serialized; - serialized.set_state(state); - serialized.set_stream_id(stream_id); - serialized.set_url(url); - serialized.set_request(request); - serialized.set_request_bytes_sent(request_bytes_sent); - serialized.set_response(response); - serialized.set_pending_data(string(pending_data.begin(), pending_data.end())); - serialized.set_has_metacube_header(has_metacube_header); - serialized.set_sock(sock); - return serialized; -} - -void Input::run() -{ - should_stop = false; - - // Joinable is already the default, but it's good to be certain. - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_create(&worker_thread, &attr, Input::do_work_thunk, this); -} - -void Input::stop() -{ - should_stop = true; - - if (pthread_join(worker_thread, NULL) == -1) { - perror("pthread_join"); - exit(1); + string protocol, user, host, port, path; + if (!parse_url(url, &protocol, &user, &host, &port, &path)) { + return NULL; } -} - -void *Input::do_work_thunk(void *arg) -{ - Input *input = static_cast(arg); - input->do_work(); - return NULL; -} - -int Input::lookup_and_connect(const string &host, const string &port) -{ - addrinfo *ai; - int err = getaddrinfo(host.c_str(), port.c_str(), NULL, &ai); - if (err == -1) { - fprintf(stderr, "WARNING: Lookup of '%s' failed (%s).\n", - host.c_str(), gai_strerror(err)); - freeaddrinfo(ai); - return -1; + if (protocol == "http") { + return new HTTPInput(url); } - - // Connect to everything in turn until we have a socket. - while (ai && !should_stop) { - int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP); - if (sock == -1) { - // Could be e.g. EPROTONOSUPPORT. The show must go on. - continue; - } - - do { - err = connect(sock, ai->ai_addr, ai->ai_addrlen); - } while (err == -1 && errno == EINTR); - - if (err != -1) { - freeaddrinfo(ai); - return sock; - } - - ai = ai->ai_next; + if (protocol == "udp") { + return new UDPInput(url); } - - // Give the last one as error. - fprintf(stderr, "WARNING: Connect to '%s' failed (%s)\n", - host.c_str(), strerror(errno)); - freeaddrinfo(ai); - return -1; + return NULL; } -void Input::do_work() +Input *create_input(const InputProto &serialized) { - while (!should_stop) { - if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) { - // Since we are non-blocking, we need to wait for the right state first. - // Wait up to 50 ms, then check should_stop. - pollfd pfd; - pfd.fd = sock; - pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN; - pfd.events |= POLLRDHUP; - - int nfds = poll(&pfd, 1, 50); - if (nfds == 0 || (nfds == -1 && errno == EAGAIN)) { - continue; - } - if (nfds == -1) { - perror("poll"); - state = CLOSING_SOCKET; - } - } - - switch (state) { - case NOT_CONNECTED: - request.clear(); - request_bytes_sent = 0; - response.clear(); - - if (!parse_url(url, &host, &port, &path)) { - fprintf(stderr, "Failed to parse URL '%s'\n", url.c_str()); - break; - } - - sock = lookup_and_connect(host, port); - if (sock != -1) { - // Yay, successful connect. Try to set it as nonblocking. - int one = 1; - if (ioctl(sock, FIONBIO, &one) == -1) { - perror("ioctl(FIONBIO)"); - state = CLOSING_SOCKET; - } else { - state = SENDING_REQUEST; - request = "GET " + path + " HTTP/1.0\r\nUser-Agent: cubemap\r\n\r\n"; - request_bytes_sent = 0; - } - } - break; - case SENDING_REQUEST: { - size_t to_send = request.size() - request_bytes_sent; - int ret; - - do { - ret = write(sock, request.data() + request_bytes_sent, to_send); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - perror("write"); - state = CLOSING_SOCKET; - continue; - } - - assert(ret >= 0); - request_bytes_sent += ret; - - if (request_bytes_sent == request.size()) { - state = RECEIVING_HEADER; - } - break; - } - case RECEIVING_HEADER: { - char buf[4096]; - int ret; - - do { - ret = read(sock, buf, sizeof(buf)); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - perror("read"); - state = CLOSING_SOCKET; - continue; - } - - if (ret == 0) { - // This really shouldn't happen... - fprintf(stderr, "Socket unexpectedly closed while reading header\n"); - state = CLOSING_SOCKET; - continue; - } - - RequestParseStatus status = wait_for_double_newline(&response, buf, ret); - - if (status == RP_OUT_OF_SPACE) { - fprintf(stderr, "WARNING: fd %d sent overlong response!\n", sock); - state = CLOSING_SOCKET; - continue; - } else if (status == RP_NOT_FINISHED_YET) { - continue; - } - - // OK, so we're fine, but there might be some of the actual data after the response. - // We'll need to deal with that separately. - string extra_data; - if (status == RP_EXTRA_DATA) { - char *ptr = static_cast( - memmem(response.data(), response.size(), "\r\n\r\n", 4)); - assert(ptr != NULL); - extra_data = string(ptr, &response[0] + response.size()); - response.resize(ptr - response.data()); - } - - // TODO: Check that the response is 200, save the headers, etc. - - if (!extra_data.empty()) { - process_data(&extra_data[0], extra_data.size()); - } - - state = RECEIVING_DATA; - break; - } - case RECEIVING_DATA: { - char buf[4096]; - int ret; - - do { - ret = read(sock, buf, sizeof(buf)); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - perror("read"); - state = CLOSING_SOCKET; - continue; - } - - if (ret == 0) { - // This really shouldn't happen... - fprintf(stderr, "Socket unexpectedly closed while reading header\n"); - state = CLOSING_SOCKET; - continue; - } - - process_data(buf, ret); - break; - } - case CLOSING_SOCKET: { - int err; - do { - err = close(sock); - } while (err == -1 && errno == EINTR); - - if (err == -1) { - perror("close"); - } - - state = NOT_CONNECTED; - break; - } - default: - assert(false); - } - - // If we are still in NOT_CONNECTED, either something went wrong, - // or the connection just got closed. - // The earlier steps have already given the error message, if any. - if (state == NOT_CONNECTED && !should_stop) { - fprintf(stderr, "Waiting 0.2 second and restarting...\n"); - usleep(200000); - } + string protocol, user, host, port, path; + if (!parse_url(serialized.url(), &protocol, &user, &host, &port, &path)) { + return NULL; } -} - -void Input::process_data(char *ptr, size_t bytes) -{ - pending_data.insert(pending_data.end(), ptr, ptr + bytes); - - for ( ;; ) { - // If we don't have enough data (yet) for even the Metacube header, just return. - if (pending_data.size() < sizeof(metacube_block_header)) { - return; - } - - // Make sure we have the Metacube sync header at the start. - // We may need to skip over junk data (it _should_ not happen, though). - if (!has_metacube_header) { - char *ptr = static_cast( - memmem(pending_data.data(), pending_data.size(), - METACUBE_SYNC, strlen(METACUBE_SYNC))); - if (ptr == NULL) { - // OK, so we didn't find the sync marker. We know then that - // we do not have the _full_ marker in the buffer, but we - // could have N-1 bytes. Drop everything before that, - // and then give up. - drop_pending_data(pending_data.size() - (strlen(METACUBE_SYNC) - 1)); - return; - } else { - // Yay, we found the header. Drop everything (if anything) before it. - drop_pending_data(ptr - pending_data.data()); - has_metacube_header = true; - - // Re-check that we have the entire header; we could have dropped data. - if (pending_data.size() < sizeof(metacube_block_header)) { - return; - } - } - } - - // Now it's safe to read the header. - metacube_block_header *hdr = reinterpret_cast(pending_data.data()); - assert(memcmp(hdr->sync, METACUBE_SYNC, sizeof(hdr->sync)) == 0); - uint32_t size = ntohl(hdr->size); - uint32_t flags = ntohl(hdr->flags); - - // See if we have the entire block. If not, wait for more data. - if (pending_data.size() < sizeof(metacube_block_header) + size) { - return; - } - - // Send this block on to the data. - char *inner_data = pending_data.data() + sizeof(metacube_block_header); - if (flags & METACUBE_FLAGS_HEADER) { - string header(inner_data, inner_data + size); - servers->set_header(stream_id, header); - } else { - servers->add_data(stream_id, inner_data, size); - } - - // Consume the block. This isn't the most efficient way of dealing with things - // should we have many blocks, but these routines don't need to be too efficient - // anyway. - pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube_block_header) + size); - has_metacube_header = false; + if (protocol == "http") { + return new HTTPInput(serialized); } -} - -void Input::drop_pending_data(size_t num_bytes) -{ - if (num_bytes == 0) { - return; + if (protocol == "udp") { + return new UDPInput(serialized); } - fprintf(stderr, "Warning: Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?\n", - (long long)num_bytes); - pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes); + return NULL; } +Input::~Input() {} +