X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=httpinput.cpp;h=19167f0b6b7404df6592dc9e9e2805bd9424325d;hp=49501831fa7df2805eead2a172b5dab5d77a3320;hb=46115862ece2cbd590bc4ae6fe06767869fa7668;hpb=71fc5575037bead8b6e927a1fffd199e4fc4514b diff --git a/httpinput.cpp b/httpinput.cpp index 4950183..19167f0 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -20,6 +21,7 @@ #include "parse.h" #include "serverpool.h" #include "state.pb.h" +#include "util.h" #include "version.h" using namespace std; @@ -60,13 +62,8 @@ HTTPInput::HTTPInput(const InputProto &serialized) void HTTPInput::close_socket() { - int ret; - do { - ret = close(sock); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close()"); + if (sock != -1) { + safe_close(sock); } } @@ -89,42 +86,73 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port) { addrinfo *ai; int err = getaddrinfo(host.c_str(), port.c_str(), NULL, &ai); - if (err == -1) { + if (err != 0) { log(WARNING, "[%s] Lookup of '%s' failed (%s).", url.c_str(), host.c_str(), gai_strerror(err)); - freeaddrinfo(ai); return -1; } addrinfo *base_ai = ai; // Connect to everything in turn until we have a socket. - while (ai && !should_stop) { + for ( ; ai && !should_stop(); ai = ai->ai_next) { int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP); if (sock == -1) { // Could be e.g. EPROTONOSUPPORT. The show must go on. continue; } + // Now do a non-blocking connect. This is important because we want to be able to be + // woken up, even though it's rather cumbersome. + + // Set the socket as nonblocking. + int one = 1; + if (ioctl(sock, FIONBIO, &one) == -1) { + log_perror("ioctl(FIONBIO)"); + safe_close(sock); + return -1; + } + + // Do a non-blocking connect. do { err = connect(sock, ai->ai_addr, ai->ai_addrlen); } while (err == -1 && errno == EINTR); - if (err != -1) { - freeaddrinfo(base_ai); - return sock; + if (err == -1 && errno != EINPROGRESS) { + log_perror("connect"); + safe_close(sock); + continue; } - do { - err = close(sock); - } while (err == -1 && errno == EINTR); + // Wait for the connect to complete, or an error to happen. + for ( ;; ) { + bool complete = wait_for_activity(sock, POLLIN | POLLOUT, NULL); + if (should_stop()) { + safe_close(sock); + return -1; + } + if (complete) { + break; + } + } - if (err == -1) { - log_perror("close"); - // Can still continue. + // Check whether it ended in an error or not. + socklen_t err_size = sizeof(err); + if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_size) == -1) { + log_perror("getsockopt"); + safe_close(sock); + continue; } - ai = ai->ai_next; + errno = err; + + if (err == 0) { + // Successful connect. + freeaddrinfo(base_ai); + return sock; + } + + safe_close(sock); } // Give the last one as error. @@ -212,8 +240,8 @@ bool HTTPInput::parse_response(const std::string &request) http_header.append(it->first + ": " + it->second + "\r\n"); } - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], http_header, ""); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->set_header(stream_indices[i], http_header, ""); } return true; @@ -221,23 +249,13 @@ bool HTTPInput::parse_response(const std::string &request) void HTTPInput::do_work() { - while (!should_stop) { + while (!should_stop()) { if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) { - // Since we are non-blocking, we need to wait for the right state first. - // Wait up to 50 ms, then check should_stop. - pollfd pfd; - pfd.fd = sock; - pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN; - pfd.events |= POLLRDHUP; - - int nfds = poll(&pfd, 1, 50); - if (nfds == 0 || (nfds == -1 && errno == EINTR)) { + bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL); + if (!activity) { + // Most likely, should_stop was set. continue; } - if (nfds == -1) { - log_perror("poll"); - state = CLOSING_SOCKET; - } } switch (state) { @@ -246,8 +264,8 @@ void HTTPInput::do_work() request_bytes_sent = 0; response.clear(); pending_data.clear(); - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], "", ""); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->set_header(stream_indices[i], "", ""); } { @@ -367,7 +385,7 @@ void HTTPInput::do_work() if (ret == 0) { // This really shouldn't happen... - log(ERROR, "[%s] Socket unexpectedly closed while reading header", + log(ERROR, "[%s] Socket unexpectedly closed while reading data", url.c_str()); state = CLOSING_SOCKET; continue; @@ -377,15 +395,7 @@ void HTTPInput::do_work() break; } case CLOSING_SOCKET: { - int err; - do { - err = close(sock); - } while (err == -1 && errno == EINTR); - - if (err == -1) { - log_perror("close"); - } - + close_socket(); state = NOT_CONNECTED; break; } @@ -396,9 +406,12 @@ void HTTPInput::do_work() // If we are still in NOT_CONNECTED, either something went wrong, // or the connection just got closed. // The earlier steps have already given the error message, if any. - if (state == NOT_CONNECTED && !should_stop) { + if (state == NOT_CONNECTED && !should_stop()) { log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str()); - usleep(200000); + timespec timeout_ts; + timeout_ts.tv_sec = 0; + timeout_ts.tv_nsec = 200000000; + wait_for_wakeup(&timeout_ts); } } } @@ -453,12 +466,12 @@ void HTTPInput::process_data(char *ptr, size_t bytes) char *inner_data = pending_data.data() + sizeof(metacube_block_header); if (flags & METACUBE_FLAGS_HEADER) { string header(inner_data, inner_data + size); - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], http_header, header); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->set_header(stream_indices[i], http_header, header); } } else { - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->add_data(stream_ids[i], inner_data, size); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->add_data(stream_indices[i], inner_data, size); } }