#include <sys/time.h>
#include <time.h>
#include <unistd.h>
+#include <math.h>
#include <map>
#include <string>
#include <utility>
stats.url = url;
stats.bytes_received = 0;
stats.data_bytes_received = 0;
+ stats.metadata_bytes_received = 0;
stats.connect_time = -1;
+ stats.latency_sec = HUGE_VAL;
last_verbose_connection.tv_sec = -3600;
last_verbose_connection.tv_nsec = 0;
http_header(serialized.http_header()),
stream_header(serialized.stream_header()),
has_metacube_header(serialized.has_metacube_header()),
- sock(serialized.sock())
+ sock(serialized.sock()),
+ num_connection_attempts(0),
+ suppress_logging(false)
{
pending_data.resize(serialized.pending_data().size());
memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size());
stats.url = url;
stats.bytes_received = serialized.bytes_received();
stats.data_bytes_received = serialized.data_bytes_received();
+ stats.metadata_bytes_received = serialized.metadata_bytes_received();
if (serialized.has_connect_time()) {
stats.connect_time = serialized.connect_time();
} else {
stats.connect_time = time(NULL);
}
+ if (serialized.has_latency_sec()) {
+ stats.latency_sec = serialized.latency_sec();
+ } else {
+ stats.latency_sec = HUGE_VAL;
+ }
+
+ last_verbose_connection.tv_sec = -3600;
+ last_verbose_connection.tv_nsec = 0;
}
void HTTPInput::close_socket()
serialized.set_sock(sock);
serialized.set_bytes_received(stats.bytes_received);
serialized.set_data_bytes_received(stats.data_bytes_received);
+ if (isfinite(stats.latency_sec)) {
+ serialized.set_latency_sec(stats.latency_sec);
+ }
serialized.set_connect_time(stats.connect_time);
if (encoding == Input::INPUT_ENCODING_METACUBE) {
serialized.set_is_metacube_encoded(true);
addrinfo *ai;
int err = getaddrinfo(host.c_str(), port.c_str(), NULL, &ai);
if (err != 0) {
- log(WARNING, "[%s] Lookup of '%s' failed (%s).",
- url.c_str(), host.c_str(), gai_strerror(err));
+ if (!suppress_logging) {
+ log(WARNING, "[%s] Lookup of '%s' failed (%s).",
+ url.c_str(), host.c_str(), gai_strerror(err));
+ }
return -1;
}
}
// Give the last one as error.
- log(WARNING, "[%s] Connect to '%s' failed (%s)",
- url.c_str(), host.c_str(), strerror(errno));
+ if (!suppress_logging) {
+ log(WARNING, "[%s] Connect to '%s' failed (%s)",
+ url.c_str(), host.c_str(), strerror(errno));
+ }
freeaddrinfo(base_ai);
return -1;
}
return;
}
- // Send this block on to the servers.
- {
- MutexLock lock(&stats_mutex);
- stats.data_bytes_received += size;
- }
- char *inner_data = pending_data.data() + sizeof(metacube2_block_header);
- if (flags & METACUBE_FLAGS_HEADER) {
- stream_header = string(inner_data, inner_data + size);
+ // See if this is a metadata block. If so, we don't want to send it on,
+ // but rather process it ourselves.
+ // TODO: Keep metadata when sending on to other Metacube users.
+ if (flags & METACUBE_FLAGS_METADATA) {
+ {
+ MutexLock lock(&stats_mutex);
+ stats.metadata_bytes_received += size;
+ }
+ process_metacube_metadata_block(hdr, pending_data.data() + sizeof(hdr), size);
+ } else {
+ // Send this block on to the servers.
+ {
+ MutexLock lock(&stats_mutex);
+ stats.data_bytes_received += size;
+ }
+ char *inner_data = pending_data.data() + sizeof(metacube2_block_header);
+ if (flags & METACUBE_FLAGS_HEADER) {
+ stream_header = string(inner_data, inner_data + size);
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->set_header(stream_indices[i], http_header, stream_header);
+ }
+ }
for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->set_header(stream_indices[i], http_header, stream_header);
+ servers->add_data(stream_indices[i], inner_data, size, flags);
}
}
- for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->add_data(stream_indices[i], inner_data, size, flags);
- }
// Consume the block. This isn't the most efficient way of dealing with things
// should we have many blocks, but these routines don't need to be too efficient
MutexLock lock(&stats_mutex);
return stats;
}
+
+void HTTPInput::process_metacube_metadata_block(const metacube2_block_header &hdr, const char *payload, uint32_t payload_size)
+{
+ if (payload_size < sizeof(uint64_t)) {
+ log(WARNING, "[%s] Undersized Metacube metadata block (%d bytes); corrupted header?",
+ url.c_str(), payload_size);
+ return;
+ }
+
+ uint64_t type = be64toh(*(const uint64_t *)payload);
+ if (type != METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) {
+ // Unknown metadata block, ignore.
+ return;
+ }
+
+ timespec now;
+ clock_gettime(CLOCK_REALTIME, &now);
+
+ const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload;
+ if (payload_size != sizeof(*pkt)) {
+ log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.",
+ url.c_str(), payload_size);
+ return;
+ }
+
+ double elapsed = now.tv_sec - be64toh(pkt->tv_sec) +
+ 1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec)));
+ {
+ MutexLock lock(&stats_mutex);
+ stats.latency_sec = elapsed;
+ }
+}