Support Metacube metadata blocks, specifically timestamps.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 16 Jul 2016 17:43:29 +0000 (19:43 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 22 Jul 2016 22:45:08 +0000 (00:45 +0200)
Allows you to measure latency from encoder to reflector; specifically,
this is useful to figure out if you have a HTTP queue that keeps on
growing indefinitely.

httpinput.cpp
httpinput.h
input.h
input_stats.cpp
metacube2.cpp
metacube2.h
state.proto
udpinput.cpp

index a45a4ba..2ad9746 100644 (file)
@@ -11,6 +11,7 @@
 #include <sys/time.h>
 #include <time.h>
 #include <unistd.h>
+#include <math.h>
 #include <map>
 #include <string>
 #include <utility>
@@ -58,7 +59,9 @@ HTTPInput::HTTPInput(const string &url, Input::Encoding encoding)
        stats.url = url;
        stats.bytes_received = 0;
        stats.data_bytes_received = 0;
+       stats.metadata_bytes_received = 0;
        stats.connect_time = -1;
+       stats.latency_sec = HUGE_VAL;
 
        last_verbose_connection.tv_sec = -3600;
        last_verbose_connection.tv_nsec = 0;
@@ -90,11 +93,17 @@ HTTPInput::HTTPInput(const InputProto &serialized)
        stats.url = url;
        stats.bytes_received = serialized.bytes_received();
        stats.data_bytes_received = serialized.data_bytes_received();
+       stats.metadata_bytes_received = serialized.metadata_bytes_received();
        if (serialized.has_connect_time()) {
                stats.connect_time = serialized.connect_time();
        } else {
                stats.connect_time = time(NULL);
        }
+       if (serialized.has_latency_sec()) {
+               stats.latency_sec = serialized.latency_sec();
+       } else {
+               stats.latency_sec = HUGE_VAL;
+       }
 
        last_verbose_connection.tv_sec = -3600;
        last_verbose_connection.tv_nsec = 0;
@@ -126,6 +135,9 @@ InputProto HTTPInput::serialize() const
        serialized.set_sock(sock);
        serialized.set_bytes_received(stats.bytes_received);
        serialized.set_data_bytes_received(stats.data_bytes_received);
+       if (isfinite(stats.latency_sec)) {
+               serialized.set_latency_sec(stats.latency_sec);
+       }
        serialized.set_connect_time(stats.connect_time);
        if (encoding == Input::INPUT_ENCODING_METACUBE) {
                serialized.set_is_metacube_encoded(true);
@@ -642,21 +654,32 @@ void HTTPInput::process_data(char *ptr, size_t bytes)
                        return;
                }
 
-               // Send this block on to the servers.
-               {
-                       MutexLock lock(&stats_mutex);
-                       stats.data_bytes_received += size;
-               }
-               char *inner_data = pending_data.data() + sizeof(metacube2_block_header);
-               if (flags & METACUBE_FLAGS_HEADER) {
-                       stream_header = string(inner_data, inner_data + size);
+               // See if this is a metadata block. If so, we don't want to send it on,
+               // but rather process it ourselves.
+               // TODO: Keep metadata when sending on to other Metacube users.
+               if (flags & METACUBE_FLAGS_METADATA) {
+                       {
+                               MutexLock lock(&stats_mutex);
+                               stats.metadata_bytes_received += size;
+                       }
+                       process_metacube_metadata_block(hdr, pending_data.data() + sizeof(hdr), size);
+               } else {
+                       // Send this block on to the servers.
+                       {
+                               MutexLock lock(&stats_mutex);
+                               stats.data_bytes_received += size;
+                       }
+                       char *inner_data = pending_data.data() + sizeof(metacube2_block_header);
+                       if (flags & METACUBE_FLAGS_HEADER) {
+                               stream_header = string(inner_data, inner_data + size);
+                               for (size_t i = 0; i < stream_indices.size(); ++i) {
+                                       servers->set_header(stream_indices[i], http_header, stream_header);
+                               }
+                       }
                        for (size_t i = 0; i < stream_indices.size(); ++i) {
-                               servers->set_header(stream_indices[i], http_header, stream_header);
+                               servers->add_data(stream_indices[i], inner_data, size, flags);
                        }
                }
-               for (size_t i = 0; i < stream_indices.size(); ++i) {
-                       servers->add_data(stream_indices[i], inner_data, size, flags);
-               }
 
                // Consume the block. This isn't the most efficient way of dealing with things
                // should we have many blocks, but these routines don't need to be too efficient
@@ -688,3 +711,35 @@ InputStats HTTPInput::get_stats() const
        MutexLock lock(&stats_mutex);
        return stats;
 }
+
+void HTTPInput::process_metacube_metadata_block(const metacube2_block_header &hdr, const char *payload, uint32_t payload_size)
+{
+       if (payload_size < sizeof(uint64_t)) {
+               log(WARNING, "[%s] Undersized Metacube metadata block (%d bytes); corrupted header?",
+                       url.c_str(), payload_size);
+               return;
+       }
+
+       uint64_t type = be64toh(*(const uint64_t *)payload);
+       if (type != METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) {
+               // Unknown metadata block, ignore.
+               return;
+       }
+
+       timespec now;
+       clock_gettime(CLOCK_REALTIME, &now);
+
+       const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload;
+       if (payload_size != sizeof(*pkt)) {
+               log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.",
+                       url.c_str(), payload_size);
+               return;
+       }
+
+       double elapsed = now.tv_sec - be64toh(pkt->tv_sec) +
+               1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec)));
+       {
+               MutexLock lock(&stats_mutex);
+               stats.latency_sec = elapsed;
+       }
+}
index e7c447a..0e29453 100644 (file)
@@ -7,6 +7,7 @@
 #include <vector>
 
 #include "input.h"
