X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=httpinput.cpp;h=c30651ed3e5c42100a7f5cc0d16f56e8b7c7751f;hp=51b4f412582bb94d512b074249d5375f94804bfb;hb=9abb89bcf7940e2ada9d708f86a218a56334f68d;hpb=340489a8e732519182ecbc92116e7dfa2997143c diff --git a/httpinput.cpp b/httpinput.cpp index 51b4f41..c30651e 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -1,10 +1,10 @@ +#include #include #include #include #include #include #include -#include #include #include #include @@ -21,6 +21,7 @@ #include "parse.h" #include "serverpool.h" #include "state.pb.h" +#include "util.h" #include "version.h" using namespace std; @@ -50,18 +51,18 @@ HTTPInput::HTTPInput(const InputProto &serialized) string protocol; parse_url(url, &protocol, &host, &port, &path); // Don't care if it fails. + + // Older versions stored the extra \r\n in the HTTP header. + // Strip it if we find it. + if (http_header.size() >= 4 && + memcmp(http_header.data() + http_header.size() - 4, "\r\n\r\n", 4) == 0) { + http_header.resize(http_header.size() - 2); + } } void HTTPInput::close_socket() { - int ret; - do { - ret = close(sock); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close()"); - } + safe_close(sock); } InputProto HTTPInput::serialize() const @@ -93,31 +94,64 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port) addrinfo *base_ai = ai; // Connect to everything in turn until we have a socket. - while (ai && !should_stop) { + while (ai && !should_stop()) { int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP); if (sock == -1) { // Could be e.g. EPROTONOSUPPORT. The show must go on. continue; } + // Now do a non-blocking connect. This is important because we want to be able to be + // woken up, even though it's rather cumbersome. + + // Set the socket as nonblocking. + int one = 1; + if (ioctl(sock, FIONBIO, &one) == -1) { + log_perror("ioctl(FIONBIO)"); + safe_close(sock); + return -1; + } + + // Do a non-blocking connect. do { err = connect(sock, ai->ai_addr, ai->ai_addrlen); } while (err == -1 && errno == EINTR); - if (err != -1) { - freeaddrinfo(base_ai); - return sock; + if (err == -1 && errno != EINPROGRESS) { + log_perror("connect"); + safe_close(sock); + continue; } - do { - err = close(sock); - } while (err == -1 && errno == EINTR); + // Wait for the connect to complete, or an error to happen. + for ( ;; ) { + bool complete = wait_for_activity(sock, POLLIN | POLLOUT, NULL); + if (should_stop()) { + safe_close(sock); + return -1; + } + if (complete) { + break; + } + } + + // Check whether it ended in an error or not. + socklen_t err_size = sizeof(err); + if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_size) == -1) { + log_perror("getsockopt"); + safe_close(sock); + continue; + } - if (err == -1) { - log_perror("close"); - // Can still continue. + errno = err; + + if (err == 0) { + // Successful connect. + freeaddrinfo(base_ai); + return sock; } + safe_close(sock); ai = ai->ai_next; } @@ -193,6 +227,11 @@ bool HTTPInput::parse_response(const std::string &request) } } + // Set “Connection: close”. + // TODO: Make case-insensitive. + parameters.erase("Connection"); + parameters.insert(make_pair("Connection", "close")); + // Construct the new HTTP header. http_header = "HTTP/1.0 200 OK\r\n"; for (multimap::iterator it = parameters.begin(); @@ -200,10 +239,9 @@ bool HTTPInput::parse_response(const std::string &request) ++it) { http_header.append(it->first + ": " + it->second + "\r\n"); } - http_header.append("\r\n"); - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], http_header); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->set_header(stream_indices[i], http_header, ""); } return true; @@ -211,23 +249,13 @@ bool HTTPInput::parse_response(const std::string &request) void HTTPInput::do_work() { - while (!should_stop) { + while (!should_stop()) { if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) { - // Since we are non-blocking, we need to wait for the right state first. - // Wait up to 50 ms, then check should_stop. - pollfd pfd; - pfd.fd = sock; - pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN; - pfd.events |= POLLRDHUP; - - int nfds = poll(&pfd, 1, 50); - if (nfds == 0 || (nfds == -1 && errno == EINTR)) { + bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL); + if (!activity) { + // Most likely, should_stop was set. continue; } - if (nfds == -1) { - log_perror("poll"); - state = CLOSING_SOCKET; - } } switch (state) { @@ -236,8 +264,8 @@ void HTTPInput::do_work() request_bytes_sent = 0; response.clear(); pending_data.clear(); - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], ""); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->set_header(stream_indices[i], "", ""); } { @@ -367,15 +395,7 @@ void HTTPInput::do_work() break; } case CLOSING_SOCKET: { - int err; - do { - err = close(sock); - } while (err == -1 && errno == EINTR); - - if (err == -1) { - log_perror("close"); - } - + close_socket(); state = NOT_CONNECTED; break; } @@ -386,9 +406,12 @@ void HTTPInput::do_work() // If we are still in NOT_CONNECTED, either something went wrong, // or the connection just got closed. // The earlier steps have already given the error message, if any. - if (state == NOT_CONNECTED && !should_stop) { + if (state == NOT_CONNECTED && !should_stop()) { log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str()); - usleep(200000); + timespec timeout_ts; + timeout_ts.tv_sec = 0; + timeout_ts.tv_nsec = 200000000; + wait_for_wakeup(&timeout_ts); } } } @@ -443,12 +466,12 @@ void HTTPInput::process_data(char *ptr, size_t bytes) char *inner_data = pending_data.data() + sizeof(metacube_block_header); if (flags & METACUBE_FLAGS_HEADER) { string header(inner_data, inner_data + size); - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], http_header + header); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->set_header(stream_indices[i], http_header, header); } } else { - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->add_data(stream_ids[i], inner_data, size); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->add_data(stream_indices[i], inner_data, size); } }