]> git.sesse.net Git - nageru/blobdiff - nageru/video_encoder.cpp
IWYU-fix nageru/*.cpp.
[nageru] / nageru / video_encoder.cpp
index c1938d505d5e666662e027cd62608b1bc628f376..b68086c946c58723084250f262d06440eb3d107e 100644 (file)
@@ -1,17 +1,27 @@
-#include "video_encoder.h"
-
+#include <algorithm>
 #include <assert.h>
-#include <stdio.h>
-#include <time.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/socket.h>
+#include <chrono>
+#include <epoxy/gl.h>
+#include <memory>
+#include <movit/image_format.h>
+#include <mutex>
 #include <netdb.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <string.h>
 #include <string>
+#include <sys/socket.h>
 #include <thread>
+#include <time.h>
+#include <unistd.h>
+#include <vector>
 
 extern "C" {
 #include <libavutil/mem.h>
+#include <libavformat/avformat.h>
+#include <libavformat/avio.h>
+#include <libavutil/avutil.h>
+#include <libavutil/rational.h>
 }
 
 #include "audio_encoder.h"
@@ -26,10 +36,15 @@ extern "C" {
 #include "quicksync_encoder.h"
 #include "shared/timebase.h"
 #include "x264_encoder.h"
+#include "video_encoder.h"
+#include "shared/metrics.h"
+#include "shared/ref_counted_gl_sync.h"
+#include "shared/shared_defs.h"
 
 class RefCountedFrame;
 
 using namespace std;
+using namespace std::chrono;
 using namespace movit;
 
 namespace {
@@ -335,7 +350,7 @@ int VideoEncoder::open_srt_socket()
                return -1;
        }
 
-       if (srt_setsockopt(sock, 0, SRTO_LATENCY, &global_flags.srt_output_latency, sizeof(global_flags.srt_output_latency)) < 0) {
+       if (srt_setsockopt(sock, 0, SRTO_LATENCY, &global_flags.srt_output_latency_ms, sizeof(global_flags.srt_output_latency_ms)) < 0) {
                fprintf(stderr, "srt_setsockopt(SRTO_LATENCY): %s\n", srt_getlasterror_str());
                srt_close(sock);
                return -1;
@@ -388,13 +403,70 @@ int VideoEncoder::connect_to_srt()
                        return sock;
                }
                ++metric_srt_num_connection_attempts;
+
+               // We do a non-blocking connect, so that we can check should_quit
+               // every now and then.
+               int blocking = 0;
+               if (srt_setsockopt(sock, 0, SRTO_RCVSYN, &blocking, sizeof(blocking)) < 0) {
+                       fprintf(stderr, "srt_setsockopt(SRTO_SNDSYN=0): %s\n", srt_getlasterror_str());
+                       srt_close(sock);
+                       continue;
+               }
                if (srt_connect(sock, cur->ai_addr, cur->ai_addrlen) < 0) {
                        fprintf(stderr, "srt_connect(%s): %s\n", print_addrinfo(cur).c_str(), srt_getlasterror_str());
                        srt_close(sock);
                        continue;
                }
-               fprintf(stderr, "Connected to destination SRT endpoint at %s.\n", print_addrinfo(cur).c_str());
-               return sock;
+               int eid = srt_epoll_create();
+               if (eid < 0) {
+                       fprintf(stderr, "srt_epoll_create(): %s\n", srt_getlasterror_str());
+                       srt_close(sock);
+                       continue;
+               }
+               int modes = SRT_EPOLL_ERR | SRT_EPOLL_OUT;
+               if (srt_epoll_add_usock(eid, sock, &modes) < 0) {
+                       fprintf(stderr, "srt_epoll_usock(): %s\n", srt_getlasterror_str());
+                       srt_close(sock);
+                       srt_epoll_release(eid);
+                       continue;
+               }
+               bool ok;
+               while (!should_quit.load()) {
+                       SRTSOCKET errfds[1], writefds[1];
+                       int num_errfds = 1, num_writefds = 1;
+                       int poll_time_ms = 100;
+                       int ret = srt_epoll_wait(eid, errfds, &num_errfds, writefds, &num_writefds, poll_time_ms, 0, 0, 0, 0);
+                       if (ret < 0) {
+                               if (srt_getlasterror(nullptr) == SRT_ETIMEOUT) {
+                                       continue;
+                               } else {
+                                       fprintf(stderr, "srt_epoll_wait(): %s\n", srt_getlasterror_str());
+                                       srt_close(sock);
+                                       srt_epoll_release(eid);
+                                       return -1;
+                               }
+                       } else if (ret > 0) {
+                               // The SRT epoll framework is pretty odd, but seemingly,
+                               // this is the way. Getting the same error code as srt_connect()
+                               // would normally return seems to be impossible, though.
+                               ok = (num_errfds == 0);
+                               break;
+                               fprintf(stderr, "num_errfds=%d num_writefds=%d last_err=%s\n", num_errfds, num_writefds, srt_getlasterror_str());
+                               break;
+                       }
+               }
+               srt_epoll_release(eid);
+               if (should_quit.load()) {
+                       srt_close(sock);
+                       return -1;
+               }
+               if (ok) {
+                       fprintf(stderr, "Connected to destination SRT endpoint at %s.\n", print_addrinfo(cur).c_str());
+                       return sock;
+               } else {
+                       fprintf(stderr, "srt_connect(%s): %s\n", print_addrinfo(cur).c_str(), srt_getlasterror_str());
+                       srt_close(sock);
+               }
        }
 
        // Out of candidates, so give up.
@@ -406,21 +478,52 @@ int VideoEncoder::write_srt_packet(uint8_t *buf, int buf_size)
        if (want_srt_metric_update.exchange(false) && srt_sock != -1) {
                srt_metrics.update_srt_stats(srt_sock);
        }
+
+       bool has_drained = false;
+       bool trying_reconnect = false;
+       steady_clock::time_point first_connect_start;
+
        while (buf_size > 0 && !should_quit.load()) {
                if (srt_sock == -1) {
+                       if (!trying_reconnect) {
+                               first_connect_start = steady_clock::now();
+                               trying_reconnect = true;
+                       }
                        srt_sock = connect_to_srt();
                        if (srt_sock == -1) {
                                usleep(100000);
+                               if (!has_drained && duration<double>(steady_clock::now() - first_connect_start).count() >= global_flags.srt_output_latency_ms * 1e-3) {
+                                       // The entire concept for SRT is to have fixed, low latency.
+                                       // If we've been out for more than a latency period, we shouldn't
+                                       // try to send the entire backlog. (But we should be tolerant
+                                       // of a quick disconnect and reconnect.) Maybe it would be better
+                                       // to have a sliding window of how much we remove, but it quickly
+                                       // starts getting esoteric, so juts drop it all.
+                                       fprintf(stderr, "WARNING: No SRT connection for more than %d ms, dropping data.\n",
+                                               global_flags.srt_output_latency_ms);
+                                       srt_mux->drain();
+                                       has_drained = true;
+                               }
                                continue;
                        }
                        srt_metrics.update_srt_stats(srt_sock);
                }
+               if (has_drained) {
+                       // Now that we're reconnected, we can start accepting data again,
+                       // but discard the rest of this write (it is very old by now).
+                       srt_mux->undrain();
+                       break;
+               }
                int to_send = min(buf_size, SRT_LIVE_DEF_PLSIZE);
                int ret = srt_send(srt_sock, (char *)buf, to_send);
                if (ret < 0)  {
                        fprintf(stderr, "srt_send(): %s\n", srt_getlasterror_str());
                        srt_close(srt_sock);
                        srt_metrics.metric_srt_uptime_seconds = 0.0 / 0.0;
+                       if (!trying_reconnect) {
+                               first_connect_start = steady_clock::now();
+                               trying_reconnect = true;
+                       }
                        srt_sock = connect_to_srt();
                        continue;
                }