X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=httpinput.cpp;h=c30651ed3e5c42100a7f5cc0d16f56e8b7c7751f;hp=da672edc3689a14cb4707f29f060d9db73183bad;hb=9abb89bcf7940e2ada9d708f86a218a56334f68d;hpb=b9f87872a42f32818805f3c2520555f2d6e2928d diff --git a/httpinput.cpp b/httpinput.cpp index da672ed..c30651e 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -1,10 +1,10 @@ +#include #include #include #include #include #include #include -#include #include #include #include @@ -21,6 +21,7 @@ #include "parse.h" #include "serverpool.h" #include "state.pb.h" +#include "util.h" #include "version.h" using namespace std; @@ -61,14 +62,7 @@ HTTPInput::HTTPInput(const InputProto &serialized) void HTTPInput::close_socket() { - int ret; - do { - ret = close(sock); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close()"); - } + safe_close(sock); } InputProto HTTPInput::serialize() const @@ -100,31 +94,64 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port) addrinfo *base_ai = ai; // Connect to everything in turn until we have a socket. - while (ai && !should_stop) { + while (ai && !should_stop()) { int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP); if (sock == -1) { // Could be e.g. EPROTONOSUPPORT. The show must go on. continue; } + // Now do a non-blocking connect. This is important because we want to be able to be + // woken up, even though it's rather cumbersome. + + // Set the socket as nonblocking. + int one = 1; + if (ioctl(sock, FIONBIO, &one) == -1) { + log_perror("ioctl(FIONBIO)"); + safe_close(sock); + return -1; + } + + // Do a non-blocking connect. do { err = connect(sock, ai->ai_addr, ai->ai_addrlen); } while (err == -1 && errno == EINTR); - if (err != -1) { - freeaddrinfo(base_ai); - return sock; + if (err == -1 && errno != EINPROGRESS) { + log_perror("connect"); + safe_close(sock); + continue; } - do { - err = close(sock); - } while (err == -1 && errno == EINTR); + // Wait for the connect to complete, or an error to happen. + for ( ;; ) { + bool complete = wait_for_activity(sock, POLLIN | POLLOUT, NULL); + if (should_stop()) { + safe_close(sock); + return -1; + } + if (complete) { + break; + } + } + + // Check whether it ended in an error or not. + socklen_t err_size = sizeof(err); + if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_size) == -1) { + log_perror("getsockopt"); + safe_close(sock); + continue; + } - if (err == -1) { - log_perror("close"); - // Can still continue. + errno = err; + + if (err == 0) { + // Successful connect. + freeaddrinfo(base_ai); + return sock; } + safe_close(sock); ai = ai->ai_next; } @@ -213,8 +240,8 @@ bool HTTPInput::parse_response(const std::string &request) http_header.append(it->first + ": " + it->second + "\r\n"); } - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], http_header, ""); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->set_header(stream_indices[i], http_header, ""); } return true; @@ -222,23 +249,13 @@ bool HTTPInput::parse_response(const std::string &request) void HTTPInput::do_work() { - while (!should_stop) { + while (!should_stop()) { if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) { - // Since we are non-blocking, we need to wait for the right state first. - // Wait up to 50 ms, then check should_stop. - pollfd pfd; - pfd.fd = sock; - pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN; - pfd.events |= POLLRDHUP; - - int nfds = poll(&pfd, 1, 50); - if (nfds == 0 || (nfds == -1 && errno == EINTR)) { + bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL); + if (!activity) { + // Most likely, should_stop was set. continue; } - if (nfds == -1) { - log_perror("poll"); - state = CLOSING_SOCKET; - } } switch (state) { @@ -247,8 +264,8 @@ void HTTPInput::do_work() request_bytes_sent = 0; response.clear(); pending_data.clear(); - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], "", ""); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->set_header(stream_indices[i], "", ""); } { @@ -378,15 +395,7 @@ void HTTPInput::do_work() break; } case CLOSING_SOCKET: { - int err; - do { - err = close(sock); - } while (err == -1 && errno == EINTR); - - if (err == -1) { - log_perror("close"); - } - + close_socket(); state = NOT_CONNECTED; break; } @@ -397,9 +406,12 @@ void HTTPInput::do_work() // If we are still in NOT_CONNECTED, either something went wrong, // or the connection just got closed. // The earlier steps have already given the error message, if any. - if (state == NOT_CONNECTED && !should_stop) { + if (state == NOT_CONNECTED && !should_stop()) { log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str()); - usleep(200000); + timespec timeout_ts; + timeout_ts.tv_sec = 0; + timeout_ts.tv_nsec = 200000000; + wait_for_wakeup(&timeout_ts); } } } @@ -454,12 +466,12 @@ void HTTPInput::process_data(char *ptr, size_t bytes) char *inner_data = pending_data.data() + sizeof(metacube_block_header); if (flags & METACUBE_FLAGS_HEADER) { string header(inner_data, inner_data + size); - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], http_header, header); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->set_header(stream_indices[i], http_header, header); } } else { - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->add_data(stream_ids[i], inner_data, size); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->add_data(stream_indices[i], inner_data, size); } }