#include <sys/time.h>
#include <time.h>
#include <unistd.h>
+#include <math.h>
#include <map>
#include <string>
#include <utility>
stats.url = url;
stats.bytes_received = 0;
stats.data_bytes_received = 0;
+ stats.metadata_bytes_received = 0;
stats.connect_time = -1;
+ stats.latency_sec = HUGE_VAL;
last_verbose_connection.tv_sec = -3600;
last_verbose_connection.tv_nsec = 0;
stats.url = url;
stats.bytes_received = serialized.bytes_received();
stats.data_bytes_received = serialized.data_bytes_received();
+ stats.metadata_bytes_received = serialized.metadata_bytes_received();
if (serialized.has_connect_time()) {
stats.connect_time = serialized.connect_time();
} else {
stats.connect_time = time(NULL);
}
+ if (serialized.has_latency_sec()) {
+ stats.latency_sec = serialized.latency_sec();
+ } else {
+ stats.latency_sec = HUGE_VAL;
+ }
last_verbose_connection.tv_sec = -3600;
last_verbose_connection.tv_nsec = 0;
serialized.set_sock(sock);
serialized.set_bytes_received(stats.bytes_received);
serialized.set_data_bytes_received(stats.data_bytes_received);
+ if (isfinite(stats.latency_sec)) {
+ serialized.set_latency_sec(stats.latency_sec);
+ }
serialized.set_connect_time(stats.connect_time);
if (encoding == Input::INPUT_ENCODING_METACUBE) {
serialized.set_is_metacube_encoded(true);
// Remove “Content-encoding: metacube”.
// TODO: Make case-insensitive.
- multimap<string, string>::iterator encoding_it =
- parameters.find("Content-encoding");
+ const auto encoding_it = parameters.find("Content-encoding");
if (encoding_it != parameters.end() && encoding_it->second == "metacube") {
parameters.erase(encoding_it);
}
if (parameters.count("Server") == 0) {
parameters.insert(make_pair("Server", SERVER_IDENTIFICATION));
} else {
- for (multimap<string, string>::iterator it = parameters.begin();
- it != parameters.end();
- ++it) {
- if (it->first != "Server") {
+ for (auto &key_and_value : parameters) {
+ if (key_and_value.first != "Server") {
continue;
}
- it->second = SERVER_IDENTIFICATION " (reflecting: " + it->second + ")";
+ key_and_value.second = SERVER_IDENTIFICATION " (reflecting: " + key_and_value.second + ")";
}
}
// Construct the new HTTP header.
http_header = "HTTP/1.0 200 OK\r\n";
- for (multimap<string, string>::iterator it = parameters.begin();
- it != parameters.end();
- ++it) {
- http_header.append(it->first + ": " + it->second + "\r\n");
+ for (const auto &key_and_value : parameters) {
+ http_header.append(key_and_value.first + ": " + key_and_value.second + "\r\n");
}
- for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->set_header(stream_indices[i], http_header, stream_header);
+ for (int stream_index : stream_indices) {
+ servers->set_header(stream_index, http_header, stream_header);
}
return true;
response.clear();
pending_data.clear();
has_metacube_header = false;
- for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->set_header(stream_indices[i], "", "");
+ for (int stream_index : stream_indices) {
+ servers->set_header(stream_index, "", "");
}
{
}
if (encoding == Input::INPUT_ENCODING_RAW) {
- for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->add_data(stream_indices[i], ptr, bytes, /*metacube_flags=*/0);
+ for (int stream_index : stream_indices) {
+ servers->add_data(stream_index, ptr, bytes, /*metacube_flags=*/0);
}
return;
}
return;
}
- // Send this block on to the servers.
- {
- MutexLock lock(&stats_mutex);
- stats.data_bytes_received += size;
- }
- char *inner_data = pending_data.data() + sizeof(metacube2_block_header);
- if (flags & METACUBE_FLAGS_HEADER) {
- stream_header = string(inner_data, inner_data + size);
- for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->set_header(stream_indices[i], http_header, stream_header);
+ // See if this is a metadata block. If so, we don't want to send it on,
+ // but rather process it ourselves.
+ // TODO: Keep metadata when sending on to other Metacube users.
+ if (flags & METACUBE_FLAGS_METADATA) {
+ {
+ MutexLock lock(&stats_mutex);
+ stats.metadata_bytes_received += size;
+ }
+ process_metacube_metadata_block(hdr, pending_data.data() + sizeof(hdr), size);
+ } else {
+ // Send this block on to the servers.
+ {
+ MutexLock lock(&stats_mutex);
+ stats.data_bytes_received += size;
+ }
+ char *inner_data = pending_data.data() + sizeof(metacube2_block_header);
+ if (flags & METACUBE_FLAGS_HEADER) {
+ stream_header = string(inner_data, inner_data + size);
+ for (int stream_index : stream_indices) {
+ servers->set_header(stream_index, http_header, stream_header);
+ }
+ }
+ for (int stream_index : stream_indices) {
+ servers->add_data(stream_index, inner_data, size, flags);
}
- }
- for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->add_data(stream_indices[i], inner_data, size, flags);
}
// Consume the block. This isn't the most efficient way of dealing with things
MutexLock lock(&stats_mutex);
return stats;
}
+
+void HTTPInput::process_metacube_metadata_block(const metacube2_block_header &hdr, const char *payload, uint32_t payload_size)
+{
+ if (payload_size < sizeof(uint64_t)) {
+ log(WARNING, "[%s] Undersized Metacube metadata block (%d bytes); corrupted header?",
+ url.c_str(), payload_size);
+ return;
+ }
+
+ uint64_t type = be64toh(*(const uint64_t *)payload);
+ if (type != METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) {
+ // Unknown metadata block, ignore.
+ return;
+ }
+
+ timespec now;
+ clock_gettime(CLOCK_REALTIME, &now);
+
+ const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload;
+ if (payload_size != sizeof(*pkt)) {
+ log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.",
+ url.c_str(), payload_size);
+ return;
+ }
+
+ double elapsed = now.tv_sec - be64toh(pkt->tv_sec) +
+ 1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec)));
+ {
+ MutexLock lock(&stats_mutex);
+ stats.latency_sec = elapsed;
+ }
+}