+void HTTPInput::add_destination(int stream_index)
+{
+ stream_indices.push_back(stream_index);
+ servers->set_header(stream_index, http_header, stream_header);
+}
+
+InputStats HTTPInput::get_stats() const
+{
+ MutexLock lock(&stats_mutex);
+ return stats;
+}
+
+void HTTPInput::process_metacube_metadata_block(const metacube2_block_header &hdr, const char *payload, uint32_t payload_size)
+{
+ if (payload_size < sizeof(uint64_t)) {
+ log(WARNING, "[%s] Undersized Metacube metadata block (%d bytes); corrupted header?",
+ url.c_str(), payload_size);
+ return;
+ }
+
+ uint64_t type = be64toh(*(const uint64_t *)payload);
+ if (type != METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) {
+ // Unknown metadata block, ignore.
+ return;
+ }
+
+ timespec now;
+ clock_gettime(CLOCK_REALTIME, &now);
+
+ const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload;
+ if (payload_size != sizeof(*pkt)) {
+ log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.",
+ url.c_str(), payload_size);
+ return;
+ }
+
+ double elapsed = now.tv_sec - be64toh(pkt->tv_sec) +
+ 1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec)));
+ {
+ MutexLock lock(&stats_mutex);
+ stats.latency_sec = elapsed;
+ }
+}