From f9219370ce1f5ba37e716d50d41115bb672c4244 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Thu, 5 Oct 2023 00:18:16 +0200 Subject: [PATCH] Drop buffered SRT data if the connection is down for too long. This stops huge backlogs from building up, like we can get with HTTP. --- nageru/video_encoder.cpp | 33 +++++++++++++++++++++++++++++++++ shared/mux.cpp | 24 +++++++++++++++++++++++- shared/mux.h | 7 +++++++ 3 files changed, 63 insertions(+), 1 deletion(-) diff --git a/nageru/video_encoder.cpp b/nageru/video_encoder.cpp index 0257d38..3b7028d 100644 --- a/nageru/video_encoder.cpp +++ b/nageru/video_encoder.cpp @@ -9,6 +9,7 @@ #include #include #include +#include extern "C" { #include @@ -30,6 +31,7 @@ extern "C" { class RefCountedFrame; using namespace std; +using namespace std::chrono; using namespace movit; namespace { @@ -463,21 +465,52 @@ int VideoEncoder::write_srt_packet(uint8_t *buf, int buf_size) if (want_srt_metric_update.exchange(false) && srt_sock != -1) { srt_metrics.update_srt_stats(srt_sock); } + + bool has_drained = false; + bool trying_reconnect = false; + steady_clock::time_point first_connect_start; + while (buf_size > 0 && !should_quit.load()) { if (srt_sock == -1) { + if (!trying_reconnect) { + first_connect_start = steady_clock::now(); + trying_reconnect = true; + } srt_sock = connect_to_srt(); if (srt_sock == -1) { usleep(100000); + if (!has_drained && duration(steady_clock::now() - first_connect_start).count() >= global_flags.srt_output_latency_ms * 1e-3) { + // The entire concept for SRT is to have fixed, low latency. + // If we've been out for more than a latency period, we shouldn't + // try to send the entire backlog. (But we should be tolerant + // of a quick disconnect and reconnect.) Maybe it would be better + // to have a sliding window of how much we remove, but it quickly + // starts getting esoteric, so juts drop it all. + fprintf(stderr, "WARNING: No SRT connection for more than %d ms, dropping data.\n", + global_flags.srt_output_latency_ms); + srt_mux->drain(); + has_drained = true; + } continue; } srt_metrics.update_srt_stats(srt_sock); } + if (has_drained) { + // Now that we're reconnected, we can start accepting data again, + // but discard the rest of this write (it is very old by now). + srt_mux->undrain(); + break; + } int to_send = min(buf_size, SRT_LIVE_DEF_PLSIZE); int ret = srt_send(srt_sock, (char *)buf, to_send); if (ret < 0) { fprintf(stderr, "srt_send(): %s\n", srt_getlasterror_str()); srt_close(srt_sock); srt_metrics.metric_srt_uptime_seconds = 0.0 / 0.0; + if (!trying_reconnect) { + first_connect_start = steady_clock::now(); + trying_reconnect = true; + } srt_sock = connect_to_srt(); continue; } diff --git a/shared/mux.cpp b/shared/mux.cpp index b8feacd..a52f21f 100644 --- a/shared/mux.cpp +++ b/shared/mux.cpp @@ -172,7 +172,9 @@ void Mux::add_packet(const AVPacket &pkt, int64_t pts, int64_t dts, AVRational t { lock_guard lock(mu); - if (write_strategy == WriteStrategy::WRITE_BACKGROUND) { + if (drained) { + // Just drop the packet on the floor. + } else if (write_strategy == WriteStrategy::WRITE_BACKGROUND) { packet_queue.push_back(QueuedPacket{ av_packet_clone(&pkt_copy), pts }); if (plug_count == 0) packet_queue_ready.notify_all(); @@ -218,6 +220,7 @@ void Mux::write_packet_or_die(const AVPacket &pkt, int64_t unscaled_pts) void Mux::plug() { lock_guard lock(mu); + assert(!drained); ++plug_count; } @@ -242,6 +245,25 @@ void Mux::unplug() } } +void Mux::drain() +{ + lock_guard lock(mu); + assert(!drained); + assert(plug_count == 0); + for (QueuedPacket &qp : packet_queue) { + av_packet_free(&qp.pkt); + } + packet_queue.clear(); + drained = true; +} + +void Mux::undrain() +{ + lock_guard lock(mu); + assert(drained); + drained = false; +} + void Mux::thread_func() { pthread_setname_np(pthread_self(), "Mux"); diff --git a/shared/mux.h b/shared/mux.h index 0fb913a..5020f37 100644 --- a/shared/mux.h +++ b/shared/mux.h @@ -92,6 +92,12 @@ public: void plug(); void unplug(); + // Temporary stop the mux; any packets coming in are discarded, and any existing ones + // in the queue will be dropped. Any writes in progress will finish as usual. + // Incompatible with plug(). + void drain(); + void undrain(); + private: // If write_strategy == WRITE_FOREGORUND, Must be called with held. void write_packet_or_die(const AVPacket &pkt, int64_t unscaled_pts); @@ -108,6 +114,7 @@ private: AVFormatContext *avctx; // Protected by , iff write_strategy == WRITE_BACKGROUND. int plug_count = 0; // Protected by . + bool drained = false; // Protected by . // Protected by . If write_strategy == WRITE_FOREGROUND, // this is only in use when plugging. -- 2.39.2