+void HTTPInput::add_destination(int stream_index)
+{
+ stream_indices.push_back(stream_index);
+ servers->set_header(stream_index, http_header, stream_header);
+}
+
+InputStats HTTPInput::get_stats() const
+{
+ lock_guard<mutex> lock(stats_mutex);
+ return stats;
+}
+
+void HTTPInput::process_metacube_metadata_block(const metacube2_block_header &hdr, const char *payload, uint32_t payload_size)
+{
+ if (payload_size < sizeof(uint64_t)) {
+ log(WARNING, "[%s] Undersized Metacube metadata block (%d bytes); corrupted header?",
+ url.c_str(), payload_size);
+ return;
+ }
+
+ uint64_t type = be64toh(*(const uint64_t *)payload);
+ if (type == METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) {
+ timespec now;
+ clock_gettime(CLOCK_REALTIME, &now);
+
+ const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload;
+ if (payload_size != sizeof(*pkt)) {
+ log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.",
+ url.c_str(), payload_size);
+ return;
+ }
+
+ double elapsed = now.tv_sec - be64toh(pkt->tv_sec) +
+ 1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec)));
+ {
+ lock_guard<mutex> lock(stats_mutex);
+ stats.latency_sec = elapsed;
+ }
+ } else if (type == METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS) {
+ const metacube2_pts_packet *pkt = (const metacube2_pts_packet *)payload;
+ if (payload_size != sizeof(*pkt)) {
+ log(WARNING, "[%s] Metacube pts block of wrong size (%d bytes); ignoring.",
+ url.c_str(), payload_size);
+ return;
+ }
+ next_block_pts.pts = be64toh(pkt->pts);
+ next_block_pts.timebase_num = be64toh(pkt->timebase_num);
+ next_block_pts.timebase_den = be64toh(pkt->timebase_den);
+ } else {
+ // Unknown metadata block, ignore
+ log(INFO, "[%s] Metadata block %llu\n", url.c_str(), type);
+ return;
+ }
+}