#include <stdint.h>
#include <assert.h>
#include <arpa/inet.h>
-#include <curl/curl.h>
#include <sys/socket.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/ioctl.h>
-#include <sys/epoll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <poll.h>
#include <errno.h>
#include <vector>
#include <string>
#include <map>
#include "metacube.h"
+#include "mutexlock.h"
#include "input.h"
#include "server.h"
+#include "serverpool.h"
+#include "parse.h"
using namespace std;
-extern Server *servers;
+extern ServerPool *servers;
+
+// Extremely rudimentary URL parsing.
+bool parse_url(const string &url, string *host, string *port, string *path)
+{
+ if (url.find("http://") != 0) {
+ return false;
+ }
+
+ string rest = url.substr(strlen("http://"));
+ size_t split = rest.find_first_of(":/");
+ if (split == string::npos) {
+ // http://foo
+ *host = rest;
+ *port = "http";
+ *path = "/";
+ return true;
+ }
-Input::Input(const string &stream_id)
- : stream_id(stream_id),
- has_metacube_header(false)
+ *host = string(rest.begin(), rest.begin() + split);
+ char ch = rest[split]; // Colon or slash.
+ rest = string(rest.begin() + split + 1, rest.end());
+
+ if (ch == ':') {
+ // Parse the port.
+ split = rest.find_first_of('/');
+ if (split == string::npos) {
+ // http://foo:1234
+ *port = rest;
+ *path = "/";
+ return true;
+ } else {
+ // http://foo:1234/bar
+ *port = string(rest.begin(), rest.begin() + split);
+ *path = string(rest.begin() + split, rest.end());
+ return true;
+ }
+ }
+
+ // http://foo/bar
+ *port = "http";
+ *path = rest;
+ return true;
+}
+
+Input::Input(const string &stream_id, const string &url)
+ : state(NOT_CONNECTED),
+ stream_id(stream_id),
+ url(url),
+ has_metacube_header(false),
+ sock(-1)
+{
+}
+
+void Input::run()
+{
+ should_stop = false;
+
+ // Joinable is already the default, but it's good to be certain.
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+ pthread_create(&worker_thread, &attr, Input::do_work_thunk, this);
+}
+
+void Input::stop()
{
+ should_stop = true;
+
+ if (pthread_join(worker_thread, NULL) == -1) {
+ perror("pthread_join");
+ exit(1);
+ }
}
-void Input::run(const string &url)
+void *Input::do_work_thunk(void *arg)
{
- CURL *curl = curl_easy_init();
- curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Input::curl_callback_thunk);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
- curl_easy_perform(curl);
+ Input *input = static_cast<Input *>(arg);
+ input->do_work();
+ return NULL;
}
-size_t Input::curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata)
+int Input::lookup_and_connect(const string &host, const string &port)
{
- Input *input = static_cast<Input *>(userdata);
- size_t bytes = size * nmemb;
- input->curl_callback(ptr, bytes);
- return bytes;
+ addrinfo *ai;
+ int err = getaddrinfo(host.c_str(), port.c_str(), NULL, &ai);
+ if (err == -1) {
+ fprintf(stderr, "WARNING: Lookup of '%s' failed (%s).\n",
+ host.c_str(), gai_strerror(err));
+ freeaddrinfo(ai);
+ return -1;
+ }
+
+ // Connect to everything in turn until we have a socket.
+ while (ai && !should_stop) {
+ int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
+ if (sock == -1) {
+ // Could be e.g. EPROTONOSUPPORT. The show must go on.
+ continue;
+ }
+
+ do {
+ err = connect(sock, ai->ai_addr, ai->ai_addrlen);
+ } while (err == -1 && errno == EINTR);
+
+ if (err != -1) {
+ freeaddrinfo(ai);
+ return sock;
+ }
+
+ ai = ai->ai_next;
+ }
+
+ // Give the last one as error.
+ fprintf(stderr, "WARNING: Connect to '%s' failed (%s)\n",
+ host.c_str(), strerror(errno));
+ freeaddrinfo(ai);
+ return -1;
}
+
+void Input::do_work()
+{
+ while (!should_stop) {
+ if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
+ // Since we are non-blocking, we need to wait for the right state first.
+ // Wait up to 50 ms, then check should_stop.
+ pollfd pfd;
+ pfd.fd = sock;
+ pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN;
+ pfd.events |= POLLRDHUP;
+
+ int nfds = poll(&pfd, 1, 50);
+ if (nfds == 0 || (nfds == -1 && errno == EAGAIN)) {
+ continue;
+ }
+ if (nfds == -1) {
+ perror("poll");
+ state = CLOSING_SOCKET;
+ }
+ }
+
+ switch (state) {
+ case NOT_CONNECTED:
+ request.clear();
+ request_bytes_sent = 0;
+ response.clear();
-void Input::curl_callback(char *ptr, size_t bytes)
+ if (!parse_url(url, &host, &port, &path)) {
+ fprintf(stderr, "Failed to parse URL '%s'\n", url.c_str());
+ break;
+ }
+
+ sock = lookup_and_connect(host, port);
+ if (sock != -1) {
+ // Yay, successful connect. Try to set it as nonblocking.
+ int one = 1;
+ if (ioctl(sock, FIONBIO, &one) == -1) {
+ perror("ioctl(FIONBIO)");
+ state = CLOSING_SOCKET;
+ } else {
+ state = SENDING_REQUEST;
+ request = "GET " + path + " HTTP/1.0\r\nUser-Agent: cubemap\r\n\r\n";
+ request_bytes_sent = 0;
+ }
+ }
+ break;
+ case SENDING_REQUEST: {
+ size_t to_send = request.size() - request_bytes_sent;
+ int ret;
+
+ do {
+ ret = write(sock, request.data() + request_bytes_sent, to_send);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ perror("write");
+ state = CLOSING_SOCKET;
+ continue;
+ }
+
+ assert(ret >= 0);
+ request_bytes_sent += ret;
+
+ if (request_bytes_sent == request.size()) {
+ state = RECEIVING_HEADER;
+ }
+ break;
+ }
+ case RECEIVING_HEADER: {
+ char buf[4096];
+ int ret;
+
+ do {
+ ret = read(sock, buf, sizeof(buf));
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ perror("read");
+ state = CLOSING_SOCKET;
+ continue;
+ }
+
+ if (ret == 0) {
+ // This really shouldn't happen...
+ fprintf(stderr, "Socket unexpectedly closed while reading header\n");
+ state = CLOSING_SOCKET;
+ continue;
+ }
+
+ RequestParseStatus status = wait_for_double_newline(&response, buf, ret);
+
+ if (status == RP_OUT_OF_SPACE) {
+ fprintf(stderr, "WARNING: fd %d sent overlong response!\n", sock);
+ state = CLOSING_SOCKET;
+ continue;
+ } else if (status == RP_NOT_FINISHED_YET) {
+ continue;
+ }
+
+ // OK, so we're fine, but there might be some of the actual data after the response.
+ // We'll need to deal with that separately.
+ string extra_data;
+ if (status == RP_EXTRA_DATA) {
+ char *ptr = static_cast<char *>(
+ memmem(response.data(), response.size(), "\r\n\r\n", 4));
+ assert(ptr != NULL);
+ extra_data = string(ptr, &response[0] + response.size());
+ response.resize(ptr - response.data());
+ }
+
+ // TODO: Check that the response is 200, save the headers, etc.
+
+ if (!extra_data.empty()) {
+ process_data(&extra_data[0], extra_data.size());
+ }
+
+ state = RECEIVING_DATA;
+ break;
+ }
+ case RECEIVING_DATA: {
+ char buf[4096];
+ int ret;
+
+ do {
+ ret = read(sock, buf, sizeof(buf));
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ perror("read");
+ state = CLOSING_SOCKET;
+ continue;
+ }
+
+ if (ret == 0) {
+ // This really shouldn't happen...
+ fprintf(stderr, "Socket unexpectedly closed while reading header\n");
+ state = CLOSING_SOCKET;
+ continue;
+ }
+
+ process_data(buf, ret);
+ break;
+ }
+ case CLOSING_SOCKET: {
+ int err;
+ do {
+ err = close(sock);
+ } while (err == -1 && errno == EINTR);
+
+ if (err == -1) {
+ perror("close");
+ }
+
+ state = NOT_CONNECTED;
+ break;
+ }
+ default:
+ assert(false);
+ }
+
+ // If we are still in NOT_CONNECTED, either something went wrong,
+ // or the connection just got closed.
+ // The earlier steps have already given the error message, if any.
+ if (state == NOT_CONNECTED && !should_stop) {
+ fprintf(stderr, "Waiting 0.2 second and restarting...\n");
+ usleep(200000);
+ }
+ }
+}
+
+void Input::process_data(char *ptr, size_t bytes)
{
pending_data.insert(pending_data.end(), ptr, ptr + bytes);
return;
}
- process_block(pending_data.data(), size, flags);
+ // Send this block on to the data.
+ char *inner_data = pending_data.data() + sizeof(metacube_block_header);
+ if (flags & METACUBE_FLAGS_HEADER) {
+ string header(inner_data, inner_data + size);
+ servers->set_header(stream_id, header);
+ } else {
+ servers->add_data(stream_id, inner_data, size);
+ }
- // Consume this block. This isn't the most efficient way of dealing with things
+ // Consume the block. This isn't the most efficient way of dealing with things
// should we have many blocks, but these routines don't need to be too efficient
// anyway.
pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube_block_header) + size);
- }
-}
-
-void Input::process_block(const char *data, uint32_t size, uint32_t flags)
-{
- if (flags & METACUBE_FLAGS_HEADER) {
- string header(data, data + size);
- for (int i = 0; i < NUM_SERVERS; ++i) {
- servers[i].set_header(stream_id, header);
- }
- } else {
- for (int i = 0; i < NUM_SERVERS; ++i) {
- servers[i].add_data(stream_id, data, size);
- }
+ has_metacube_header = false;
}
}