#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
+#include <time.h>
#include <unistd.h>
#include <map>
#include <string>
#include "parse.h"
#include "serverpool.h"
#include "state.pb.h"
+#include "util.h"
#include "version.h"
using namespace std;
void HTTPInput::close_socket()
{
- int ret;
- do {
- ret = close(sock);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close()");
+ if (sock != -1) {
+ safe_close(sock);
}
}
{
addrinfo *ai;
int err = getaddrinfo(host.c_str(), port.c_str(), NULL, &ai);
- if (err == -1) {
+ if (err != 0) {
log(WARNING, "[%s] Lookup of '%s' failed (%s).",
url.c_str(), host.c_str(), gai_strerror(err));
- freeaddrinfo(ai);
return -1;
}
addrinfo *base_ai = ai;
// Connect to everything in turn until we have a socket.
- while (ai && !should_stop) {
+ for ( ; ai && !should_stop(); ai = ai->ai_next) {
int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
if (sock == -1) {
// Could be e.g. EPROTONOSUPPORT. The show must go on.
continue;
}
+ // Now do a non-blocking connect. This is important because we want to be able to be
+ // woken up, even though it's rather cumbersome.
+
+ // Set the socket as nonblocking.
+ int one = 1;
+ if (ioctl(sock, FIONBIO, &one) == -1) {
+ log_perror("ioctl(FIONBIO)");
+ safe_close(sock);
+ return -1;
+ }
+
+ // Do a non-blocking connect.
do {
err = connect(sock, ai->ai_addr, ai->ai_addrlen);
} while (err == -1 && errno == EINTR);
- if (err != -1) {
- freeaddrinfo(base_ai);
- return sock;
+ if (err == -1 && errno != EINPROGRESS) {
+ log_perror("connect");
+ safe_close(sock);
+ continue;
}
- do {
- err = close(sock);
- } while (err == -1 && errno == EINTR);
+ // Wait for the connect to complete, or an error to happen.
+ for ( ;; ) {
+ bool complete = wait_for_activity(sock, POLLIN | POLLOUT, NULL);
+ if (should_stop()) {
+ safe_close(sock);
+ return -1;
+ }
+ if (complete) {
+ break;
+ }
+ }
- if (err == -1) {
- log_perror("close");
- // Can still continue.
+ // Check whether it ended in an error or not.
+ socklen_t err_size = sizeof(err);
+ if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_size) == -1) {
+ log_perror("getsockopt");
+ safe_close(sock);
+ continue;
}
- ai = ai->ai_next;
+ errno = err;
+
+ if (err == 0) {
+ // Successful connect.
+ freeaddrinfo(base_ai);
+ return sock;
+ }
+
+ safe_close(sock);
}
// Give the last one as error.
http_header.append(it->first + ": " + it->second + "\r\n");
}
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], http_header, "");
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->set_header(stream_indices[i], http_header, "");
}
return true;
void HTTPInput::do_work()
{
- while (!should_stop) {
+ while (!should_stop()) {
if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
- // Since we are non-blocking, we need to wait for the right state first.
- // Wait up to 50 ms, then check should_stop.
- pollfd pfd;
- pfd.fd = sock;
- pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN;
- pfd.events |= POLLRDHUP;
-
- int nfds = poll(&pfd, 1, 50);
- if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+ bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL);
+ if (!activity) {
+ // Most likely, should_stop was set.
continue;
}
- if (nfds == -1) {
- log_perror("poll");
- state = CLOSING_SOCKET;
- }
}
switch (state) {
request_bytes_sent = 0;
response.clear();
pending_data.clear();
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], "", "");
+ has_metacube_header = false;
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->set_header(stream_indices[i], "", "");
}
{
if (ret == 0) {
// This really shouldn't happen...
- log(ERROR, "[%s] Socket unexpectedly closed while reading header",
+ log(ERROR, "[%s] Socket unexpectedly closed while reading data",
url.c_str());
state = CLOSING_SOCKET;
continue;
break;
}
case CLOSING_SOCKET: {
- int err;
- do {
- err = close(sock);
- } while (err == -1 && errno == EINTR);
-
- if (err == -1) {
- log_perror("close");
- }
-
+ close_socket();
state = NOT_CONNECTED;
break;
}
// If we are still in NOT_CONNECTED, either something went wrong,
// or the connection just got closed.
// The earlier steps have already given the error message, if any.
- if (state == NOT_CONNECTED && !should_stop) {
+ if (state == NOT_CONNECTED && !should_stop()) {
log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str());
- usleep(200000);
+ timespec timeout_ts;
+ timeout_ts.tv_sec = 0;
+ timeout_ts.tv_nsec = 200000000;
+ wait_for_wakeup(&timeout_ts);
}
}
}
uint32_t size = ntohl(hdr->size);
uint32_t flags = ntohl(hdr->flags);
+ if (size > 65535) {
+ log(WARNING, "[%s] Metacube block of %x bytes (flags=%x); corrupted header?",
+ url.c_str(), size, flags);
+ }
+
// See if we have the entire block. If not, wait for more data.
if (pending_data.size() < sizeof(metacube_block_header) + size) {
return;
char *inner_data = pending_data.data() + sizeof(metacube_block_header);
if (flags & METACUBE_FLAGS_HEADER) {
string header(inner_data, inner_data + size);
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], http_header, header);
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->set_header(stream_indices[i], http_header, header);
}
} else {
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->add_data(stream_ids[i], inner_data, size);
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->add_data(stream_indices[i], inner_data, size);
}
}
}
log(WARNING, "[%s] Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?",
url.c_str(), (long long)num_bytes);
+ assert(pending_data.size() >= num_bytes);
pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes);
}