#include <sys/time.h>
#include <time.h>
#include <unistd.h>
+#include <math.h>
#include <map>
#include <string>
#include <utility>
stats.url = url;
stats.bytes_received = 0;
stats.data_bytes_received = 0;
+ stats.metadata_bytes_received = 0;
stats.connect_time = -1;
+ stats.latency_sec = HUGE_VAL;
last_verbose_connection.tv_sec = -3600;
last_verbose_connection.tv_nsec = 0;
stats.url = url;
stats.bytes_received = serialized.bytes_received();
stats.data_bytes_received = serialized.data_bytes_received();
+ stats.metadata_bytes_received = serialized.metadata_bytes_received();
if (serialized.has_connect_time()) {
stats.connect_time = serialized.connect_time();
} else {
stats.connect_time = time(NULL);
}
+ if (serialized.has_latency_sec()) {
+ stats.latency_sec = serialized.latency_sec();
+ } else {
+ stats.latency_sec = HUGE_VAL;
+ }
last_verbose_connection.tv_sec = -3600;
last_verbose_connection.tv_nsec = 0;
serialized.set_sock(sock);
serialized.set_bytes_received(stats.bytes_received);
serialized.set_data_bytes_received(stats.data_bytes_received);
+ if (isfinite(stats.latency_sec)) {
+ serialized.set_latency_sec(stats.latency_sec);
+ }
serialized.set_connect_time(stats.connect_time);
if (encoding == Input::INPUT_ENCODING_METACUBE) {
serialized.set_is_metacube_encoded(true);
return;
}
- // Send this block on to the servers.
- {
- MutexLock lock(&stats_mutex);
- stats.data_bytes_received += size;
- }
- char *inner_data = pending_data.data() + sizeof(metacube2_block_header);
- if (flags & METACUBE_FLAGS_HEADER) {
- stream_header = string(inner_data, inner_data + size);
+ // See if this is a metadata block. If so, we don't want to send it on,
+ // but rather process it ourselves.
+ // TODO: Keep metadata when sending on to other Metacube users.
+ if (flags & METACUBE_FLAGS_METADATA) {
+ {
+ MutexLock lock(&stats_mutex);
+ stats.metadata_bytes_received += size;
+ }
+ process_metacube_metadata_block(hdr, pending_data.data() + sizeof(hdr), size);
+ } else {
+ // Send this block on to the servers.
+ {
+ MutexLock lock(&stats_mutex);
+ stats.data_bytes_received += size;
+ }
+ char *inner_data = pending_data.data() + sizeof(metacube2_block_header);
+ if (flags & METACUBE_FLAGS_HEADER) {
+ stream_header = string(inner_data, inner_data + size);
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->set_header(stream_indices[i], http_header, stream_header);
+ }
+ }
for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->set_header(stream_indices[i], http_header, stream_header);
+ servers->add_data(stream_indices[i], inner_data, size, flags);
}
}
- for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->add_data(stream_indices[i], inner_data, size, flags);
- }
// Consume the block. This isn't the most efficient way of dealing with things
// should we have many blocks, but these routines don't need to be too efficient
MutexLock lock(&stats_mutex);
return stats;
}
+
+void HTTPInput::process_metacube_metadata_block(const metacube2_block_header &hdr, const char *payload, uint32_t payload_size)
+{
+ if (payload_size < sizeof(uint64_t)) {
+ log(WARNING, "[%s] Undersized Metacube metadata block (%d bytes); corrupted header?",
+ url.c_str(), payload_size);
+ return;
+ }
+
+ uint64_t type = be64toh(*(const uint64_t *)payload);
+ if (type != METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) {
+ // Unknown metadata block, ignore.
+ return;
+ }
+
+ timespec now;
+ clock_gettime(CLOCK_REALTIME, &now);
+
+ const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload;
+ if (payload_size != sizeof(*pkt)) {
+ log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.",
+ url.c_str(), payload_size);
+ return;
+ }
+
+ double elapsed = now.tv_sec - be64toh(pkt->tv_sec) +
+ 1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec)));
+ {
+ MutexLock lock(&stats_mutex);
+ stats.latency_sec = elapsed;
+ }
+}
#include <vector>
#include "input.h"
+#include "metacube2.h"
class InputProto;
// and outputs a warning.
void drop_pending_data(size_t num_bytes);
+ void process_metacube_metadata_block(const metacube2_block_header &hdr, const char *payload, uint32_t payload_size);
+
enum State {
NOT_CONNECTED,
SENDING_REQUEST,
// The number of data bytes we have received so far (or more precisely,
// number of data bytes we have sent on to the stream). This excludes Metacube
- // headers and corrupted data we've skipped.
+ // headers, metadata and corrupted data we've skipped.
//
// Not reset across connections.
size_t data_bytes_received;
+ // Same, except counts only Metacube metadata.
+ size_t metadata_bytes_received;
+
// When the current connection was initiated. -1 if we are not currently connected.
time_t connect_time;
+ // Last latency measurement, HUGE_VAL if no measurement yet.
+ double latency_sec;
+
// TODO: Number of loss events might both be useful,
// similar to for clients. Also, per-connection byte counters.
};
#include <string.h>
#include <time.h>
#include <unistd.h>
+#include <math.h>
#include <vector>
#include "input.h"
now = time(NULL);
for (size_t i = 0; i < inputs.size(); ++i) {
InputStats stats = inputs[i]->get_stats();
+ fprintf(fp, "%s %llu %llu", stats.url.c_str(),
+ (long long unsigned)(stats.bytes_received),
+ (long long unsigned)(stats.data_bytes_received));
if (stats.connect_time == -1) {
- fprintf(fp, "%s %llu %llu -\n", stats.url.c_str(),
- (long long unsigned)(stats.bytes_received),
- (long long unsigned)(stats.data_bytes_received));
+ fprintf(fp, " -");
} else {
- fprintf(fp, "%s %llu %llu %d\n", stats.url.c_str(),
- (long long unsigned)(stats.bytes_received),
- (long long unsigned)(stats.data_bytes_received),
- int(now - stats.connect_time));
+ fprintf(fp, " %d", int(now - stats.connect_time));
}
+ fprintf(fp, " %llu", (long long unsigned)(stats.metadata_bytes_received));
+ if (!isfinite(stats.latency_sec)) {
+ fprintf(fp, " -");
+ } else {
+ fprintf(fp, " %.6f", stats.latency_sec);
+ }
+ fprintf(fp, "\n");
}
if (fclose(fp) == EOF) {
log_perror("fclose");
#include "metacube2.h"
+#include <arpa/inet.h>
+
/*
* https://www.ece.cmu.edu/~koopman/pubs/KoopmanCRCWebinar9May2012.pdf
* recommends this for messages as short as ours (see table at page 34).
uint16_t crc = METACUBE2_CRC_START;
int i, j;
- for (i = 0; i < data_len; ++i) {
+ for (i = 0; i < data_len; ++i) {
uint8_t c = data[i];
for (j = 0; j < 8; j++) {
int bit = crc & 0x8000;
}
}
+ /*
+ * Invert the checksum for metadata packets, so that clients that
+ * don't understand metadata will ignore it as broken. There will
+ * probably be logging, but apart from that, it's harmless.
+ */
+ if (ntohs(hdr->flags) & METACUBE_FLAGS_METADATA) {
+ crc ^= 0xffff;
+ }
+
return crc;
}
#define METACUBE_FLAGS_HEADER 0x1
#define METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START 0x2
+/*
+ * Metadata packets; should not be counted as data, but rather
+ * parsed (or ignored if you don't understand them).
+ *
+ * Metadata packets start with a uint64_t (network byte order)
+ * that describe the type; the rest is defined by the type.
+ */
+#define METACUBE_FLAGS_METADATA 0x4
+
struct metacube2_block_header {
char sync[8]; /* METACUBE2_SYNC */
uint32_t size; /* Network byte order. Does not include header. */
uint16_t flags; /* Network byte order. METACUBE_FLAGS_*. */
- uint16_t csum; /* Network byte order. CRC16 of size and flags. */
+ uint16_t csum; /* Network byte order. CRC16 of size and flags.
+ If METACUBE_FLAGS_METADATA is set, inverted
+ so that older clients will ignore it as broken. */
};
uint16_t metacube2_compute_crc(const struct metacube2_block_header *hdr);
+/*
+ * The only currently defined metadata type. Set by the encoder,
+ * and can be measured for latency purposes (e.g., if the network
+ * can't keep up, the latency will tend to increase.
+ */
+#define METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP 0x1
+
+struct metacube2_timestamp_packet {
+ uint64_t type; /* METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP, in network byte order. */
+
+ /*
+ * Time since the UTC epoch. Basically a struct timespec.
+ * Both are in network byte order.
+ */
+ uint64_t tv_sec;
+ uint64_t tv_nsec;
+};
+
#endif /* !defined(_METACUBE_H) */
optional int32 sock = 9;
optional int64 bytes_received = 11;
optional int64 data_bytes_received = 12;
+ optional int64 metadata_bytes_received = 16;
+ optional double latency_sec = 17;
optional int64 connect_time = 13;
optional bool is_metacube_encoded = 15 [default=true];
};
#include <sys/socket.h>
#include <time.h>
#include <unistd.h>
+#include <math.h>
#include <string>
#include "acceptor.h"
stats.url = url;
stats.bytes_received = 0;
stats.data_bytes_received = 0;
+ stats.metadata_bytes_received = 0;
stats.connect_time = time(NULL);
+ stats.latency_sec = HUGE_VAL;
}
UDPInput::UDPInput(const InputProto &serialized)