]> git.sesse.net Git - cubemap/blobdiff - httpinput.cpp
Support Metacube metadata blocks, specifically timestamps.
[cubemap] / httpinput.cpp
index a45a4ba00e2ef99031beddd7c449719174c4dd15..2ad9746e7bf2f7cd4c84f8629c26092f22668561 100644 (file)
@@ -11,6 +11,7 @@
 #include <sys/time.h>
 #include <time.h>
 #include <unistd.h>
+#include <math.h>
 #include <map>
 #include <string>
 #include <utility>
@@ -58,7 +59,9 @@ HTTPInput::HTTPInput(const string &url, Input::Encoding encoding)
        stats.url = url;
        stats.bytes_received = 0;
        stats.data_bytes_received = 0;
+       stats.metadata_bytes_received = 0;
        stats.connect_time = -1;
+       stats.latency_sec = HUGE_VAL;
 
        last_verbose_connection.tv_sec = -3600;
        last_verbose_connection.tv_nsec = 0;
@@ -90,11 +93,17 @@ HTTPInput::HTTPInput(const InputProto &serialized)
        stats.url = url;
        stats.bytes_received = serialized.bytes_received();
        stats.data_bytes_received = serialized.data_bytes_received();
+       stats.metadata_bytes_received = serialized.metadata_bytes_received();
        if (serialized.has_connect_time()) {
                stats.connect_time = serialized.connect_time();
        } else {
                stats.connect_time = time(NULL);
        }
+       if (serialized.has_latency_sec()) {
+               stats.latency_sec = serialized.latency_sec();
+       } else {
+               stats.latency_sec = HUGE_VAL;
+       }
 
        last_verbose_connection.tv_sec = -3600;
        last_verbose_connection.tv_nsec = 0;
@@ -126,6 +135,9 @@ InputProto HTTPInput::serialize() const
        serialized.set_sock(sock);
        serialized.set_bytes_received(stats.bytes_received);
        serialized.set_data_bytes_received(stats.data_bytes_received);
+       if (isfinite(stats.latency_sec)) {
+               serialized.set_latency_sec(stats.latency_sec);
+       }
        serialized.set_connect_time(stats.connect_time);
        if (encoding == Input::INPUT_ENCODING_METACUBE) {
                serialized.set_is_metacube_encoded(true);
@@ -642,21 +654,32 @@ void HTTPInput::process_data(char *ptr, size_t bytes)
                        return;
                }
 
-               // Send this block on to the servers.
-               {
-                       MutexLock lock(&stats_mutex);
-                       stats.data_bytes_received += size;
-               }
-               char *inner_data = pending_data.data() + sizeof(metacube2_block_header);
-               if (flags & METACUBE_FLAGS_HEADER) {
-                       stream_header = string(inner_data, inner_data + size);
+               // See if this is a metadata block. If so, we don't want to send it on,
+               // but rather process it ourselves.
+               // TODO: Keep metadata when sending on to other Metacube users.
+               if (flags & METACUBE_FLAGS_METADATA) {
+                       {
+                               MutexLock lock(&stats_mutex);
+                               stats.metadata_bytes_received += size;
+                       }
+                       process_metacube_metadata_block(hdr, pending_data.data() + sizeof(hdr), size);
+               } else {
+                       // Send this block on to the servers.
+                       {
+                               MutexLock lock(&stats_mutex);
+                               stats.data_bytes_received += size;
+                       }
+                       char *inner_data = pending_data.data() + sizeof(metacube2_block_header);
+                       if (flags & METACUBE_FLAGS_HEADER) {
+                               stream_header = string(inner_data, inner_data + size);
+                               for (size_t i = 0; i < stream_indices.size(); ++i) {
+                                       servers->set_header(stream_indices[i], http_header, stream_header);
+                               }
+                       }
                        for (size_t i = 0; i < stream_indices.size(); ++i) {
-                               servers->set_header(stream_indices[i], http_header, stream_header);
+                               servers->add_data(stream_indices[i], inner_data, size, flags);
                        }
                }
-               for (size_t i = 0; i < stream_indices.size(); ++i) {
-                       servers->add_data(stream_indices[i], inner_data, size, flags);
-               }
 
                // Consume the block. This isn't the most efficient way of dealing with things
                // should we have many blocks, but these routines don't need to be too efficient
@@ -688,3 +711,35 @@ InputStats HTTPInput::get_stats() const
        MutexLock lock(&stats_mutex);
        return stats;
 }
+
+void HTTPInput::process_metacube_metadata_block(const metacube2_block_header &hdr, const char *payload, uint32_t payload_size)
+{
+       if (payload_size < sizeof(uint64_t)) {
+               log(WARNING, "[%s] Undersized Metacube metadata block (%d bytes); corrupted header?",
+                       url.c_str(), payload_size);
+               return;
+       }
+
+       uint64_t type = be64toh(*(const uint64_t *)payload);
+       if (type != METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) {
+               // Unknown metadata block, ignore.
+               return;
+       }
+
+       timespec now;
+       clock_gettime(CLOCK_REALTIME, &now);
+
+       const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload;
+       if (payload_size != sizeof(*pkt)) {
+               log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.",
+                       url.c_str(), payload_size);
+               return;
+       }
+
+       double elapsed = now.tv_sec - be64toh(pkt->tv_sec) +
+               1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec)));
+       {
+               MutexLock lock(&stats_mutex);
+               stats.latency_sec = elapsed;
+       }
+}