]> git.sesse.net Git - nageru/commitdiff
Drop buffered SRT data if the connection is down for too long.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 4 Oct 2023 22:18:16 +0000 (00:18 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 4 Oct 2023 22:19:58 +0000 (00:19 +0200)
This stops huge backlogs from building up, like we can get with HTTP.

nageru/video_encoder.cpp
shared/mux.cpp
shared/mux.h

index 0257d38bbfae3f271a107a936a5f66150ed620bf..3b7028d5b9346a3a9a4e45145b5110e29c600d1d 100644 (file)
@@ -9,6 +9,7 @@
 #include <netdb.h>
 #include <string>
 #include <thread>
+#include <chrono>
 
 extern "C" {
 #include <libavutil/mem.h>
@@ -30,6 +31,7 @@ extern "C" {
 class RefCountedFrame;
 
 using namespace std;
+using namespace std::chrono;
 using namespace movit;
 
 namespace {
@@ -463,21 +465,52 @@ int VideoEncoder::write_srt_packet(uint8_t *buf, int buf_size)
        if (want_srt_metric_update.exchange(false) && srt_sock != -1) {
                srt_metrics.update_srt_stats(srt_sock);
        }
+
+       bool has_drained = false;
+       bool trying_reconnect = false;
+       steady_clock::time_point first_connect_start;
+
        while (buf_size > 0 && !should_quit.load()) {
                if (srt_sock == -1) {
+                       if (!trying_reconnect) {
+                               first_connect_start = steady_clock::now();
+                               trying_reconnect = true;
+                       }
                        srt_sock = connect_to_srt();
                        if (srt_sock == -1) {
                                usleep(100000);
+                               if (!has_drained && duration<double>(steady_clock::now() - first_connect_start).count() >= global_flags.srt_output_latency_ms * 1e-3) {
+                                       // The entire concept for SRT is to have fixed, low latency.
+                                       // If we've been out for more than a latency period, we shouldn't
+                                       // try to send the entire backlog. (But we should be tolerant
+                                       // of a quick disconnect and reconnect.) Maybe it would be better
+                                       // to have a sliding window of how much we remove, but it quickly
+                                       // starts getting esoteric, so juts drop it all.
+                                       fprintf(stderr, "WARNING: No SRT connection for more than %d ms, dropping data.\n",
+                                               global_flags.srt_output_latency_ms);
+                                       srt_mux->drain();
+                                       has_drained = true;
+                               }
                                continue;
                        }
                        srt_metrics.update_srt_stats(srt_sock);
                }
+               if (has_drained) {
+                       // Now that we're reconnected, we can start accepting data again,
+                       // but discard the rest of this write (it is very old by now).
+                       srt_mux->undrain();
+                       break;
+               }
                int to_send = min(buf_size, SRT_LIVE_DEF_PLSIZE);
                int ret = srt_send(srt_sock, (char *)buf, to_send);
                if (ret < 0)  {
                        fprintf(stderr, "srt_send(): %s\n", srt_getlasterror_str());
                        srt_close(srt_sock);
                        srt_metrics.metric_srt_uptime_seconds = 0.0 / 0.0;
+                       if (!trying_reconnect) {
+                               first_connect_start = steady_clock::now();
+                               trying_reconnect = true;
+                       }
                        srt_sock = connect_to_srt();
                        continue;
                }
index b8feacd6907460cedb017a07626c64dca5ed10a7..a52f21fbdab35b18bae0161cab70b9bd5d6013dd 100644 (file)
@@ -172,7 +172,9 @@ void Mux::add_packet(const AVPacket &pkt, int64_t pts, int64_t dts, AVRational t
 
        {
                lock_guard<mutex> lock(mu);
-               if (write_strategy == WriteStrategy::WRITE_BACKGROUND) {
+               if (drained) {
+                       // Just drop the packet on the floor.
+               } else if (write_strategy == WriteStrategy::WRITE_BACKGROUND) {
                        packet_queue.push_back(QueuedPacket{ av_packet_clone(&pkt_copy), pts });
                        if (plug_count == 0)
                                packet_queue_ready.notify_all();
@@ -218,6 +220,7 @@ void Mux::write_packet_or_die(const AVPacket &pkt, int64_t unscaled_pts)
 void Mux::plug()
 {
        lock_guard<mutex> lock(mu);
+       assert(!drained);
        ++plug_count;
 }
 
@@ -242,6 +245,25 @@ void Mux::unplug()
        }
 }
 
+void Mux::drain()
+{
+       lock_guard<mutex> lock(mu);
+       assert(!drained);
+       assert(plug_count == 0);
+       for (QueuedPacket &qp : packet_queue) {
+               av_packet_free(&qp.pkt);
+       }
+       packet_queue.clear();
+       drained = true;
+}
+
+void Mux::undrain()
+{
+       lock_guard<mutex> lock(mu);
+       assert(drained);
+       drained = false;
+}
+
 void Mux::thread_func()
 {
        pthread_setname_np(pthread_self(), "Mux");
index 0fb913a37636fc746771896c763e2ea238ba4eb5..5020f37d943c64e98ee73c1d8e7771a2da88659d 100644 (file)
@@ -92,6 +92,12 @@ public:
        void plug();
        void unplug();
 
+       // Temporary stop the mux; any packets coming in are discarded, and any existing ones
+       // in the queue will be dropped. Any writes in progress will finish as usual.
+       // Incompatible with plug().
+       void drain();
+       void undrain();
+
 private:
        // If write_strategy == WRITE_FOREGORUND, Must be called with <mu> held.
        void write_packet_or_die(const AVPacket &pkt, int64_t unscaled_pts);
@@ -108,6 +114,7 @@ private:
 
        AVFormatContext *avctx;  // Protected by <mu>, iff write_strategy == WRITE_BACKGROUND.
        int plug_count = 0;  // Protected by <mu>.
+       bool drained = false;  // Protected by <mu>.
 
        // Protected by <mu>. If write_strategy == WRITE_FOREGROUND,
        // this is only in use when plugging.