#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
+#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include <map>
#include "httpinput.h"
#include "log.h"
-#include "metacube.h"
+#include "metacube2.h"
#include "mutexlock.h"
#include "parse.h"
#include "serverpool.h"
#include "state.pb.h"
+#include "stream.h"
#include "util.h"
#include "version.h"
using namespace std;
extern ServerPool *servers;
-
+
+namespace {
+
+// Compute b-a.
+timespec clock_diff(const timespec &a, const timespec &b)
+{
+ timespec ret;
+ ret.tv_sec = b.tv_sec - a.tv_sec;
+ ret.tv_nsec = b.tv_nsec - a.tv_nsec;
+ if (ret.tv_nsec < 0) {
+ ret.tv_sec--;
+ ret.tv_nsec += 1000000000;
+ }
+ assert(ret.tv_nsec >= 0);
+ return ret;
+}
+
+} // namespace
+
HTTPInput::HTTPInput(const string &url)
: state(NOT_CONNECTED),
url(url),
request_bytes_sent(serialized.request_bytes_sent()),
response(serialized.response()),
http_header(serialized.http_header()),
+ stream_header(serialized.stream_header()),
has_metacube_header(serialized.has_metacube_header()),
sock(serialized.sock())
{
serialized.set_request_bytes_sent(request_bytes_sent);
serialized.set_response(response);
serialized.set_http_header(http_header);
+ serialized.set_stream_header(stream_header);
serialized.set_pending_data(string(pending_data.begin(), pending_data.end()));
serialized.set_has_metacube_header(has_metacube_header);
serialized.set_sock(sock);
}
for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->set_header(stream_indices[i], http_header, "");
+ servers->set_header(stream_indices[i], http_header, stream_header);
}
return true;
void HTTPInput::do_work()
{
+ timespec last_activity;
+
+ // TODO: Make the timeout persist across restarts.
+ if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
+ int err = clock_gettime(CLOCK_MONOTONIC, &last_activity);
+ assert(err != -1);
+ }
+
while (!should_stop()) {
if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
- bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL);
- if (!activity) {
- // Most likely, should_stop was set.
+ // Give the socket 30 seconds since last activity before we time out.
+ static const int timeout_secs = 30;
+
+ timespec now;
+ int err = clock_gettime(CLOCK_MONOTONIC, &now);
+ assert(err != -1);
+
+ timespec elapsed = clock_diff(last_activity, now);
+ if (elapsed.tv_sec >= timeout_secs) {
+ // Timeout!
+ log(ERROR, "[%s] Timeout after %d seconds, closing.", url.c_str(), elapsed.tv_sec);
+ state = CLOSING_SOCKET;
+ continue;
+ }
+
+ // Basically calculate (30 - (now - last_activity)) = (30 + (last_activity - now)).
+ // Add a second of slack to account for differences between clocks.
+ timespec timeout = clock_diff(now, last_activity);
+ timeout.tv_sec += timeout_secs + 1;
+ assert(timeout.tv_sec > 0 || (timeout.tv_sec >= 0 && timeout.tv_nsec > 0));
+
+ bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, &timeout);
+ if (activity) {
+ err = clock_gettime(CLOCK_MONOTONIC, &last_activity);
+ assert(err != -1);
+ } else {
+ // OK. Most likely, should_stop was set, or we have timed out.
continue;
}
}
MutexLock lock(&stats_mutex);
stats.connect_time = time(NULL);
+ clock_gettime(CLOCK_MONOTONIC, &last_activity);
}
break;
case SENDING_REQUEST: {
for ( ;; ) {
// If we don't have enough data (yet) for even the Metacube header, just return.
- if (pending_data.size() < sizeof(metacube_block_header)) {
+ if (pending_data.size() < sizeof(metacube2_block_header)) {
return;
}
if (!has_metacube_header) {
char *ptr = static_cast<char *>(
memmem(pending_data.data(), pending_data.size(),
- METACUBE_SYNC, strlen(METACUBE_SYNC)));
+ METACUBE2_SYNC, strlen(METACUBE2_SYNC)));
if (ptr == NULL) {
// OK, so we didn't find the sync marker. We know then that
// we do not have the _full_ marker in the buffer, but we
// could have N-1 bytes. Drop everything before that,
// and then give up.
- drop_pending_data(pending_data.size() - (strlen(METACUBE_SYNC) - 1));
+ drop_pending_data(pending_data.size() - (strlen(METACUBE2_SYNC) - 1));
return;
} else {
// Yay, we found the header. Drop everything (if anything) before it.
has_metacube_header = true;
// Re-check that we have the entire header; we could have dropped data.
- if (pending_data.size() < sizeof(metacube_block_header)) {
+ if (pending_data.size() < sizeof(metacube2_block_header)) {
return;
}
}
}
// Now it's safe to read the header.
- metacube_block_header *hdr = reinterpret_cast<metacube_block_header *>(pending_data.data());
- assert(memcmp(hdr->sync, METACUBE_SYNC, sizeof(hdr->sync)) == 0);
- uint32_t size = ntohl(hdr->size);
- uint32_t flags = ntohl(hdr->flags);
-
+ metacube2_block_header hdr;
+ memcpy(&hdr, pending_data.data(), sizeof(hdr));
+ assert(memcmp(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync)) == 0);
+ uint32_t size = ntohl(hdr.size);
+ uint16_t flags = ntohs(hdr.flags);
+ uint16_t expected_csum = metacube2_compute_crc(&hdr);
+
+ if (expected_csum != ntohs(hdr.csum)) {
+ log(WARNING, "[%s] Metacube checksum failed (expected 0x%x, got 0x%x), "
+ "not reading block claiming to be %d bytes (flags=%x).",
+ url.c_str(), expected_csum, ntohs(hdr.csum),
+ size, flags);
+
+ // Drop only the first byte, and let the rest of the code handle resync.
+ pending_data.erase(pending_data.begin(), pending_data.begin() + 1);
+ has_metacube_header = false;
+ continue;
+ }
if (size > 262144) {
log(WARNING, "[%s] Metacube block of %d bytes (flags=%x); corrupted header?",
url.c_str(), size, flags);
}
// See if we have the entire block. If not, wait for more data.
- if (pending_data.size() < sizeof(metacube_block_header) + size) {
+ if (pending_data.size() < sizeof(metacube2_block_header) + size) {
return;
}
MutexLock lock(&stats_mutex);
stats.data_bytes_received += size;
}
- char *inner_data = pending_data.data() + sizeof(metacube_block_header);
+ char *inner_data = pending_data.data() + sizeof(metacube2_block_header);
if (flags & METACUBE_FLAGS_HEADER) {
- string header(inner_data, inner_data + size);
+ stream_header = string(inner_data, inner_data + size);
for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->set_header(stream_indices[i], http_header, header);
+ servers->set_header(stream_indices[i], http_header, stream_header);
}
} else {
StreamStartSuitability suitable_for_stream_start;
// Consume the block. This isn't the most efficient way of dealing with things
// should we have many blocks, but these routines don't need to be too efficient
// anyway.
- pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube_block_header) + size);
+ pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube2_block_header) + size);
has_metacube_header = false;
}
}
if (num_bytes == 0) {
return;
}
- log(WARNING, "[%s] Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?",
+ log(WARNING, "[%s] Dropping %lld junk bytes from stream, maybe it is not a Metacube2 stream?",
url.c_str(), (long long)num_bytes);
assert(pending_data.size() >= num_bytes);
pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes);
void HTTPInput::add_destination(int stream_index)
{
stream_indices.push_back(stream_index);
- servers->set_header(stream_index, http_header, "");
+ servers->set_header(stream_index, http_header, stream_header);
}
InputStats HTTPInput::get_stats() const