+#include "metacube2.h"
 
 class InputProto;
 
@@ -44,6 +45,8 @@ private:
        // and outputs a warning.
        void drop_pending_data(size_t num_bytes);
 
+       void process_metacube_metadata_block(const metacube2_block_header &hdr, const char *payload, uint32_t payload_size);
+
        enum State {
                NOT_CONNECTED,
                SENDING_REQUEST,
diff --git a/input.h b/input.h
index 1b94e59..d609fdf 100644 (file)
--- a/input.h
+++ b/input.h
@@ -21,14 +21,20 @@ struct InputStats {
 
        // The number of data bytes we have received so far (or more precisely,
        // number of data bytes we have sent on to the stream). This excludes Metacube
-       // headers and corrupted data we've skipped.
+       // headers, metadata and corrupted data we've skipped.
        //
        // Not reset across connections.
        size_t data_bytes_received;
 
+       // Same, except counts only Metacube metadata.
+       size_t metadata_bytes_received;
+
        // When the current connection was initiated. -1 if we are not currently connected.
        time_t connect_time;
 
+       // Last latency measurement, HUGE_VAL if no measurement yet.
+       double latency_sec;
+
        // TODO: Number of loss events might both be useful,
        // similar to for clients. Also, per-connection byte counters.
 };
index 92a6718..6bd1529 100644 (file)
@@ -5,6 +5,7 @@
 #include <string.h>
 #include <time.h>
 #include <unistd.h>
+#include <math.h>
 #include <vector>
 
 #include "input.h"
@@ -51,16 +52,21 @@ void InputStatsThread::do_work()
                now = time(NULL);
                for (size_t i = 0; i < inputs.size(); ++i) {
                        InputStats stats = inputs[i]->get_stats();
+                       fprintf(fp, "%s %llu %llu", stats.url.c_str(),
+                               (long long unsigned)(stats.bytes_received),
+                               (long long unsigned)(stats.data_bytes_received));
                        if (stats.connect_time == -1) {
-                               fprintf(fp, "%s %llu %llu -\n", stats.url.c_str(),
-                                       (long long unsigned)(stats.bytes_received),
-                                       (long long unsigned)(stats.data_bytes_received));
+                               fprintf(fp, " -");
                        } else {
-                               fprintf(fp, "%s %llu %llu %d\n", stats.url.c_str(),
-                                       (long long unsigned)(stats.bytes_received),
-                                       (long long unsigned)(stats.data_bytes_received),
-                                       int(now - stats.connect_time));
+                               fprintf(fp, " %d", int(now - stats.connect_time));
                        }
+                       fprintf(fp, " %llu", (long long unsigned)(stats.metadata_bytes_received));
+                       if (!isfinite(stats.latency_sec)) {
+                               fprintf(fp, " -");
+                       } else {
+                               fprintf(fp, " %.6f", stats.latency_sec);
+                       }
+                       fprintf(fp, "\n");
                }
                if (fclose(fp) == EOF) {
                        log_perror("fclose");
index 9473dce..666355d 100644 (file)
@@ -7,6 +7,8 @@
 
 #include "metacube2.h"
 
+#include <arpa/inet.h>
+
 /*
  * https://www.ece.cmu.edu/~koopman/pubs/KoopmanCRCWebinar9May2012.pdf
  * recommends this for messages as short as ours (see table at page 34).
@@ -24,7 +26,7 @@ uint16_t metacube2_compute_crc(const struct metacube2_block_header *hdr)
        uint16_t crc = METACUBE2_CRC_START;
        int i, j;
 
-       for (i = 0; i < data_len; ++i) {        
+       for (i = 0; i < data_len; ++i) {
                uint8_t c = data[i];
                for (j = 0; j < 8; j++) {
                        int bit = crc & 0x8000;
@@ -44,5 +46,14 @@ uint16_t metacube2_compute_crc(const struct metacube2_block_header *hdr)
                }
        }
 
+       /*
+        * Invert the checksum for metadata packets, so that clients that
+        * don't understand metadata will ignore it as broken. There will
+        * probably be logging, but apart from that, it's harmless.
+        */
+       if (ntohs(hdr->flags) & METACUBE_FLAGS_METADATA) {
+               crc ^= 0xffff;
+       }
+
        return crc;
 }
index 43de8b7..5b6077e 100644 (file)
 #define METACUBE_FLAGS_HEADER 0x1
 #define METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START 0x2
 
+/*
+ * Metadata packets; should not be counted as data, but rather
+ * parsed (or ignored if you don't understand them).
+ *
+ * Metadata packets start with a uint64_t (network byte order)
+ * that describe the type; the rest is defined by the type.
+ */
+#define METACUBE_FLAGS_METADATA 0x4
+
 struct metacube2_block_header {
        char sync[8];    /* METACUBE2_SYNC */
        uint32_t size;   /* Network byte order. Does not include header. */
        uint16_t flags;  /* Network byte order. METACUBE_FLAGS_*. */
-       uint16_t csum;   /* Network byte order. CRC16 of size and flags. */
+       uint16_t csum;   /* Network byte order. CRC16 of size and flags.
+                            If METACUBE_FLAGS_METADATA is set, inverted
+                            so that older clients will ignore it as broken. */
 };
 
 uint16_t metacube2_compute_crc(const struct metacube2_block_header *hdr);
 
+/*
+ * The only currently defined metadata type. Set by the encoder,
+ * and can be measured for latency purposes (e.g., if the network
+ * can't keep up, the latency will tend to increase.
+ */
+#define METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP 0x1
+
+struct metacube2_timestamp_packet {
+       uint64_t type;  /* METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP, in network byte order. */
+
+       /*
+        * Time since the UTC epoch. Basically a struct timespec.
+        * Both are in network byte order.
+        */
+       uint64_t tv_sec;
+       uint64_t tv_nsec;
+};
+
 #endif  /* !defined(_METACUBE_H) */
index 2a788eb..2eb65e0 100644 (file)
@@ -46,6 +46,8 @@ message InputProto {
        optional int32 sock = 9;
        optional int64 bytes_received = 11;
        optional int64 data_bytes_received = 12;
+       optional int64 metadata_bytes_received = 16;
+       optional double latency_sec = 17;
        optional int64 connect_time = 13;
        optional bool is_metacube_encoded = 15 [default=true];
 };
index 71364f1..81aef7f 100644 (file)
@@ -6,6 +6,7 @@
 #include <sys/socket.h>
 #include <time.h>
 #include <unistd.h>
+#include <math.h>
 #include <string>
 
 #include "acceptor.h"
@@ -116,7 +117,9 @@ UDPInput::UDPInput(const string &url)
        stats.url = url;
        stats.bytes_received = 0;
        stats.data_bytes_received = 0;
+       stats.metadata_bytes_received = 0;
        stats.connect_time = time(NULL);
+       stats.latency_sec = HUGE_VAL;
 }
 
 UDPInput::UDPInput(const InputProto &serialized)