From: Steinar H. Gunderson Date: Mon, 8 Apr 2013 22:16:41 +0000 (+0200) Subject: Write our own HTTP client instead of using curl. Not finished yet (missing URL parsin... X-Git-Tag: 1.0.0~172 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=c2c9f6441f9ae8091a39aea0340417d5915e1ac9;ds=sidebyside Write our own HTTP client instead of using curl. Not finished yet (missing URL parsing, for one). --- diff --git a/Makefile b/Makefile index ca9b255..6cd07cf 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ CC=gcc CXX=g++ PROTOC=protoc CXXFLAGS=-Wall -O2 -g -LDLIBS=-lcurl -lpthread -lprotobuf +LDLIBS=-lpthread -lprotobuf OBJS=cubemap.o server.o serverpool.o mutexlock.o input.o parse.o state.pb.o diff --git a/cubemap.cpp b/cubemap.cpp index a52ee76..b471afb 100644 --- a/cubemap.cpp +++ b/cubemap.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -305,7 +304,7 @@ int main(int argc, char **argv) // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec. for (size_t i = 0; i < inputs.size(); ++i) { inputs[i]->stop(); - delete inputs[i]; // TODO: serialize instead of using libcurl. + delete inputs[i]; // TODO: Serialize. } if (pthread_join(acceptor_thread, NULL) == -1) { diff --git a/input.cpp b/input.cpp index de7252a..401aa6a 100644 --- a/input.cpp +++ b/input.cpp @@ -3,12 +3,14 @@ #include #include #include -#include #include #include #include #include -#include +#include +#include +#include +#include #include #include #include @@ -19,15 +21,22 @@ #include "input.h" #include "server.h" #include "serverpool.h" +#include "parse.h" using namespace std; extern ServerPool *servers; Input::Input(const string &stream_id, const string &url) - : stream_id(stream_id), + : state(NOT_CONNECTED), + stream_id(stream_id), url(url), - has_metacube_header(false) + // TODO + host("gruessi.zrh.sesse.net"), + port("4013"), + path("/test.flv"), + has_metacube_header(false), + sock(-1) { } @@ -59,35 +68,210 @@ void *Input::do_work_thunk(void *arg) return NULL; } -void Input::do_work() +int Input::lookup_and_connect(const string &host, const string &port) { - CURL *curl = curl_easy_init(); + addrinfo *ai; + int err = getaddrinfo(host.c_str(), port.c_str(), NULL, &ai); + if (err == -1) { + fprintf(stderr, "WARNING: Lookup of '%s' failed (%s).\n", + host.c_str(), gai_strerror(err)); + freeaddrinfo(ai); + return -1; + } - while (!should_stop) { - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Input::curl_callback_thunk); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, this); - curl_easy_perform(curl); - if (!should_stop) { - printf("Transfer of '%s' ended, waiting 0.2 seconds and restarting...\n", url.c_str()); - usleep(200000); + // Connect to everything in turn until we have a socket. + while (ai && !should_stop) { + int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP); + if (sock == -1) { + // Could be e.g. EPROTONOSUPPORT. The show must go on. + continue; + } + + do { + err = connect(sock, ai->ai_addr, ai->ai_addrlen); + } while (err == -1 && errno == EINTR); + + if (err != -1) { + freeaddrinfo(ai); + return sock; } + + ai = ai->ai_next; } + + // Give the last one as error. + fprintf(stderr, "WARNING: Connect to '%s' failed (%s)\n", + host.c_str(), strerror(errno)); + freeaddrinfo(ai); + return -1; } -size_t Input::curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata) +void Input::do_work() { - Input *input = static_cast(userdata); - if (input->should_stop) { - return 0; - } + while (!should_stop) { + if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) { + // Since we are non-blocking, we need to wait for the right state first. + // Wait up to 50 ms, then check should_stop. + pollfd pfd; + pfd.fd = sock; + pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN; + pfd.events |= POLLRDHUP; - size_t bytes = size * nmemb; - input->curl_callback(ptr, bytes); - return bytes; -} + int nfds = poll(&pfd, 1, 50); + if (nfds == 0 || (nfds == -1 && errno == EAGAIN)) { + continue; + } + if (nfds == -1) { + perror("poll"); + state = CLOSING_SOCKET; + } + } + + switch (state) { + case NOT_CONNECTED: + request.clear(); + request_bytes_sent = 0; + response.clear(); + + sock = lookup_and_connect(host, port); + if (sock != -1) { + // Yay, successful connect. Try to set it as nonblocking. + int one = 1; + if (ioctl(sock, FIONBIO, &one) == -1) { + perror("ioctl(FIONBIO)"); + state = CLOSING_SOCKET; + } else { + state = SENDING_REQUEST; + request = "GET " + path + " HTTP/1.0\r\nUser-Agent: cubemap\r\n\r\n"; + request_bytes_sent = 0; + } + } + break; + case SENDING_REQUEST: { + size_t to_send = request.size() - request_bytes_sent; + int ret; + + do { + ret = write(sock, request.data() + request_bytes_sent, to_send); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + perror("write"); + state = CLOSING_SOCKET; + continue; + } + + assert(ret >= 0); + request_bytes_sent += ret; + + if (request_bytes_sent == request.size()) { + state = RECEIVING_HEADER; + } + break; + } + case RECEIVING_HEADER: { + char buf[4096]; + int ret; + + do { + ret = read(sock, buf, sizeof(buf)); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + perror("read"); + state = CLOSING_SOCKET; + continue; + } + + if (ret == 0) { + // This really shouldn't happen... + fprintf(stderr, "Socket unexpectedly closed while reading header\n"); + state = CLOSING_SOCKET; + continue; + } + + RequestParseStatus status = wait_for_double_newline(&response, buf, ret); + + if (status == RP_OUT_OF_SPACE) { + fprintf(stderr, "WARNING: fd %d sent overlong response!\n", sock); + state = CLOSING_SOCKET; + continue; + } else if (status == RP_NOT_FINISHED_YET) { + continue; + } -void Input::curl_callback(char *ptr, size_t bytes) + // OK, so we're fine, but there might be some of the actual data after the response. + // We'll need to deal with that separately. + string extra_data; + if (status == RP_EXTRA_DATA) { + char *ptr = static_cast( + memmem(response.data(), response.size(), "\r\n\r\n", 4)); + assert(ptr != NULL); + extra_data = string(ptr, &response[0] + response.size()); + response.resize(ptr - response.data()); + } + + // TODO: Check that the response is 200, save the headers, etc. + + if (!extra_data.empty()) { + process_data(&extra_data[0], extra_data.size()); + } + + state = RECEIVING_DATA; + break; + } + case RECEIVING_DATA: { + char buf[4096]; + int ret; + + do { + ret = read(sock, buf, sizeof(buf)); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + perror("read"); + state = CLOSING_SOCKET; + continue; + } + + if (ret == 0) { + // This really shouldn't happen... + fprintf(stderr, "Socket unexpectedly closed while reading header\n"); + state = CLOSING_SOCKET; + continue; + } + + process_data(buf, ret); + break; + } + case CLOSING_SOCKET: { + int err; + do { + err = close(sock); + } while (err == -1 && errno == EINTR); + + if (err == -1) { + perror("close"); + } + + state = NOT_CONNECTED; + break; + } + default: + assert(false); + } + + // If we are still in NOT_CONNECTED, either something went wrong, + // or the connection just got closed. + // The earlier steps have already given the error message, if any. + if (state == NOT_CONNECTED && !should_stop) { + fprintf(stderr, "Waiting 0.2 second and restarting...\n"); + usleep(200000); + } + } +} + +void Input::process_data(char *ptr, size_t bytes) { pending_data.insert(pending_data.end(), ptr, ptr + bytes); @@ -133,25 +317,22 @@ void Input::curl_callback(char *ptr, size_t bytes) return; } - process_block(pending_data.data() + sizeof(metacube_block_header), size, flags); + // Send this block on to the data. + char *inner_data = pending_data.data() + sizeof(metacube_block_header); + if (flags & METACUBE_FLAGS_HEADER) { + string header(inner_data, inner_data + size); + servers->set_header(stream_id, header); + } else { + servers->add_data(stream_id, inner_data, size); + } - // Consume this block. This isn't the most efficient way of dealing with things + // Consume the block. This isn't the most efficient way of dealing with things // should we have many blocks, but these routines don't need to be too efficient // anyway. pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube_block_header) + size); has_metacube_header = false; } } - -void Input::process_block(const char *data, uint32_t size, uint32_t flags) -{ - if (flags & METACUBE_FLAGS_HEADER) { - string header(data, data + size); - servers->set_header(stream_id, header); - } else { - servers->add_data(stream_id, data, size); - } -} void Input::drop_pending_data(size_t num_bytes) { diff --git a/input.h b/input.h index 508eb59..8a26ce6 100644 --- a/input.h +++ b/input.h @@ -11,7 +11,7 @@ public: // Connect to the given URL and start streaming. void run(); - // Stop streaming. NOTE: Does not currently work! + // Stops the streaming, but lets the file descriptor stay open. void stop(); private: @@ -20,21 +20,43 @@ private: // Actually does the download. void do_work(); - - // Recovers the this pointer and calls curl_callback(). - static size_t curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata); + + // Open a socket that connects to the given host and port. Does DNS resolving. + int lookup_and_connect(const std::string &host, const std::string &port); // Stores the given data, looks for Metacube blocks (skipping data if needed), // and calls process_block() for each one. - void curl_callback(char *ptr, size_t bytes); - void process_block(const char *data, uint32_t size, uint32_t flags); + void process_data(char *ptr, size_t bytes); // Drops bytes from the head of , // and outputs a warning. void drop_pending_data(size_t num_bytes); + enum State { + NOT_CONNECTED, + SENDING_REQUEST, + RECEIVING_HEADER, + RECEIVING_DATA, + CLOSING_SOCKET, // Due to error. + }; + State state; + std::string stream_id; + + // The URL and its parsed components. std::string url; + std::string host, port, path; + + // The HTTP request, with headers and all. + // Only relevant for SENDING_REQUEST. + std::string request; + + // How many bytes we've sent of the request so far. + // Only relevant for SENDING_REQUEST. + size_t request_bytes_sent; + + // The HTTP response we've received so far. Only relevant for RECEIVING_HEADER. + std::string response; // Data we have received but not fully processed yet. std::vector pending_data; @@ -43,6 +65,10 @@ private: // this is true. bool has_metacube_header; + // The socket we are downloading on (or -1). + int sock; + + // Handle to the thread that actually does the download. pthread_t worker_thread; // Whether we should stop or not. diff --git a/parse.cpp b/parse.cpp index 0747afb..fca90a6 100644 --- a/parse.cpp +++ b/parse.cpp @@ -141,3 +141,30 @@ int fetch_config_int(const vector &config, const string &keyword, in } return value; } + +#define MAX_REQUEST_SIZE 16384 /* 16 kB. */ + +RequestParseStatus wait_for_double_newline(string *existing_data, const char *new_data, size_t new_data_size) +{ + // Guard against overlong requests gobbling up all of our space. + if (existing_data->size() + new_data_size > MAX_REQUEST_SIZE) { + return RP_OUT_OF_SPACE; + } + + // See if we have \r\n\r\n anywhere in the request. We start three bytes + // before what we just appended, in case we just got the final character. + size_t existing_data_bytes = existing_data->size(); + existing_data->append(string(new_data, new_data + new_data_size)); + + const size_t start_at = (existing_data_bytes >= 3 ? existing_data_bytes - 3 : 0); + const char *ptr = reinterpret_cast( + memmem(existing_data->data() + start_at, existing_data->size() - start_at, + "\r\n\r\n", 4)); + if (ptr == NULL) { + return RP_NOT_FINISHED_YET; + } + if (ptr != existing_data->data() + existing_data->size() - 4) { + return RP_EXTRA_DATA; + } + return RP_FINISHED; +} diff --git a/parse.h b/parse.h index 83c5d2a..5d56bd1 100644 --- a/parse.h +++ b/parse.h @@ -25,4 +25,19 @@ std::vector parse_config(const std::string &filename); // Note: Limits are inclusive. int fetch_config_int(const std::vector &config, const std::string &keyword, int min_limit, int max_limit); +// Add the new data to an existing string, looking for \r\n\r\n +// (typical of HTTP requests and/or responses). Will return one +// of the given statuses. +// +// Note that if you give too much data in new_data_size, you could +// get an RP_OUT_OF_SPACE even if you expected RP_EXTRA_DATA. +// Be careful about how large reads you give in. +enum RequestParseStatus { + RP_OUT_OF_SPACE, // If larger than 16 kB. + RP_NOT_FINISHED_YET, // Did not get \r\n\r\n yet. + RP_EXTRA_DATA, // Got \r\n\r\n, but there was extra data behind it. + RP_FINISHED, // Ended exactly in \r\n\r\n. +}; +RequestParseStatus wait_for_double_newline(std::string *existing_data, const char *new_data, size_t new_data_size); + #endif // !defined(_PARSE_H) diff --git a/server.cpp b/server.cpp index 60836a7..f06685a 100644 --- a/server.cpp +++ b/server.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -362,34 +361,27 @@ read_request_again: return; } - // Guard against overlong requests gobbling up all of our space. - if (client->request.size() + ret > MAX_CLIENT_REQUEST) { + RequestParseStatus status = wait_for_double_newline(&client->request, buf, ret); + + switch (status) { + case RP_OUT_OF_SPACE: fprintf(stderr, "WARNING: fd %d sent overlong request!\n", client->sock); close_client(client); return; - } - - // See if we have \r\n\r\n anywhere in the request. We start three bytes - // before what we just appended, in case we just got the final character. - size_t existing_req_bytes = client->request.size(); - client->request.append(string(buf, buf + ret)); - - size_t start_at = (existing_req_bytes >= 3 ? existing_req_bytes - 3 : 0); - const char *ptr = reinterpret_cast( - memmem(client->request.data() + start_at, client->request.size() - start_at, - "\r\n\r\n", 4)); - if (ptr == NULL) { + case RP_NOT_FINISHED_YET: // OK, we don't have the entire header yet. Fine; we'll get it later. // See if there's more data for us. goto read_request_again; - } - - if (ptr != client->request.data() + client->request.size() - 4) { + case RP_EXTRA_DATA: fprintf(stderr, "WARNING: fd %d had junk data after request!\n", client->sock); close_client(client); return; + case RP_FINISHED: + break; } + assert(status == RP_FINISHED); + int error_code = parse_request(client); if (error_code == 200) { construct_header(client);