X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=nageru%2Fvideo_encoder.cpp;h=b68086c946c58723084250f262d06440eb3d107e;hb=HEAD;hp=29f31050e084a4e9527407cbdfc6823c0a7330d4;hpb=e2b654d6a8cc8c64142a9a8ef8bcd82e9d9a9289;p=nageru diff --git a/nageru/video_encoder.cpp b/nageru/video_encoder.cpp index 29f3105..b68086c 100644 --- a/nageru/video_encoder.cpp +++ b/nageru/video_encoder.cpp @@ -1,17 +1,27 @@ -#include "video_encoder.h" - +#include #include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include #include +#include +#include +#include #include +#include #include +#include +#include +#include extern "C" { #include +#include +#include +#include +#include } #include "audio_encoder.h" @@ -26,10 +36,15 @@ extern "C" { #include "quicksync_encoder.h" #include "shared/timebase.h" #include "x264_encoder.h" +#include "video_encoder.h" +#include "shared/metrics.h" +#include "shared/ref_counted_gl_sync.h" +#include "shared/shared_defs.h" class RefCountedFrame; using namespace std; +using namespace std::chrono; using namespace movit; namespace { @@ -89,23 +104,32 @@ VideoEncoder::VideoEncoder(ResourcePool *resource_pool, QSurface *surface, const open_output_streams(); stream_audio_encoder->add_mux(http_mux.get()); - stream_audio_encoder->add_mux(srt_mux.get()); + if (srt_mux != nullptr) { + stream_audio_encoder->add_mux(srt_mux.get()); + } quicksync_encoder->set_http_mux(http_mux.get()); - quicksync_encoder->set_srt_mux(srt_mux.get()); + if (srt_mux != nullptr) { + quicksync_encoder->set_srt_mux(srt_mux.get()); + } if (global_flags.x264_video_to_http) { x264_encoder->add_mux(http_mux.get()); - x264_encoder->add_mux(srt_mux.get()); + if (srt_mux != nullptr) { + x264_encoder->add_mux(srt_mux.get()); + } } #ifdef HAVE_AV1 if (global_flags.av1_video_to_http) { av1_encoder->add_mux(http_mux.get()); - av1_encoder->add_mux(srt_mux.get()); + if (srt_mux != nullptr) { + av1_encoder->add_mux(srt_mux.get()); + } } #endif } VideoEncoder::~VideoEncoder() { + should_quit = true; quicksync_encoder->shutdown(); x264_encoder.reset(nullptr); x264_disk_encoder.reset(nullptr); @@ -209,6 +233,7 @@ bool VideoEncoder::begin_frame(int64_t pts, int64_t duration, movit::YCbCrLumaCo RefCountedGLsync VideoEncoder::end_frame() { + want_srt_metric_update = true; lock_guard lock(qs_mu); return quicksync_encoder->end_frame(); } @@ -256,6 +281,8 @@ void VideoEncoder::open_output_streams() if (is_srt) { srt_mux.reset(mux); srt_mux_metrics.init({{ "destination", "srt" }}); + srt_metrics.init({{ "cardtype", "output" }}); + global_metrics.add("srt_num_connection_attempts", {{ "cardtype", "output" }}, &metric_srt_num_connection_attempts); } else { http_mux.reset(mux); http_mux_metrics.init({{ "destination", "http" }}); @@ -323,7 +350,7 @@ int VideoEncoder::open_srt_socket() return -1; } - if (srt_setsockopt(sock, 0, SRTO_LATENCY, &global_flags.srt_output_latency, sizeof(global_flags.srt_output_latency)) < 0) { + if (srt_setsockopt(sock, 0, SRTO_LATENCY, &global_flags.srt_output_latency_ms, sizeof(global_flags.srt_output_latency_ms)) < 0) { fprintf(stderr, "srt_setsockopt(SRTO_LATENCY): %s\n", srt_getlasterror_str()); srt_close(sock); return -1; @@ -364,6 +391,8 @@ int VideoEncoder::connect_to_srt() return -1; } + unique_ptr ai_cleanup(ai, &freeaddrinfo); + for (const addrinfo *cur = ai; cur != nullptr; cur = cur->ai_next) { // Seemingly, srt_create_socket() isn't universal; once we try to connect, // it gets locked to either IPv4 or IPv6. So we need to create a new one @@ -373,36 +402,128 @@ int VideoEncoder::connect_to_srt() // Die immediately. return sock; } + ++metric_srt_num_connection_attempts; + + // We do a non-blocking connect, so that we can check should_quit + // every now and then. + int blocking = 0; + if (srt_setsockopt(sock, 0, SRTO_RCVSYN, &blocking, sizeof(blocking)) < 0) { + fprintf(stderr, "srt_setsockopt(SRTO_SNDSYN=0): %s\n", srt_getlasterror_str()); + srt_close(sock); + continue; + } if (srt_connect(sock, cur->ai_addr, cur->ai_addrlen) < 0) { fprintf(stderr, "srt_connect(%s): %s\n", print_addrinfo(cur).c_str(), srt_getlasterror_str()); srt_close(sock); continue; } - fprintf(stderr, "Connected to destination SRT endpoint at %s.\n", print_addrinfo(cur).c_str()); - freeaddrinfo(ai); - return sock; + int eid = srt_epoll_create(); + if (eid < 0) { + fprintf(stderr, "srt_epoll_create(): %s\n", srt_getlasterror_str()); + srt_close(sock); + continue; + } + int modes = SRT_EPOLL_ERR | SRT_EPOLL_OUT; + if (srt_epoll_add_usock(eid, sock, &modes) < 0) { + fprintf(stderr, "srt_epoll_usock(): %s\n", srt_getlasterror_str()); + srt_close(sock); + srt_epoll_release(eid); + continue; + } + bool ok; + while (!should_quit.load()) { + SRTSOCKET errfds[1], writefds[1]; + int num_errfds = 1, num_writefds = 1; + int poll_time_ms = 100; + int ret = srt_epoll_wait(eid, errfds, &num_errfds, writefds, &num_writefds, poll_time_ms, 0, 0, 0, 0); + if (ret < 0) { + if (srt_getlasterror(nullptr) == SRT_ETIMEOUT) { + continue; + } else { + fprintf(stderr, "srt_epoll_wait(): %s\n", srt_getlasterror_str()); + srt_close(sock); + srt_epoll_release(eid); + return -1; + } + } else if (ret > 0) { + // The SRT epoll framework is pretty odd, but seemingly, + // this is the way. Getting the same error code as srt_connect() + // would normally return seems to be impossible, though. + ok = (num_errfds == 0); + break; + fprintf(stderr, "num_errfds=%d num_writefds=%d last_err=%s\n", num_errfds, num_writefds, srt_getlasterror_str()); + break; + } + } + srt_epoll_release(eid); + if (should_quit.load()) { + srt_close(sock); + return -1; + } + if (ok) { + fprintf(stderr, "Connected to destination SRT endpoint at %s.\n", print_addrinfo(cur).c_str()); + return sock; + } else { + fprintf(stderr, "srt_connect(%s): %s\n", print_addrinfo(cur).c_str(), srt_getlasterror_str()); + srt_close(sock); + } } // Out of candidates, so give up. - freeaddrinfo(ai); return -1; } int VideoEncoder::write_srt_packet(uint8_t *buf, int buf_size) { - while (buf_size > 0) { + if (want_srt_metric_update.exchange(false) && srt_sock != -1) { + srt_metrics.update_srt_stats(srt_sock); + } + + bool has_drained = false; + bool trying_reconnect = false; + steady_clock::time_point first_connect_start; + + while (buf_size > 0 && !should_quit.load()) { if (srt_sock == -1) { + if (!trying_reconnect) { + first_connect_start = steady_clock::now(); + trying_reconnect = true; + } srt_sock = connect_to_srt(); if (srt_sock == -1) { usleep(100000); + if (!has_drained && duration(steady_clock::now() - first_connect_start).count() >= global_flags.srt_output_latency_ms * 1e-3) { + // The entire concept for SRT is to have fixed, low latency. + // If we've been out for more than a latency period, we shouldn't + // try to send the entire backlog. (But we should be tolerant + // of a quick disconnect and reconnect.) Maybe it would be better + // to have a sliding window of how much we remove, but it quickly + // starts getting esoteric, so juts drop it all. + fprintf(stderr, "WARNING: No SRT connection for more than %d ms, dropping data.\n", + global_flags.srt_output_latency_ms); + srt_mux->drain(); + has_drained = true; + } continue; } + srt_metrics.update_srt_stats(srt_sock); + } + if (has_drained) { + // Now that we're reconnected, we can start accepting data again, + // but discard the rest of this write (it is very old by now). + srt_mux->undrain(); + break; } int to_send = min(buf_size, SRT_LIVE_DEF_PLSIZE); int ret = srt_send(srt_sock, (char *)buf, to_send); if (ret < 0) { fprintf(stderr, "srt_send(): %s\n", srt_getlasterror_str()); srt_close(srt_sock); + srt_metrics.metric_srt_uptime_seconds = 0.0 / 0.0; + if (!trying_reconnect) { + first_connect_start = steady_clock::now(); + trying_reconnect = true; + } srt_sock = connect_to_srt(); continue; }