X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=nageru%2Fmixer.cpp;h=3b7a99c0319ea567a0f5fcb38f17075ef9d7151d;hb=82eaaa3d91b33479a3dee767d5de12aeb6da5b93;hp=953fd8133db59410ab2973d7592a05e5d133bc09;hpb=eeda8995329601f9f4e35047358400833eeae68e;p=nageru diff --git a/nageru/mixer.cpp b/nageru/mixer.cpp index 953fd81..3b7a99c 100644 --- a/nageru/mixer.cpp +++ b/nageru/mixer.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -44,8 +45,10 @@ #include "shared/disk_space_estimator.h" #include "ffmpeg_capture.h" #include "flags.h" +#include "image_input.h" #include "input_mapping.h" #include "shared/metrics.h" +#include "shared/va_display.h" #include "mjpeg_encoder.h" #include "pbo_frame_allocator.h" #include "shared/ref_counted_gl_sync.h" @@ -53,13 +56,18 @@ #include "shared/timebase.h" #include "timecode_renderer.h" #include "v210_converter.h" -#include "va_display_with_cleanup.h" #include "video_encoder.h" #undef Status #include #include "json.pb.h" +#ifdef HAVE_SRT +// Must come after CEF, since it includes , which has #defines +// that conflict with CEF logging constants. +#include +#endif + class IDeckLink; class QOpenGLContext; @@ -108,11 +116,17 @@ void ensure_texture_resolution(PBOFrameAllocator::Userdata *userdata, unsigned f assert(false); } - if (first || - width != userdata->last_width[field] || - height != userdata->last_height[field] || - cbcr_width != userdata->last_cbcr_width[field] || - cbcr_height != userdata->last_cbcr_height[field]) { + const bool recreate_main_texture = + first || + width != userdata->last_width[field] || + height != userdata->last_height[field] || + cbcr_width != userdata->last_cbcr_width[field] || + cbcr_height != userdata->last_cbcr_height[field]; + const bool recreate_v210_texture = + global_flags.ten_bit_input && + (first || v210_width != userdata->last_v210_width[field] || height != userdata->last_height[field]); + + if (recreate_main_texture) { // We changed resolution since last use of this texture, so we need to create // a new object. Note that this each card has its own PBOFrameAllocator, // we don't need to worry about these flip-flopping between resolutions. @@ -152,11 +166,8 @@ void ensure_texture_resolution(PBOFrameAllocator::Userdata *userdata, unsigned f case PixelFormat_8BitBGRA: glBindTexture(GL_TEXTURE_2D, userdata->tex_rgba[field]); check_error(); - if (global_flags.can_disable_srgb_decoder) { // See the comments in tweaked_inputs.h. - glTexImage2D(GL_TEXTURE_2D, 0, GL_SRGB8_ALPHA8, width, height, 0, GL_BGRA, GL_UNSIGNED_INT_8_8_8_8_REV, nullptr); - } else { - glTexImage2D(GL_TEXTURE_2D, 0, GL_RGBA8, width, height, 0, GL_BGRA, GL_UNSIGNED_INT_8_8_8_8_REV, nullptr); - } + // NOTE: sRGB may be disabled by sRGBSwitchingFlatInput. + glTexImage2D(GL_TEXTURE_2D, 0, GL_SRGB8_ALPHA8, width, height, 0, GL_BGRA, GL_UNSIGNED_INT_8_8_8_8_REV, nullptr); check_error(); break; default: @@ -167,14 +178,14 @@ void ensure_texture_resolution(PBOFrameAllocator::Userdata *userdata, unsigned f userdata->last_cbcr_width[field] = cbcr_width; userdata->last_cbcr_height[field] = cbcr_height; } - if (global_flags.ten_bit_input && - (first || v210_width != userdata->last_v210_width[field])) { + if (recreate_v210_texture) { // Same as above; we need to recreate the texture. glBindTexture(GL_TEXTURE_2D, userdata->tex_v210[field]); check_error(); glTexImage2D(GL_TEXTURE_2D, 0, GL_RGB10_A2, v210_width, height, 0, GL_RGBA, GL_UNSIGNED_INT_2_10_10_10_REV, nullptr); check_error(); userdata->last_v210_width[field] = v210_width; + userdata->last_height[field] = height; } } @@ -301,21 +312,22 @@ void QueueLengthPolicy::update_policy(steady_clock::time_point now, metric_input_queue_safe_length_frames = safe_queue_length; } -Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) +Mixer::Mixer(const QSurfaceFormat &format) : httpd(), - num_cards(num_cards), mixer_surface(create_surface(format)), h264_encoder_surface(create_surface(format)), - decklink_output_surface(create_surface(format)) + decklink_output_surface(create_surface(format)), + image_update_surface(create_surface(format)) { memcpy(ycbcr_interpretation, global_flags.ycbcr_interpretation, sizeof(ycbcr_interpretation)); CHECK(init_movit(MOVIT_SHADER_DIR, MOVIT_DEBUG_OFF)); check_error(); - // This nearly always should be true. - global_flags.can_disable_srgb_decoder = - epoxy_has_gl_extension("GL_EXT_texture_sRGB_decode") && - epoxy_has_gl_extension("GL_ARB_sampler_objects"); + if (!epoxy_has_gl_extension("GL_EXT_texture_sRGB_decode") || + !epoxy_has_gl_extension("GL_ARB_sampler_objects")) { + fprintf(stderr, "Nageru requires GL_EXT_texture_sRGB_decode and GL_ARB_sampler_objects to run.\n"); + exit(1); + } // Since we allow non-bouncing 4:2:2 YCbCrInputs, effective subpixel precision // will be halved when sampling them, and we need to compensate here. @@ -347,6 +359,11 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) ycbcr_format.cb_y_position = 0.5f; ycbcr_format.cr_y_position = 0.5f; + // Initialize the neutral colors to sane values. + for (unsigned i = 0; i < MAX_VIDEO_CARDS; ++i) { + last_received_neutral_color[i] = RGBTriplet(1.0f, 1.0f, 1.0f); + } + // Display chain; shows the live output produced by the main chain (or rather, a copy of it). display_chain.reset(new EffectChain(global_flags.width, global_flags.height, resource_pool.get())); check_error(); @@ -358,20 +375,22 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) display_chain->finalize(); video_encoder.reset(new VideoEncoder(resource_pool.get(), h264_encoder_surface, global_flags.va_display, global_flags.width, global_flags.height, &httpd, global_disk_space_estimator)); - mjpeg_encoder.reset(new MJPEGEncoder(&httpd, global_flags.va_display)); + if (!global_flags.card_to_mjpeg_stream_export.empty()) { + mjpeg_encoder.reset(new MJPEGEncoder(&httpd, global_flags.va_display)); + } // Must be instantiated after VideoEncoder has initialized global_flags.use_zerocopy. - theme.reset(new Theme(global_flags.theme_filename, global_flags.theme_dirs, resource_pool.get(), num_cards)); + theme.reset(new Theme(global_flags.theme_filename, global_flags.theme_dirs, resource_pool.get())); // Must be instantiated after the theme, as the theme decides the number of FFmpeg inputs. std::vector video_inputs = theme->get_video_inputs(); - audio_mixer.reset(new AudioMixer(num_cards, video_inputs.size())); + audio_mixer.reset(new AudioMixer); httpd.add_endpoint("/channels", bind(&Mixer::get_channels_json, this), HTTPD::ALLOW_ALL_ORIGINS); - for (int channel_idx = 2; channel_idx < theme->get_num_channels(); ++channel_idx) { + for (int channel_idx = 0; channel_idx < theme->get_num_channels(); ++channel_idx) { char url[256]; - snprintf(url, sizeof(url), "/channels/%d/color", channel_idx); - httpd.add_endpoint(url, bind(&Mixer::get_channel_color_http, this, unsigned(channel_idx)), HTTPD::ALLOW_ALL_ORIGINS); + snprintf(url, sizeof(url), "/channels/%d/color", channel_idx + 2); + httpd.add_endpoint(url, bind(&Mixer::get_channel_color_http, this, unsigned(channel_idx + 2)), HTTPD::ALLOW_ALL_ORIGINS); } // Start listening for clients only once VideoEncoder has written its header, if any. @@ -385,7 +404,7 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) { IDeckLinkIterator *decklink_iterator = CreateDeckLinkIteratorInstance(); if (decklink_iterator != nullptr) { - for ( ; card_index < num_cards; ++card_index) { + for ( ; card_index < unsigned(global_flags.max_num_cards); ++card_index) { IDeckLink *decklink; if (decklink_iterator->Next(&decklink) != S_OK) { break; @@ -397,7 +416,7 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) delete output; output = nullptr; } - configure_card(card_index, capture, CardType::LIVE_CARD, output); + configure_card(card_index, capture, CardType::LIVE_CARD, output, /*is_srt_card=*/false); ++num_pci_devices; } decklink_iterator->Release(); @@ -408,31 +427,44 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) } unsigned num_usb_devices = BMUSBCapture::num_cards(); - for (unsigned usb_card_index = 0; usb_card_index < num_usb_devices && card_index < num_cards; ++usb_card_index, ++card_index) { + for (unsigned usb_card_index = 0; usb_card_index < num_usb_devices && card_index < unsigned(global_flags.max_num_cards); ++usb_card_index, ++card_index) { BMUSBCapture *capture = new BMUSBCapture(usb_card_index); capture->set_card_disconnected_callback(bind(&Mixer::bm_hotplug_remove, this, card_index)); - configure_card(card_index, capture, CardType::LIVE_CARD, /*output=*/nullptr); + configure_card(card_index, capture, CardType::LIVE_CARD, /*output=*/nullptr, /*is_srt_card=*/false); } fprintf(stderr, "Found %u USB card(s).\n", num_usb_devices); + // Fill up with fake cards for as long as we can, so that the FFmpeg + // and HTML cards always come last. unsigned num_fake_cards = 0; - for ( ; card_index < num_cards; ++card_index, ++num_fake_cards) { - FakeCapture *capture = new FakeCapture(global_flags.width, global_flags.height, FAKE_FPS, OUTPUT_FREQUENCY, card_index, global_flags.fake_cards_audio); - configure_card(card_index, capture, CardType::FAKE_CAPTURE, /*output=*/nullptr); +#ifdef HAVE_CEF + size_t num_html_inputs = theme->get_html_inputs().size(); +#else + size_t num_html_inputs = 0; +#endif + for ( ; card_index < MAX_VIDEO_CARDS - video_inputs.size() - num_html_inputs; ++card_index) { + // Only bother to activate fake capture cards to satisfy the minimum. + bool is_active = card_index < unsigned(global_flags.min_num_cards) || cards[card_index].force_active; + if (is_active) { + FakeCapture *capture = new FakeCapture(global_flags.width, global_flags.height, FAKE_FPS, OUTPUT_FREQUENCY, card_index, global_flags.fake_cards_audio); + configure_card(card_index, capture, CardType::FAKE_CAPTURE, /*output=*/nullptr, /*is_srt_card=*/false); + ++num_fake_cards; + } else { + configure_card(card_index, nullptr, CardType::FAKE_CAPTURE, /*output=*/nullptr, /*is_srt_card=*/false); + } } if (num_fake_cards > 0) { fprintf(stderr, "Initialized %u fake cards.\n", num_fake_cards); } - // Initialize all video inputs the theme asked for. Note that these are - // all put _after_ the regular cards, which stop at - 1. + // Initialize all video inputs the theme asked for. for (unsigned video_card_index = 0; video_card_index < video_inputs.size(); ++card_index, ++video_card_index) { if (card_index >= MAX_VIDEO_CARDS) { fprintf(stderr, "ERROR: Not enough card slots available for the videos the theme requested.\n"); - exit(1); + abort(); } - configure_card(card_index, video_inputs[video_card_index], CardType::FFMPEG_INPUT, /*output=*/nullptr); + configure_card(card_index, video_inputs[video_card_index], CardType::FFMPEG_INPUT, /*output=*/nullptr, /*is_srt_card=*/false); video_inputs[video_card_index]->set_card_index(card_index); } num_video_inputs = video_inputs.size(); @@ -443,9 +475,9 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) for (unsigned html_card_index = 0; html_card_index < html_inputs.size(); ++card_index, ++html_card_index) { if (card_index >= MAX_VIDEO_CARDS) { fprintf(stderr, "ERROR: Not enough card slots available for the HTML inputs the theme requested.\n"); - exit(1); + abort(); } - configure_card(card_index, html_inputs[html_card_index], CardType::CEF_INPUT, /*output=*/nullptr); + configure_card(card_index, html_inputs[html_card_index], CardType::CEF_INPUT, /*output=*/nullptr, /*is_srt_card=*/false); html_inputs[html_card_index]->set_card_index(card_index); } num_html_inputs = html_inputs.size(); @@ -454,7 +486,13 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) BMUSBCapture::set_card_connected_callback(bind(&Mixer::bm_hotplug_add, this, _1)); BMUSBCapture::start_bm_thread(); - for (unsigned card_index = 0; card_index < num_cards + num_video_inputs + num_html_inputs; ++card_index) { +#ifdef HAVE_SRT + if (global_flags.srt_port >= 0) { + start_srt(); + } +#endif + + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { cards[card_index].queue_length_policy.reset(card_index); } @@ -464,7 +502,7 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) if (!v210Converter::has_hardware_support()) { fprintf(stderr, "ERROR: --ten-bit-input requires support for OpenGL compute shaders\n"); fprintf(stderr, " (OpenGL 4.3, or GL_ARB_compute_shader + GL_ARB_shader_image_load_store).\n"); - exit(1); + abort(); } v210_converter.reset(new v210Converter()); @@ -481,7 +519,7 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) if (!v210Converter::has_hardware_support()) { fprintf(stderr, "ERROR: --ten-bit-output requires support for OpenGL compute shaders\n"); fprintf(stderr, " (OpenGL 4.3, or GL_ARB_compute_shader + GL_ARB_shader_image_load_store).\n"); - exit(1); + abort(); } } @@ -498,16 +536,24 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) } output_jitter_history.register_metrics({{ "card", "output" }}); + + ImageInput::start_update_thread(image_update_surface); } Mixer::~Mixer() { - mjpeg_encoder->stop(); + ImageInput::end_update_thread(); + + if (mjpeg_encoder != nullptr) { + mjpeg_encoder->stop(); + } httpd.stop(); BMUSBCapture::stop_bm_thread(); - for (unsigned card_index = 0; card_index < num_cards + num_video_inputs + num_html_inputs; ++card_index) { - cards[card_index].capture->stop_dequeue_thread(); + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { + if (cards[card_index].capture != nullptr) { // Active. + cards[card_index].capture->stop_dequeue_thread(); + } if (cards[card_index].output) { cards[card_index].output->end_output(); cards[card_index].output.reset(); @@ -517,9 +563,14 @@ Mixer::~Mixer() video_encoder.reset(nullptr); } -void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardType card_type, DeckLinkOutput *output) +void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardType card_type, DeckLinkOutput *output, bool is_srt_card) { - printf("Configuring card %d...\n", card_index); + bool is_active = capture != nullptr; + if (is_active) { + printf("Configuring card %d...\n", card_index); + } else { + assert(card_type == CardType::FAKE_CAPTURE); + } CaptureCard *card = &cards[card_index]; if (card->capture != nullptr) { @@ -527,6 +578,9 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT } card->capture.reset(capture); card->is_fake_capture = (card_type == CardType::FAKE_CAPTURE); + if (card->is_fake_capture) { + card->fake_capture_counter = fake_capture_counter++; + } card->is_cef_capture = (card_type == CardType::CEF_INPUT); card->may_have_dropped_last_frame = false; card->type = card_type; @@ -545,29 +599,48 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT pixel_format = PixelFormat_8BitYCbCr; } - card->capture->set_frame_callback(bind(&Mixer::bm_frame, this, card_index, _1, _2, _3, _4, _5, _6, _7)); - if (card->frame_allocator == nullptr) { - card->frame_allocator.reset(new PBOFrameAllocator(pixel_format, 8 << 20, global_flags.width, global_flags.height)); // 8 MB. - } - card->capture->set_video_frame_allocator(card->frame_allocator.get()); - if (card->surface == nullptr) { - card->surface = create_surface_with_same_format(mixer_surface); + if (is_active) { + card->capture->set_frame_callback(bind(&Mixer::bm_frame, this, card_index, _1, _2, _3, _4, _5, _6, _7)); + if (card->frame_allocator == nullptr) { + card->frame_allocator.reset(new PBOFrameAllocator(pixel_format, 8 << 20, global_flags.width, global_flags.height, card_index, mjpeg_encoder.get())); // 8 MB. + } else { + // The format could have changed, but we cannot reset the allocator + // and create a new one from scratch, since there may be allocated + // frames from it that expect to call release_frame() on it. + // Instead, ask the allocator to create new frames for us and discard + // any old ones as they come back. This takes the mutex while + // allocating, but nothing should really be sending frames in there + // right now anyway (start_bm_capture() has not been called yet). + card->frame_allocator->reconfigure(pixel_format, 8 << 20, global_flags.width, global_flags.height, card_index, mjpeg_encoder.get()); + } + card->capture->set_video_frame_allocator(card->frame_allocator.get()); + if (card->surface == nullptr) { + card->surface = create_surface_with_same_format(mixer_surface); + } + while (!card->new_frames.empty()) card->new_frames.pop_front(); + card->last_timecode = -1; + card->capture->set_pixel_format(pixel_format); + card->capture->configure_card(); + + // NOTE: start_bm_capture() happens in thread_func(). } - while (!card->new_frames.empty()) card->new_frames.pop_front(); - card->last_timecode = -1; - card->capture->set_pixel_format(pixel_format); - card->capture->configure_card(); - // NOTE: start_bm_capture() happens in thread_func(). + if (is_srt_card) { + assert(card_type == CardType::FFMPEG_INPUT); + } DeviceSpec device; - if (card_type == CardType::FFMPEG_INPUT) { - device = DeviceSpec{InputSourceType::FFMPEG_VIDEO_INPUT, card_index - num_cards}; + device = DeviceSpec{InputSourceType::CAPTURE_CARD, card_index}; + audio_mixer->reset_resampler(device); + unsigned num_channels = card_type == CardType::LIVE_CARD ? 8 : 2; + if (is_active) { + audio_mixer->set_device_parameters(device, card->capture->get_description(), card_type, num_channels, /*active=*/true); } else { - device = DeviceSpec{InputSourceType::CAPTURE_CARD, card_index}; + // Note: Keeps the previous name, if any. + char name[32]; + snprintf(name, sizeof(name), "Fake card %u", card_index + 1); + audio_mixer->set_device_parameters(device, name, card_type, num_channels, /*active=*/false); } - audio_mixer->reset_resampler(device); - audio_mixer->set_display_name(device, card->capture->get_description()); audio_mixer->trigger_state_changed_callback(); // Unregister old metrics, if any. @@ -575,63 +648,204 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT const vector> &labels = card->labels; card->jitter_history.unregister_metrics(labels); card->queue_length_policy.unregister_metrics(labels); - global_metrics.remove("input_received_frames", labels); - global_metrics.remove("input_dropped_frames_jitter", labels); - global_metrics.remove("input_dropped_frames_error", labels); - global_metrics.remove("input_dropped_frames_resets", labels); - global_metrics.remove("input_queue_length_frames", labels); - global_metrics.remove("input_queue_duped_frames", labels); - - global_metrics.remove("input_has_signal_bool", labels); - global_metrics.remove("input_is_connected_bool", labels); - global_metrics.remove("input_interlaced_bool", labels); - global_metrics.remove("input_width_pixels", labels); - global_metrics.remove("input_height_pixels", labels); - global_metrics.remove("input_frame_rate_nom", labels); - global_metrics.remove("input_frame_rate_den", labels); - global_metrics.remove("input_sample_rate_hz", labels); - } - - // Register metrics. - vector> labels; - char card_name[64]; - snprintf(card_name, sizeof(card_name), "%d", card_index); - labels.emplace_back("card", card_name); - - switch (card_type) { - case CardType::LIVE_CARD: - labels.emplace_back("cardtype", "live"); - break; - case CardType::FAKE_CAPTURE: - labels.emplace_back("cardtype", "fake"); - break; - case CardType::FFMPEG_INPUT: - labels.emplace_back("cardtype", "ffmpeg"); - break; - case CardType::CEF_INPUT: - labels.emplace_back("cardtype", "cef"); - break; - default: - assert(false); + global_metrics.remove_if_exists("input_received_frames", labels); + global_metrics.remove_if_exists("input_dropped_frames_jitter", labels); + global_metrics.remove_if_exists("input_dropped_frames_error", labels); + global_metrics.remove_if_exists("input_dropped_frames_resets", labels); + global_metrics.remove_if_exists("input_queue_length_frames", labels); + global_metrics.remove_if_exists("input_queue_duped_frames", labels); + + global_metrics.remove_if_exists("input_has_signal_bool", labels); + global_metrics.remove_if_exists("input_is_connected_bool", labels); + global_metrics.remove_if_exists("input_interlaced_bool", labels); + global_metrics.remove_if_exists("input_width_pixels", labels); + global_metrics.remove_if_exists("input_height_pixels", labels); + global_metrics.remove_if_exists("input_frame_rate_nom", labels); + global_metrics.remove_if_exists("input_frame_rate_den", labels); + global_metrics.remove_if_exists("input_sample_rate_hz", labels); + + // SRT metrics. + + // Global measurements (counters). + global_metrics.remove_if_exists("srt_uptime_seconds", labels); + global_metrics.remove_if_exists("srt_send_duration_seconds", labels); + global_metrics.remove_if_exists("srt_sent_bytes", labels); + global_metrics.remove_if_exists("srt_received_bytes", labels); + + vector> packet_labels = card->labels; + packet_labels.emplace_back("type", "normal"); + global_metrics.remove_if_exists("srt_sent_packets", packet_labels); + global_metrics.remove_if_exists("srt_received_packets", packet_labels); + + packet_labels.back().second = "lost"; + global_metrics.remove_if_exists("srt_sent_packets", packet_labels); + global_metrics.remove_if_exists("srt_received_packets", packet_labels); + + packet_labels.back().second = "retransmitted"; + global_metrics.remove_if_exists("srt_sent_packets", packet_labels); + global_metrics.remove_if_exists("srt_sent_bytes", packet_labels); + + packet_labels.back().second = "ack"; + global_metrics.remove_if_exists("srt_sent_packets", packet_labels); + global_metrics.remove_if_exists("srt_received_packets", packet_labels); + + packet_labels.back().second = "nak"; + global_metrics.remove_if_exists("srt_sent_packets", packet_labels); + global_metrics.remove_if_exists("srt_received_packets", packet_labels); + + packet_labels.back().second = "dropped"; + global_metrics.remove_if_exists("srt_sent_packets", packet_labels); + global_metrics.remove_if_exists("srt_received_packets", packet_labels); + global_metrics.remove_if_exists("srt_sent_bytes", packet_labels); + global_metrics.remove_if_exists("srt_received_bytes", packet_labels); + + packet_labels.back().second = "undecryptable"; + global_metrics.remove_if_exists("srt_received_packets", packet_labels); + global_metrics.remove_if_exists("srt_received_bytes", packet_labels); + + global_metrics.remove_if_exists("srt_filter_sent_extra_packets", labels); + global_metrics.remove_if_exists("srt_filter_received_extra_packets", labels); + global_metrics.remove_if_exists("srt_filter_received_rebuilt_packets", labels); + global_metrics.remove_if_exists("srt_filter_received_lost_packets", labels); + + // Instant measurements (gauges). + global_metrics.remove_if_exists("srt_packet_sending_period_seconds", labels); + global_metrics.remove_if_exists("srt_flow_window_packets", labels); + global_metrics.remove_if_exists("srt_congestion_window_packets", labels); + global_metrics.remove_if_exists("srt_flight_size_packets", labels); + global_metrics.remove_if_exists("srt_rtt_seconds", labels); + global_metrics.remove_if_exists("srt_estimated_bandwidth_bits_per_second", labels); + global_metrics.remove_if_exists("srt_bandwidth_ceiling_bits_per_second", labels); + global_metrics.remove_if_exists("srt_send_buffer_available_bytes", labels); + global_metrics.remove_if_exists("srt_receive_buffer_available_bytes", labels); + global_metrics.remove_if_exists("srt_mss_bytes", labels); + + global_metrics.remove_if_exists("srt_sender_unacked_packets", labels); + global_metrics.remove_if_exists("srt_sender_unacked_bytes", labels); + global_metrics.remove_if_exists("srt_sender_unacked_timespan_seconds", labels); + global_metrics.remove_if_exists("srt_sender_delivery_delay_seconds", labels); + + global_metrics.remove_if_exists("srt_receiver_unacked_packets", labels); + global_metrics.remove_if_exists("srt_receiver_unacked_bytes", labels); + global_metrics.remove_if_exists("srt_receiver_unacked_timespan_seconds", labels); + global_metrics.remove_if_exists("srt_receiver_delivery_delay_seconds", labels); + } + + if (is_active) { + // Register metrics. + vector> labels; + char card_name[64]; + snprintf(card_name, sizeof(card_name), "%d", card_index); + labels.emplace_back("card", card_name); + + switch (card_type) { + case CardType::LIVE_CARD: + labels.emplace_back("cardtype", "live"); + break; + case CardType::FAKE_CAPTURE: + labels.emplace_back("cardtype", "fake"); + break; + case CardType::FFMPEG_INPUT: + if (is_srt_card) { + labels.emplace_back("cardtype", "srt"); + } else { + labels.emplace_back("cardtype", "ffmpeg"); + } + break; + case CardType::CEF_INPUT: + labels.emplace_back("cardtype", "cef"); + break; + default: + assert(false); + } + card->jitter_history.register_metrics(labels); + card->queue_length_policy.register_metrics(labels); + global_metrics.add("input_received_frames", labels, &card->metric_input_received_frames); + global_metrics.add("input_dropped_frames_jitter", labels, &card->metric_input_dropped_frames_jitter); + global_metrics.add("input_dropped_frames_error", labels, &card->metric_input_dropped_frames_error); + global_metrics.add("input_dropped_frames_resets", labels, &card->metric_input_resets); + global_metrics.add("input_queue_length_frames", labels, &card->metric_input_queue_length_frames, Metrics::TYPE_GAUGE); + global_metrics.add("input_queue_duped_frames", labels, &card->metric_input_duped_frames); + + global_metrics.add("input_has_signal_bool", labels, &card->metric_input_has_signal_bool, Metrics::TYPE_GAUGE); + global_metrics.add("input_is_connected_bool", labels, &card->metric_input_is_connected_bool, Metrics::TYPE_GAUGE); + global_metrics.add("input_interlaced_bool", labels, &card->metric_input_interlaced_bool, Metrics::TYPE_GAUGE); + global_metrics.add("input_width_pixels", labels, &card->metric_input_width_pixels, Metrics::TYPE_GAUGE); + global_metrics.add("input_height_pixels", labels, &card->metric_input_height_pixels, Metrics::TYPE_GAUGE); + global_metrics.add("input_frame_rate_nom", labels, &card->metric_input_frame_rate_nom, Metrics::TYPE_GAUGE); + global_metrics.add("input_frame_rate_den", labels, &card->metric_input_frame_rate_den, Metrics::TYPE_GAUGE); + global_metrics.add("input_sample_rate_hz", labels, &card->metric_input_sample_rate_hz, Metrics::TYPE_GAUGE); + + if (is_srt_card) { + // Global measurements (counters). + global_metrics.add("srt_uptime_seconds", labels, &card->metric_srt_uptime_seconds); + global_metrics.add("srt_send_duration_seconds", labels, &card->metric_srt_send_duration_seconds); + global_metrics.add("srt_sent_bytes", labels, &card->metric_srt_sent_bytes); + global_metrics.add("srt_received_bytes", labels, &card->metric_srt_received_bytes); + + vector> packet_labels = labels; + packet_labels.emplace_back("type", "normal"); + global_metrics.add("srt_sent_packets", packet_labels, &card->metric_srt_sent_packets_normal); + global_metrics.add("srt_received_packets", packet_labels, &card->metric_srt_received_packets_normal); + + packet_labels.back().second = "lost"; + global_metrics.add("srt_sent_packets", packet_labels, &card->metric_srt_sent_packets_lost); + global_metrics.add("srt_received_packets", packet_labels, &card->metric_srt_received_packets_lost); + + packet_labels.back().second = "retransmitted"; + global_metrics.add("srt_sent_packets", packet_labels, &card->metric_srt_sent_packets_retransmitted); + global_metrics.add("srt_sent_bytes", packet_labels, &card->metric_srt_sent_bytes_retransmitted); + + packet_labels.back().second = "ack"; + global_metrics.add("srt_sent_packets", packet_labels, &card->metric_srt_sent_packets_ack); + global_metrics.add("srt_received_packets", packet_labels, &card->metric_srt_received_packets_ack); + + packet_labels.back().second = "nak"; + global_metrics.add("srt_sent_packets", packet_labels, &card->metric_srt_sent_packets_nak); + global_metrics.add("srt_received_packets", packet_labels, &card->metric_srt_received_packets_nak); + + packet_labels.back().second = "dropped"; + global_metrics.add("srt_sent_packets", packet_labels, &card->metric_srt_sent_packets_dropped); + global_metrics.add("srt_received_packets", packet_labels, &card->metric_srt_received_packets_dropped); + global_metrics.add("srt_sent_bytes", packet_labels, &card->metric_srt_sent_bytes_dropped); + global_metrics.add("srt_received_bytes", packet_labels, &card->metric_srt_received_bytes_dropped); + + packet_labels.back().second = "undecryptable"; + global_metrics.add("srt_received_packets", packet_labels, &card->metric_srt_received_packets_undecryptable); + global_metrics.add("srt_received_bytes", packet_labels, &card->metric_srt_received_bytes_undecryptable); + + global_metrics.add("srt_filter_sent_extra_packets", labels, &card->metric_srt_filter_sent_packets); + global_metrics.add("srt_filter_received_extra_packets", labels, &card->metric_srt_filter_received_extra_packets); + global_metrics.add("srt_filter_received_rebuilt_packets", labels, &card->metric_srt_filter_received_rebuilt_packets); + global_metrics.add("srt_filter_received_lost_packets", labels, &card->metric_srt_filter_received_lost_packets); + + // Instant measurements (gauges). + global_metrics.add("srt_packet_sending_period_seconds", labels, &card->metric_srt_packet_sending_period_seconds, Metrics::TYPE_GAUGE); + global_metrics.add("srt_flow_window_packets", labels, &card->metric_srt_flow_window_packets, Metrics::TYPE_GAUGE); + global_metrics.add("srt_congestion_window_packets", labels, &card->metric_srt_congestion_window_packets, Metrics::TYPE_GAUGE); + global_metrics.add("srt_flight_size_packets", labels, &card->metric_srt_flight_size_packets, Metrics::TYPE_GAUGE); + global_metrics.add("srt_rtt_seconds", labels, &card->metric_srt_rtt_seconds, Metrics::TYPE_GAUGE); + global_metrics.add("srt_estimated_bandwidth_bits_per_second", labels, &card->metric_srt_estimated_bandwidth_bits_per_second, Metrics::TYPE_GAUGE); + global_metrics.add("srt_bandwidth_ceiling_bits_per_second", labels, &card->metric_srt_bandwidth_ceiling_bits_per_second, Metrics::TYPE_GAUGE); + global_metrics.add("srt_send_buffer_available_bytes", labels, &card->metric_srt_send_buffer_available_bytes, Metrics::TYPE_GAUGE); + global_metrics.add("srt_receive_buffer_available_bytes", labels, &card->metric_srt_receive_buffer_available_bytes, Metrics::TYPE_GAUGE); + global_metrics.add("srt_mss_bytes", labels, &card->metric_srt_mss_bytes, Metrics::TYPE_GAUGE); + + global_metrics.add("srt_sender_unacked_packets", labels, &card->metric_srt_sender_unacked_packets, Metrics::TYPE_GAUGE); + global_metrics.add("srt_sender_unacked_bytes", labels, &card->metric_srt_sender_unacked_bytes, Metrics::TYPE_GAUGE); + global_metrics.add("srt_sender_unacked_timespan_seconds", labels, &card->metric_srt_sender_unacked_timespan_seconds, Metrics::TYPE_GAUGE); + global_metrics.add("srt_sender_delivery_delay_seconds", labels, &card->metric_srt_sender_delivery_delay_seconds, Metrics::TYPE_GAUGE); + + global_metrics.add("srt_receiver_unacked_packets", labels, &card->metric_srt_receiver_unacked_packets, Metrics::TYPE_GAUGE); + global_metrics.add("srt_receiver_unacked_bytes", labels, &card->metric_srt_receiver_unacked_bytes, Metrics::TYPE_GAUGE); + global_metrics.add("srt_receiver_unacked_timespan_seconds", labels, &card->metric_srt_receiver_unacked_timespan_seconds, Metrics::TYPE_GAUGE); + global_metrics.add("srt_receiver_delivery_delay_seconds", labels, &card->metric_srt_receiver_delivery_delay_seconds, Metrics::TYPE_GAUGE); + } + + card->labels = labels; + } else { + card->labels.clear(); } - card->jitter_history.register_metrics(labels); - card->queue_length_policy.register_metrics(labels); - global_metrics.add("input_received_frames", labels, &card->metric_input_received_frames); - global_metrics.add("input_dropped_frames_jitter", labels, &card->metric_input_dropped_frames_jitter); - global_metrics.add("input_dropped_frames_error", labels, &card->metric_input_dropped_frames_error); - global_metrics.add("input_dropped_frames_resets", labels, &card->metric_input_resets); - global_metrics.add("input_queue_length_frames", labels, &card->metric_input_queue_length_frames, Metrics::TYPE_GAUGE); - global_metrics.add("input_queue_duped_frames", labels, &card->metric_input_duped_frames); - - global_metrics.add("input_has_signal_bool", labels, &card->metric_input_has_signal_bool, Metrics::TYPE_GAUGE); - global_metrics.add("input_is_connected_bool", labels, &card->metric_input_is_connected_bool, Metrics::TYPE_GAUGE); - global_metrics.add("input_interlaced_bool", labels, &card->metric_input_interlaced_bool, Metrics::TYPE_GAUGE); - global_metrics.add("input_width_pixels", labels, &card->metric_input_width_pixels, Metrics::TYPE_GAUGE); - global_metrics.add("input_height_pixels", labels, &card->metric_input_height_pixels, Metrics::TYPE_GAUGE); - global_metrics.add("input_frame_rate_nom", labels, &card->metric_input_frame_rate_nom, Metrics::TYPE_GAUGE); - global_metrics.add("input_frame_rate_den", labels, &card->metric_input_frame_rate_den, Metrics::TYPE_GAUGE); - global_metrics.add("input_sample_rate_hz", labels, &card->metric_input_sample_rate_hz, Metrics::TYPE_GAUGE); - card->labels = labels; } void Mixer::set_output_card_internal(int card_index) @@ -667,7 +881,7 @@ void Mixer::set_output_card_internal(int card_index) lock.lock(); card->parked_capture = move(card->capture); CaptureInterface *fake_capture = new FakeCapture(global_flags.width, global_flags.height, FAKE_FPS, OUTPUT_FREQUENCY, card_index, global_flags.fake_cards_audio); - configure_card(card_index, fake_capture, CardType::FAKE_CAPTURE, card->output.release()); + configure_card(card_index, fake_capture, CardType::FAKE_CAPTURE, card->output.release(), /*is_srt_card=*/false); card->queue_length_policy.reset(card_index); card->capture->start_bm_capture(); desired_output_video_mode = output_video_mode = card->output->pick_video_mode(desired_output_video_mode); @@ -689,22 +903,13 @@ int unwrap_timecode(uint16_t current_wrapped, int last) } } -DeviceSpec card_index_to_device(unsigned card_index, unsigned num_cards) -{ - if (card_index >= num_cards) { - return DeviceSpec{InputSourceType::FFMPEG_VIDEO_INPUT, card_index - num_cards}; - } else { - return DeviceSpec{InputSourceType::CAPTURE_CARD, card_index}; - } -} - } // namespace void Mixer::bm_frame(unsigned card_index, uint16_t timecode, FrameAllocator::Frame video_frame, size_t video_offset, VideoFormat video_format, FrameAllocator::Frame audio_frame, size_t audio_offset, AudioFormat audio_format) { - DeviceSpec device = card_index_to_device(card_index, num_cards); + DeviceSpec device{InputSourceType::CAPTURE_CARD, card_index}; CaptureCard *card = &cards[card_index]; ++card->metric_input_received_frames; @@ -741,7 +946,7 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, size_t num_samples = (audio_frame.len > audio_offset) ? (audio_frame.len - audio_offset) / audio_format.num_channels / (audio_format.bits_per_sample / 8) : 0; if (num_samples > OUTPUT_FREQUENCY / 10 && card->type != CardType::FFMPEG_INPUT) { printf("%s: Dropping frame with implausible audio length (len=%d, offset=%d) [timecode=0x%04x video_len=%d video_offset=%d video_format=%x)\n", - spec_to_string(device).c_str(), int(audio_frame.len), int(audio_offset), + description_for_card(card_index).c_str(), int(audio_frame.len), int(audio_offset), timecode, int(video_frame.len), int(video_offset), video_format.id); if (video_frame.owner) { video_frame.owner->release_frame(video_frame); @@ -763,24 +968,53 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, if (dropped_frames > MAX_FPS * 2) { fprintf(stderr, "%s lost more than two seconds (or time code jumping around; from 0x%04x to 0x%04x), resetting resampler\n", - spec_to_string(device).c_str(), card->last_timecode, timecode); + description_for_card(card_index).c_str(), card->last_timecode, timecode); audio_mixer->reset_resampler(device); dropped_frames = 0; ++card->metric_input_resets; } else if (dropped_frames > 0) { // Insert silence as needed. fprintf(stderr, "%s dropped %d frame(s) (before timecode 0x%04x), inserting silence.\n", - spec_to_string(device).c_str(), dropped_frames, timecode); + description_for_card(card_index).c_str(), dropped_frames, timecode); card->metric_input_dropped_frames_error += dropped_frames; bool success; do { - success = audio_mixer->add_silence(device, silence_samples, dropped_frames, frame_length); + success = audio_mixer->add_silence(device, silence_samples, dropped_frames); } while (!success); } if (num_samples > 0) { - audio_mixer->add_audio(device, audio_frame.data + audio_offset, num_samples, audio_format, frame_length, audio_frame.received_timestamp); + audio_mixer->add_audio(device, audio_frame.data + audio_offset, num_samples, audio_format, audio_frame.received_timestamp); + + // Audio for the MJPEG stream. We don't resample; audio that's not in 48 kHz + // just gets dropped for now. + // + // Only bother doing MJPEG encoding if there are any connected clients + // that want the stream. + if (httpd.get_num_connected_multicam_clients() > 0 || + httpd.get_num_connected_siphon_clients(card_index) > 0) { + vector converted_samples = convert_audio_to_fixed32(audio_frame.data + audio_offset, num_samples, audio_format, 2); + lock_guard lock(card_mutex); + if (card->new_raw_audio.empty()) { + card->new_raw_audio = move(converted_samples); + } else { + // For raw audio, we don't really synchronize audio and video; + // we just put the audio in frame by frame, and if a video frame is + // dropped, we still keep the audio, which means it will be added + // to the beginning of the next frame. It would probably be better + // to move the audio pts earlier to show this, but most players can + // live with some jitter, and in a lot of ways, it's much nicer for + // Futatabi to have all audio locked to a video frame. + card->new_raw_audio.insert(card->new_raw_audio.end(), converted_samples.begin(), converted_samples.end()); + + // Truncate to one second, just to be sure we don't have infinite buildup in case of weirdness. + if (card->new_raw_audio.size() > OUTPUT_FREQUENCY * 2) { + size_t excess_samples = card->new_raw_audio.size() - OUTPUT_FREQUENCY * 2; + card->new_raw_audio.erase(card->new_raw_audio.begin(), card->new_raw_audio.begin() + excess_samples); + } + } + } } // Done with the audio, so release it. @@ -791,8 +1025,21 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, card->last_timecode = timecode; PBOFrameAllocator::Userdata *userdata = (PBOFrameAllocator::Userdata *)video_frame.userdata; + if (card->type == CardType::FFMPEG_INPUT && userdata != nullptr) { + FFmpegCapture *ffmpeg_capture = static_cast(card->capture.get()); + userdata->has_last_subtitle = ffmpeg_capture->get_has_last_subtitle(); + userdata->last_subtitle = ffmpeg_capture->get_last_subtitle(); + } +#ifdef HAVE_SRT + if (card->type == CardType::FFMPEG_INPUT) { + int srt_sock = static_cast(card->capture.get())->get_srt_sock(); + if (srt_sock != -1) { + update_srt_stats(srt_sock, card); + } + } +#endif - size_t cbcr_width, cbcr_height, cbcr_offset, y_offset; + size_t y_offset, cbcr_offset; size_t expected_length = video_format.stride * (video_format.height + video_format.extra_lines_top + video_format.extra_lines_bottom); if (userdata != nullptr && userdata->pixel_format == PixelFormat_8BitYCbCrPlanar) { // The calculation above is wrong for planar Y'CbCr, so just override it. @@ -801,22 +1048,18 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, expected_length = video_frame.len; userdata->ycbcr_format = (static_cast(card->capture.get()))->get_current_frame_ycbcr_format(); - cbcr_width = video_format.width / userdata->ycbcr_format.chroma_subsampling_x; - cbcr_height = video_format.height / userdata->ycbcr_format.chroma_subsampling_y; - cbcr_offset = video_format.width * video_format.height; y_offset = 0; + cbcr_offset = video_format.width * video_format.height; } else { // All the other Y'CbCr formats are 4:2:2. - cbcr_width = video_format.width / 2; - cbcr_height = video_format.height; - cbcr_offset = video_offset / 2; y_offset = video_frame.size / 2 + video_offset / 2; + cbcr_offset = video_offset / 2; } if (video_frame.len - video_offset == 0 || video_frame.len - video_offset != expected_length) { if (video_frame.len != 0) { - printf("%s: Dropping video frame with wrong length (%ld; expected %ld)\n", - spec_to_string(device).c_str(), video_frame.len - video_offset, expected_length); + printf("%s: Dropping video frame with wrong length (%zu; expected %zu)\n", + description_for_card(card_index).c_str(), video_frame.len - video_offset, expected_length); } if (video_frame.owner) { video_frame.owner->release_frame(video_frame); @@ -825,7 +1068,7 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, // Still send on the information that we _had_ a frame, even though it's corrupted, // so that pts can go up accordingly. { - unique_lock lock(card_mutex); + lock_guard lock(card_mutex); CaptureCard::NewFrame new_frame; new_frame.frame = RefCountedFrame(FrameAllocator::Frame()); new_frame.length = frame_length; @@ -841,21 +1084,17 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, unsigned num_fields = video_format.interlaced ? 2 : 1; steady_clock::time_point frame_upload_start; - bool interlaced_stride = false; if (video_format.interlaced) { // Send the two fields along as separate frames; the other side will need to add // a deinterlacer to actually get this right. assert(video_format.height % 2 == 0); video_format.height /= 2; - cbcr_height /= 2; assert(frame_length % 2 == 0); frame_length /= 2; num_fields = 2; - if (video_format.second_field_start == 1) { - interlaced_stride = true; - } frame_upload_start = steady_clock::now(); } + assert(userdata != nullptr); userdata->last_interlaced = video_format.interlaced; userdata->last_has_signal = video_format.has_signal; userdata->last_is_connected = video_format.is_connected; @@ -863,79 +1102,15 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, userdata->last_frame_rate_den = video_format.frame_rate_den; RefCountedFrame frame(video_frame); - // Upload the textures. + // Send the frames on to the main thread, which will upload and process htem. + // It is entirely possible to upload them in the same thread (and it might even be + // faster, depending on the GPU and driver), but it appears to be trickling + // driver bugs very easily. + // + // Note that this means we must hold on to the actual frame data in + // until the upload is done, but we hold on to much longer than that + // (in fact, all the way until we no longer use the texture in rendering). for (unsigned field = 0; field < num_fields; ++field) { - // Put the actual texture upload in a lambda that is executed in the main thread. - // It is entirely possible to do this in the same thread (and it might even be - // faster, depending on the GPU and driver), but it appears to be trickling - // driver bugs very easily. - // - // Note that this means we must hold on to the actual frame data in - // until the upload command is run, but we hold on to much longer than that - // (in fact, all the way until we no longer use the texture in rendering). - auto upload_func = [this, field, video_format, y_offset, video_offset, cbcr_offset, cbcr_width, cbcr_height, interlaced_stride, userdata]() { - unsigned field_start_line; - if (field == 1) { - field_start_line = video_format.second_field_start; - } else { - field_start_line = video_format.extra_lines_top; - } - - // For anything not FRAME_FORMAT_YCBCR_10BIT, v210_width will be nonsensical but not used. - size_t v210_width = video_format.stride / sizeof(uint32_t); - ensure_texture_resolution(userdata, field, video_format.width, video_format.height, cbcr_width, cbcr_height, v210_width); - - glBindBuffer(GL_PIXEL_UNPACK_BUFFER, userdata->pbo); - check_error(); - - switch (userdata->pixel_format) { - case PixelFormat_10BitYCbCr: { - size_t field_start = video_offset + video_format.stride * field_start_line; - upload_texture(userdata->tex_v210[field], v210_width, video_format.height, video_format.stride, interlaced_stride, GL_RGBA, GL_UNSIGNED_INT_2_10_10_10_REV, field_start); - v210_converter->convert(userdata->tex_v210[field], userdata->tex_444[field], video_format.width, video_format.height); - break; - } - case PixelFormat_8BitYCbCr: { - size_t field_y_start = y_offset + video_format.width * field_start_line; - size_t field_cbcr_start = cbcr_offset + cbcr_width * field_start_line * sizeof(uint16_t); - - // Make up our own strides, since we are interleaving. - upload_texture(userdata->tex_y[field], video_format.width, video_format.height, video_format.width, interlaced_stride, GL_RED, GL_UNSIGNED_BYTE, field_y_start); - upload_texture(userdata->tex_cbcr[field], cbcr_width, cbcr_height, cbcr_width * sizeof(uint16_t), interlaced_stride, GL_RG, GL_UNSIGNED_BYTE, field_cbcr_start); - break; - } - case PixelFormat_8BitYCbCrPlanar: { - assert(field_start_line == 0); // We don't really support interlaced here. - size_t field_y_start = y_offset; - size_t field_cb_start = cbcr_offset; - size_t field_cr_start = cbcr_offset + cbcr_width * cbcr_height; - - // Make up our own strides, since we are interleaving. - upload_texture(userdata->tex_y[field], video_format.width, video_format.height, video_format.width, interlaced_stride, GL_RED, GL_UNSIGNED_BYTE, field_y_start); - upload_texture(userdata->tex_cb[field], cbcr_width, cbcr_height, cbcr_width, interlaced_stride, GL_RED, GL_UNSIGNED_BYTE, field_cb_start); - upload_texture(userdata->tex_cr[field], cbcr_width, cbcr_height, cbcr_width, interlaced_stride, GL_RED, GL_UNSIGNED_BYTE, field_cr_start); - break; - } - case PixelFormat_8BitBGRA: { - size_t field_start = video_offset + video_format.stride * field_start_line; - upload_texture(userdata->tex_rgba[field], video_format.width, video_format.height, video_format.stride, interlaced_stride, GL_BGRA, GL_UNSIGNED_INT_8_8_8_8_REV, field_start); - // These could be asked to deliver mipmaps at any time. - glBindTexture(GL_TEXTURE_2D, userdata->tex_rgba[field]); - check_error(); - glGenerateMipmap(GL_TEXTURE_2D); - check_error(); - glBindTexture(GL_TEXTURE_2D, 0); - check_error(); - break; - } - default: - assert(false); - } - - glBindBuffer(GL_PIXEL_UNPACK_BUFFER, 0); - check_error(); - }; - if (field == 1) { // Don't upload the second field as fast as we can; wait until // the field time has approximately passed. (Otherwise, we could @@ -949,18 +1124,23 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, } { - unique_lock lock(card_mutex); + lock_guard lock(card_mutex); CaptureCard::NewFrame new_frame; new_frame.frame = frame; new_frame.length = frame_length; new_frame.field = field; new_frame.interlaced = video_format.interlaced; - new_frame.upload_func = upload_func; new_frame.dropped_frames = dropped_frames; new_frame.received_timestamp = video_frame.received_timestamp; // Ignore the audio timestamp. new_frame.video_format = video_format; + new_frame.video_offset = video_offset; new_frame.y_offset = y_offset; new_frame.cbcr_offset = cbcr_offset; + new_frame.texture_uploaded = false; + if (card->type == CardType::FFMPEG_INPUT) { + FFmpegCapture *ffmpeg_capture = static_cast(card->capture.get()); + new_frame.neutral_color = ffmpeg_capture->get_last_neutral_color(); + } card->new_frames.push_back(move(new_frame)); card->jitter_history.frame_arrived(video_frame.received_timestamp, frame_length, dropped_frames); card->may_have_dropped_last_frame = false; @@ -969,6 +1149,87 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, } } +void Mixer::upload_texture_for_frame( + int field, bmusb::VideoFormat video_format, + size_t y_offset, size_t cbcr_offset, size_t video_offset, PBOFrameAllocator::Userdata *userdata) +{ + size_t cbcr_width, cbcr_height; + if (userdata != nullptr && userdata->pixel_format == PixelFormat_8BitYCbCrPlanar) { + cbcr_width = video_format.width / userdata->ycbcr_format.chroma_subsampling_x; + cbcr_height = video_format.height / userdata->ycbcr_format.chroma_subsampling_y; + } else { + // All the other Y'CbCr formats are 4:2:2. + cbcr_width = video_format.width / 2; + cbcr_height = video_format.height; + } + + bool interlaced_stride = video_format.interlaced && (video_format.second_field_start == 1); + if (video_format.interlaced) { + cbcr_height /= 2; + } + + unsigned field_start_line; + if (field == 1) { + field_start_line = video_format.second_field_start; + } else { + field_start_line = video_format.extra_lines_top; + } + + // For anything not FRAME_FORMAT_YCBCR_10BIT, v210_width will be nonsensical but not used. + size_t v210_width = video_format.stride / sizeof(uint32_t); + ensure_texture_resolution(userdata, field, video_format.width, video_format.height, cbcr_width, cbcr_height, v210_width); + + glBindBuffer(GL_PIXEL_UNPACK_BUFFER, userdata->pbo); + check_error(); + + switch (userdata->pixel_format) { + case PixelFormat_10BitYCbCr: { + size_t field_start = video_offset + video_format.stride * field_start_line; + upload_texture(userdata->tex_v210[field], v210_width, video_format.height, video_format.stride, interlaced_stride, GL_RGBA, GL_UNSIGNED_INT_2_10_10_10_REV, field_start); + v210_converter->convert(userdata->tex_v210[field], userdata->tex_444[field], video_format.width, video_format.height); + break; + } + case PixelFormat_8BitYCbCr: { + size_t field_y_start = y_offset + video_format.width * field_start_line; + size_t field_cbcr_start = cbcr_offset + cbcr_width * field_start_line * sizeof(uint16_t); + + // Make up our own strides, since we are interleaving. + upload_texture(userdata->tex_y[field], video_format.width, video_format.height, video_format.width, interlaced_stride, GL_RED, GL_UNSIGNED_BYTE, field_y_start); + upload_texture(userdata->tex_cbcr[field], cbcr_width, cbcr_height, cbcr_width * sizeof(uint16_t), interlaced_stride, GL_RG, GL_UNSIGNED_BYTE, field_cbcr_start); + break; + } + case PixelFormat_8BitYCbCrPlanar: { + assert(field_start_line == 0); // We don't really support interlaced here. + size_t field_y_start = y_offset; + size_t field_cb_start = cbcr_offset; + size_t field_cr_start = cbcr_offset + cbcr_width * cbcr_height; + + // Make up our own strides, since we are interleaving. + upload_texture(userdata->tex_y[field], video_format.width, video_format.height, video_format.width, interlaced_stride, GL_RED, GL_UNSIGNED_BYTE, field_y_start); + upload_texture(userdata->tex_cb[field], cbcr_width, cbcr_height, cbcr_width, interlaced_stride, GL_RED, GL_UNSIGNED_BYTE, field_cb_start); + upload_texture(userdata->tex_cr[field], cbcr_width, cbcr_height, cbcr_width, interlaced_stride, GL_RED, GL_UNSIGNED_BYTE, field_cr_start); + break; + } + case PixelFormat_8BitBGRA: { + size_t field_start = video_offset + video_format.stride * field_start_line; + upload_texture(userdata->tex_rgba[field], video_format.width, video_format.height, video_format.stride, interlaced_stride, GL_BGRA, GL_UNSIGNED_INT_8_8_8_8_REV, field_start); + // These could be asked to deliver mipmaps at any time. + glBindTexture(GL_TEXTURE_2D, userdata->tex_rgba[field]); + check_error(); + glGenerateMipmap(GL_TEXTURE_2D); + check_error(); + glBindTexture(GL_TEXTURE_2D, 0); + check_error(); + break; + } + default: + assert(false); + } + + glBindBuffer(GL_PIXEL_UNPACK_BUFFER, 0); + check_error(); +} + void Mixer::bm_hotplug_add(libusb_device *dev) { lock_guard lock(hotplug_mutex); @@ -988,13 +1249,13 @@ void Mixer::thread_func() QOpenGLContext *context = create_context(mixer_surface); if (!make_current(context, mixer_surface)) { printf("oops\n"); - exit(1); + abort(); } // Start the actual capture. (We don't want to do it before we're actually ready // to process output frames.) - for (unsigned card_index = 0; card_index < num_cards + num_video_inputs + num_html_inputs; ++card_index) { - if (int(card_index) != output_card_index) { + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { + if (int(card_index) != output_card_index && cards[card_index].capture != nullptr) { cards[card_index].capture->start_bm_capture(); } } @@ -1024,18 +1285,18 @@ void Mixer::thread_func() master_card_index = output_card_index; } else { master_card_is_output = false; - master_card_index = theme->map_signal(master_clock_channel); - assert(master_card_index < num_cards + num_video_inputs); + master_card_index = theme->map_signal_to_card(master_clock_channel); + assert(master_card_index < MAX_VIDEO_CARDS); } - OutputFrameInfo output_frame_info = get_one_frame_from_each_card(master_card_index, master_card_is_output, new_frames, has_new_frame); + handle_hotplugged_cards(); + + vector raw_audio[MAX_VIDEO_CARDS]; // For MJPEG encoding. + OutputFrameInfo output_frame_info = get_one_frame_from_each_card(master_card_index, master_card_is_output, new_frames, has_new_frame, raw_audio); schedule_audio_resampling_tasks(output_frame_info.dropped_frames, output_frame_info.num_samples, output_frame_info.frame_duration, output_frame_info.is_preroll, output_frame_info.frame_timestamp); stats_dropped_frames += output_frame_info.dropped_frames; - handle_hotplugged_cards(); - - for (unsigned card_index = 0; card_index < num_cards + num_video_inputs + num_html_inputs; ++card_index) { - DeviceSpec device = card_index_to_device(card_index, num_cards); + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { if (card_index == master_card_index || !has_new_frame[card_index]) { continue; } @@ -1044,7 +1305,7 @@ void Mixer::thread_func() } if (new_frames[card_index].dropped_frames > 0) { printf("%s dropped %d frames before this\n", - spec_to_string(device).c_str(), int(new_frames[card_index].dropped_frames)); + description_for_card(card_index).c_str(), int(new_frames[card_index].dropped_frames)); } } @@ -1056,7 +1317,7 @@ void Mixer::thread_func() continue; } - for (unsigned card_index = 0; card_index < num_cards + num_video_inputs + num_html_inputs; ++card_index) { + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { if (!has_new_frame[card_index] || new_frames[card_index].frame->len == 0) continue; @@ -1066,18 +1327,30 @@ void Mixer::thread_func() check_error(); // The new texture might need uploading before use. - if (new_frame->upload_func) { - new_frame->upload_func(); - new_frame->upload_func = nullptr; + if (!new_frame->texture_uploaded) { + upload_texture_for_frame(new_frame->field, new_frame->video_format, new_frame->y_offset, new_frame->cbcr_offset, + new_frame->video_offset, (PBOFrameAllocator::Userdata *)new_frame->frame->userdata); + new_frame->texture_uploaded = true; + } + + // Only set the white balance if it actually changed. This means that the user + // is free to override the white balance in a video with no white balance information + // actually set (ie. r=g=b=1 all the time), or one where the white point is wrong, + // but frame-to-frame decisions will be heeded. We do this pretty much as late + // as possible (ie., after picking out the frame from the buffer), so that we are sure + // that the change takes effect on exactly the right frame. + if (fabs(new_frame->neutral_color.r - last_received_neutral_color[card_index].r) > 1e-3 || + fabs(new_frame->neutral_color.g - last_received_neutral_color[card_index].g) > 1e-3 || + fabs(new_frame->neutral_color.b - last_received_neutral_color[card_index].b) > 1e-3) { + theme->set_wb_for_card(card_index, new_frame->neutral_color.r, new_frame->neutral_color.g, new_frame->neutral_color.b); + last_received_neutral_color[card_index] = new_frame->neutral_color; } - // There are situations where we could possibly want to - // include FFmpeg inputs (CEF inputs are unlikely), - // but they're not necessarily in 4:2:2 Y'CbCr, so it would - // require more functionality the the JPEG encoder. - if (card_index < num_cards) { - mjpeg_encoder->upload_frame(pts_int, card_index, new_frame->frame, new_frame->video_format, new_frame->y_offset, new_frame->cbcr_offset); + if (new_frame->frame->data_copy != nullptr && mjpeg_encoder->should_encode_mjpeg_for_card(card_index)) { + RGBTriplet neutral_color = theme->get_white_balance_for_card(card_index); + mjpeg_encoder->upload_frame(pts_int, card_index, new_frame->frame, new_frame->video_format, new_frame->y_offset, new_frame->cbcr_offset, move(raw_audio[card_index]), neutral_color); } + } int64_t frame_duration = output_frame_info.frame_duration; @@ -1173,11 +1446,11 @@ void Mixer::trim_queue(CaptureCard *card, size_t safe_queue_length) pair Mixer::get_channels_json() { Channels ret; - for (int channel_idx = 2; channel_idx < theme->get_num_channels(); ++channel_idx) { + for (int channel_idx = 0; channel_idx < theme->get_num_channels(); ++channel_idx) { Channel *channel = ret.add_channel(); - channel->set_index(channel_idx); - channel->set_name(theme->get_channel_name(channel_idx)); - channel->set_color(theme->get_channel_color(channel_idx)); + channel->set_index(channel_idx + 2); + channel->set_name(theme->get_channel_name(channel_idx + 2)); + channel->set_color(theme->get_channel_color(channel_idx + 2)); } string contents; google::protobuf::util::MessageToJsonString(ret, &contents); // Ignore any errors. @@ -1189,7 +1462,7 @@ pair Mixer::get_channel_color_http(unsigned channel_idx) return make_pair(theme->get_channel_color(channel_idx), "text/plain"); } -Mixer::OutputFrameInfo Mixer::get_one_frame_from_each_card(unsigned master_card_index, bool master_card_is_output, CaptureCard::NewFrame new_frames[MAX_VIDEO_CARDS], bool has_new_frame[MAX_VIDEO_CARDS]) +Mixer::OutputFrameInfo Mixer::get_one_frame_from_each_card(unsigned master_card_index, bool master_card_is_output, CaptureCard::NewFrame new_frames[MAX_VIDEO_CARDS], bool has_new_frame[MAX_VIDEO_CARDS], vector raw_audio[MAX_VIDEO_CARDS]) { OutputFrameInfo output_frame_info; start: @@ -1217,7 +1490,7 @@ start: goto start; } - for (unsigned card_index = 0; card_index < num_cards + num_video_inputs + num_html_inputs; ++card_index) { + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { CaptureCard *card = &cards[card_index]; if (card->new_frames.empty()) { // Starvation. ++card->metric_input_duped_frames; @@ -1228,7 +1501,7 @@ start: // we dropped. (may_have_dropped_last_frame is set whenever we // trim the queue completely away, and cleared when we actually // get a new frame.) - ((CEFCapture *)card->capture.get())->request_new_frame(); + ((CEFCapture *)card->capture.get())->request_new_frame(/*ignore_if_locked=*/true); } #endif } else { @@ -1237,6 +1510,8 @@ start: card->new_frames.pop_front(); card->new_frames_changed.notify_all(); } + + raw_audio[card_index] = move(card->new_raw_audio); } if (!master_card_is_output) { @@ -1249,7 +1524,7 @@ start: output_jitter_history.frame_arrived(output_frame_info.frame_timestamp, output_frame_info.frame_duration, output_frame_info.dropped_frames); } - for (unsigned card_index = 0; card_index < num_cards + num_video_inputs + num_html_inputs; ++card_index) { + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { CaptureCard *card = &cards[card_index]; if (has_new_frame[card_index] && !input_card_is_master_clock(card_index, master_card_index) && @@ -1268,7 +1543,7 @@ start: // This might get off by a fractional sample when changing master card // between ones with different frame rates, but that's fine. - int num_samples_times_timebase = OUTPUT_FREQUENCY * output_frame_info.frame_duration + fractional_samples; + int64_t num_samples_times_timebase = int64_t(OUTPUT_FREQUENCY) * output_frame_info.frame_duration + fractional_samples; output_frame_info.num_samples = num_samples_times_timebase / TIMEBASE; fractional_samples = num_samples_times_timebase % TIMEBASE; assert(output_frame_info.num_samples >= 0); @@ -1279,34 +1554,51 @@ start: void Mixer::handle_hotplugged_cards() { // Check for cards that have been disconnected since last frame. - for (unsigned card_index = 0; card_index < num_cards; ++card_index) { + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { CaptureCard *card = &cards[card_index]; - if (card->capture->get_disconnected()) { + if (card->capture != nullptr && card->capture->get_disconnected()) { fprintf(stderr, "Card %u went away, replacing with a fake card.\n", card_index); FakeCapture *capture = new FakeCapture(global_flags.width, global_flags.height, FAKE_FPS, OUTPUT_FREQUENCY, card_index, global_flags.fake_cards_audio); - configure_card(card_index, capture, CardType::FAKE_CAPTURE, /*output=*/nullptr); + configure_card(card_index, capture, CardType::FAKE_CAPTURE, /*output=*/nullptr, /*is_srt_card=*/false); card->queue_length_policy.reset(card_index); card->capture->start_bm_capture(); } } + // Count how many active cards we already have. Used below to check that we + // don't go past the max_cards limit set by the user. Note that (non-SRT) video + // and HTML “cards” don't count towards this limit. + int num_video_cards = 0; + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { + CaptureCard *card = &cards[card_index]; + if (card->type == CardType::LIVE_CARD || is_srt_card(card)) { + ++num_video_cards; + } + } + // Check for cards that have been connected since last frame. vector hotplugged_cards_copy; +#ifdef HAVE_SRT + vector hotplugged_srt_cards_copy; +#endif { lock_guard lock(hotplug_mutex); swap(hotplugged_cards, hotplugged_cards_copy); +#ifdef HAVE_SRT + swap(hotplugged_srt_cards, hotplugged_srt_cards_copy); +#endif } for (libusb_device *new_dev : hotplugged_cards_copy) { // Look for a fake capture card where we can stick this in. int free_card_index = -1; - for (unsigned card_index = 0; card_index < num_cards; ++card_index) { + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { if (cards[card_index].is_fake_capture) { free_card_index = card_index; break; } } - if (free_card_index == -1) { + if (free_card_index == -1 || num_video_cards >= global_flags.max_num_cards) { fprintf(stderr, "New card plugged in, but no free slots -- ignoring.\n"); libusb_unref_device(new_dev); } else { @@ -1314,19 +1606,86 @@ void Mixer::handle_hotplugged_cards() fprintf(stderr, "New card plugged in, choosing slot %d.\n", free_card_index); CaptureCard *card = &cards[free_card_index]; BMUSBCapture *capture = new BMUSBCapture(free_card_index, new_dev); - configure_card(free_card_index, capture, CardType::LIVE_CARD, /*output=*/nullptr); + configure_card(free_card_index, capture, CardType::LIVE_CARD, /*output=*/nullptr, /*is_srt_card=*/false); card->queue_length_policy.reset(free_card_index); capture->set_card_disconnected_callback(bind(&Mixer::bm_hotplug_remove, this, free_card_index)); capture->start_bm_capture(); } } + +#ifdef HAVE_SRT + // Same, for SRT inputs. + for (SRTSOCKET sock : hotplugged_srt_cards_copy) { + char name[256]; + int namelen = sizeof(name); + srt_getsockopt(sock, /*ignored=*/0, SRTO_STREAMID, name, &namelen); + string stream_id(name, namelen); + + // Look for a fake capture card where we can stick this in. + // Prioritize ones that previously held SRT streams with the + // same stream ID, if any exist -- and it multiple exist, + // take the one that disconnected the last. + int first_free_card_index = -1, last_matching_free_card_index = -1; + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { + CaptureCard *card = &cards[card_index]; + if (!card->is_fake_capture) { + continue; + } + if (first_free_card_index == -1) { + first_free_card_index = card_index; + } + if (card->last_srt_stream_id == stream_id && + (last_matching_free_card_index == -1 || + card->fake_capture_counter > + cards[last_matching_free_card_index].fake_capture_counter)) { + last_matching_free_card_index = card_index; + } + } + + const int free_card_index = (last_matching_free_card_index != -1) + ? last_matching_free_card_index : first_free_card_index; + if (free_card_index == -1 || num_video_cards >= global_flags.max_num_cards) { + if (stream_id.empty()) { + stream_id = "no name"; + } + fprintf(stderr, "New SRT stream connected (%s), but no free slots -- ignoring.\n", stream_id.c_str()); + srt_close(sock); + } else { + // FFmpegCapture takes ownership. + if (stream_id.empty()) { + fprintf(stderr, "New unnamed SRT stream connected, choosing slot %d.\n", free_card_index); + } else { + fprintf(stderr, "New SRT stream connected (%s), choosing slot %d.\n", stream_id.c_str(), free_card_index); + } + CaptureCard *card = &cards[free_card_index]; + FFmpegCapture *capture = new FFmpegCapture(sock, stream_id); + capture->set_card_index(free_card_index); + configure_card(free_card_index, capture, CardType::FFMPEG_INPUT, /*output=*/nullptr, /*is_srt_card=*/true); + update_srt_stats(sock, card); // Initial zero stats. + card->last_srt_stream_id = stream_id; + card->queue_length_policy.reset(free_card_index); + capture->set_card_disconnected_callback(bind(&Mixer::bm_hotplug_remove, this, free_card_index)); + capture->start_bm_capture(); + } + } +#endif + + // Finally, newly forced-to-active fake capture cards. + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { + CaptureCard *card = &cards[card_index]; + if (card->capture == nullptr && card->force_active) { + FakeCapture *capture = new FakeCapture(global_flags.width, global_flags.height, FAKE_FPS, OUTPUT_FREQUENCY, card_index, global_flags.fake_cards_audio); + configure_card(card_index, capture, CardType::FAKE_CAPTURE, /*output=*/nullptr, /*is_srt_card=*/false); + card->queue_length_policy.reset(card_index); + card->capture->start_bm_capture(); + } + } } void Mixer::schedule_audio_resampling_tasks(unsigned dropped_frames, int num_samples_per_frame, int length_per_frame, bool is_preroll, steady_clock::time_point frame_timestamp) { // Resample the audio as needed, including from previously dropped frames. - assert(num_cards > 0); for (unsigned frame_num = 0; frame_num < dropped_frames + 1; ++frame_num) { const bool dropped_frame = (frame_num != dropped_frames); { @@ -1343,7 +1702,7 @@ void Mixer::schedule_audio_resampling_tasks(unsigned dropped_frames, int num_sam // non-dropped frame; perhaps we should just discard that as well, // since dropped frames are expected to be rare, and it might be // better to just wait until we have a slightly more normal situation). - unique_lock lock(audio_mutex); + lock_guard lock(audio_mutex); bool adjust_rate = !dropped_frame && !is_preroll; audio_task_queue.push(AudioTask{pts_int, num_samples_per_frame, adjust_rate, frame_timestamp}); audio_task_queue_changed.notify_one(); @@ -1367,8 +1726,8 @@ void Mixer::render_one_frame(int64_t duration) // Update Y'CbCr settings for all cards. { - unique_lock lock(card_mutex); - for (unsigned card_index = 0; card_index < num_cards; ++card_index) { + lock_guard lock(card_mutex); + for (unsigned card_index = 0; card_index < MAX_VIDEO_CARDS; ++card_index) { YCbCrInterpretation *interpretation = &ycbcr_interpretation[card_index]; input_state.ycbcr_coefficients_auto[card_index] = interpretation->ycbcr_coefficients_auto; input_state.ycbcr_coefficients[card_index] = interpretation->ycbcr_coefficients; @@ -1559,6 +1918,23 @@ void Mixer::quit() audio_task_queue_changed.notify_one(); mixer_thread.join(); audio_thread.join(); +#ifdef HAVE_SRT + if (global_flags.srt_port >= 0) { + // There's seemingly no other reasonable way to wake up the thread + // (libsrt's epoll equivalent is busy-waiting). + int sock = srt_socket(AF_INET6, 0, 0); + if (sock != -1) { + sockaddr_in6 addr; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = AF_INET6; + addr.sin6_addr = IN6ADDR_LOOPBACK_INIT; + addr.sin6_port = htons(global_flags.srt_port); + srt_connect(sock, (sockaddr *)&addr, sizeof(addr)); + srt_close(sock); + } + srt_thread.join(); + } +#endif } void Mixer::transition_clicked(int transition_num) @@ -1573,19 +1949,23 @@ void Mixer::channel_clicked(int preview_num) YCbCrInterpretation Mixer::get_input_ycbcr_interpretation(unsigned card_index) const { - unique_lock lock(card_mutex); + lock_guard lock(card_mutex); return ycbcr_interpretation[card_index]; } void Mixer::set_input_ycbcr_interpretation(unsigned card_index, const YCbCrInterpretation &interpretation) { - unique_lock lock(card_mutex); + lock_guard lock(card_mutex); ycbcr_interpretation[card_index] = interpretation; } void Mixer::start_mode_scanning(unsigned card_index) { - assert(card_index < num_cards); + assert(card_index < MAX_VIDEO_CARDS); + if (cards[card_index].capture != nullptr) { + // Inactive card. Should never happen. + return; + } if (is_mode_scanning[card_index]) { return; } @@ -1603,18 +1983,20 @@ void Mixer::start_mode_scanning(unsigned card_index) map Mixer::get_available_output_video_modes() const { assert(desired_output_card_index != -1); - unique_lock lock(card_mutex); + lock_guard lock(card_mutex); return cards[desired_output_card_index].output->get_available_video_modes(); } string Mixer::get_ffmpeg_filename(unsigned card_index) const { - assert(card_index >= num_cards && card_index < num_cards + num_video_inputs); + assert(card_index < MAX_VIDEO_CARDS); + assert(cards[card_index].type == CardType::FFMPEG_INPUT); return ((FFmpegCapture *)(cards[card_index].capture.get()))->get_filename(); } void Mixer::set_ffmpeg_filename(unsigned card_index, const string &filename) { - assert(card_index >= num_cards && card_index < num_cards + num_video_inputs); + assert(card_index < MAX_VIDEO_CARDS); + assert(cards[card_index].type == CardType::FFMPEG_INPUT); ((FFmpegCapture *)(cards[card_index].capture.get()))->change_filename(filename); } @@ -1641,7 +2023,7 @@ void Mixer::OutputChannel::output_frame(DisplayFrame &&frame) // Store this frame for display. Remove the ready frame if any // (it was seemingly never used). { - unique_lock lock(frame_mutex); + lock_guard lock(frame_mutex); if (has_ready_frame) { parent->release_display_frame(&ready_frame); } @@ -1696,7 +2078,7 @@ void Mixer::OutputChannel::output_frame(DisplayFrame &&frame) bool Mixer::OutputChannel::get_display_frame(DisplayFrame *frame) { - unique_lock lock(frame_mutex); + lock_guard lock(frame_mutex); if (!has_current_frame && !has_ready_frame) { return false; } @@ -1721,13 +2103,13 @@ bool Mixer::OutputChannel::get_display_frame(DisplayFrame *frame) void Mixer::OutputChannel::add_frame_ready_callback(void *key, Mixer::new_frame_ready_callback_t callback) { - unique_lock lock(frame_mutex); + lock_guard lock(frame_mutex); new_frame_ready_callbacks[key] = callback; } void Mixer::OutputChannel::remove_frame_ready_callback(void *key) { - unique_lock lock(frame_mutex); + lock_guard lock(frame_mutex); new_frame_ready_callbacks.erase(key); } @@ -1746,4 +2128,142 @@ void Mixer::OutputChannel::set_color_updated_callback(Mixer::color_updated_callb color_updated_callback = callback; } +#ifdef HAVE_SRT +void Mixer::start_srt() +{ + SRTSOCKET sock = srt_socket(AF_INET6, 0, 0); + sockaddr_in6 addr; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = AF_INET6; + addr.sin6_port = htons(global_flags.srt_port); + + int err = srt_bind(sock, (sockaddr *)&addr, sizeof(addr)); + if (err != 0) { + fprintf(stderr, "srt_bind: %s\n", srt_getlasterror_str()); + abort(); + } + err = srt_listen(sock, MAX_VIDEO_CARDS); + if (err != 0) { + fprintf(stderr, "srt_listen: %s\n", srt_getlasterror_str()); + abort(); + } + + srt_thread = thread([this, sock] { + sockaddr_in6 addr; + for ( ;; ) { + int sa_len = sizeof(addr); + int clientsock = srt_accept(sock, (sockaddr *)&addr, &sa_len); + if (should_quit) { + if (clientsock != -1) { + srt_close(clientsock); + } + break; + } + if (!global_flags.enable_srt) { // Runtime UI toggle. + // Perhaps not as good as never listening in the first place, + // but much simpler to turn on and off. + srt_close(clientsock); + continue; + } + lock_guard lock(hotplug_mutex); + hotplugged_srt_cards.push_back(clientsock); + } + srt_close(sock); + }); +} +#endif + +#ifdef HAVE_SRT +void Mixer::update_srt_stats(int srt_sock, Mixer::CaptureCard *card) +{ + SRT_TRACEBSTATS stats; + srt_bistats(srt_sock, &stats, /*clear=*/0, /*instantaneous=*/1); + + card->metric_srt_uptime_seconds = stats.msTimeStamp * 1e-3; + card->metric_srt_send_duration_seconds = stats.usSndDurationTotal * 1e-6; + card->metric_srt_sent_bytes = stats.byteSentTotal; + card->metric_srt_received_bytes = stats.byteRecvTotal; + card->metric_srt_sent_packets_normal = stats.pktSentTotal; + card->metric_srt_received_packets_normal = stats.pktRecvTotal; + card->metric_srt_sent_packets_lost = stats.pktSndLossTotal; + card->metric_srt_received_packets_lost = stats.pktRcvLossTotal; + card->metric_srt_sent_packets_retransmitted = stats.pktRetransTotal; + card->metric_srt_sent_bytes_retransmitted = stats.byteRetransTotal; + card->metric_srt_sent_packets_ack = stats.pktSentACKTotal; + card->metric_srt_received_packets_ack = stats.pktRecvACKTotal; + card->metric_srt_sent_packets_nak = stats.pktSentNAKTotal; + card->metric_srt_received_packets_nak = stats.pktRecvNAKTotal; + card->metric_srt_sent_packets_dropped = stats.pktSndDropTotal; + card->metric_srt_received_packets_dropped = stats.pktRcvDropTotal; + card->metric_srt_sent_bytes_dropped = stats.byteSndDropTotal; + card->metric_srt_received_bytes_dropped = stats.byteRcvDropTotal; + card->metric_srt_received_packets_undecryptable = stats.pktRcvUndecryptTotal; + card->metric_srt_received_bytes_undecryptable = stats.byteRcvUndecryptTotal; + card->metric_srt_filter_sent_packets = stats.pktSndFilterExtraTotal; + card->metric_srt_filter_received_extra_packets = stats.pktRcvFilterExtraTotal; + card->metric_srt_filter_received_rebuilt_packets = stats.pktRcvFilterSupplyTotal; + card->metric_srt_filter_received_lost_packets = stats.pktRcvFilterLossTotal; + + // Gauges. + card->metric_srt_packet_sending_period_seconds = stats.usPktSndPeriod * 1e-6; + card->metric_srt_flow_window_packets = stats.pktFlowWindow; + card->metric_srt_congestion_window_packets = stats.pktCongestionWindow; + card->metric_srt_flight_size_packets = stats.pktFlightSize; + card->metric_srt_rtt_seconds = stats.msRTT * 1e-3; + card->metric_srt_estimated_bandwidth_bits_per_second = stats.mbpsBandwidth * 1e6; + card->metric_srt_bandwidth_ceiling_bits_per_second = stats.mbpsMaxBW * 1e6; + card->metric_srt_send_buffer_available_bytes = stats.byteAvailSndBuf; + card->metric_srt_receive_buffer_available_bytes = stats.byteAvailRcvBuf; + card->metric_srt_mss_bytes = stats.byteMSS; + card->metric_srt_sender_unacked_packets = stats.pktSndBuf; + card->metric_srt_sender_unacked_bytes = stats.byteSndBuf; + card->metric_srt_sender_unacked_timespan_seconds = stats.msSndBuf * 1e-3; + card->metric_srt_sender_delivery_delay_seconds = stats.msSndTsbPdDelay * 1e-3; + card->metric_srt_receiver_unacked_packets = stats.pktRcvBuf; + card->metric_srt_receiver_unacked_bytes = stats.byteRcvBuf; + card->metric_srt_receiver_unacked_timespan_seconds = stats.msRcvBuf * 1e-3; + card->metric_srt_receiver_delivery_delay_seconds = stats.msRcvTsbPdDelay * 1e-3; +} +#endif + +string Mixer::description_for_card(unsigned card_index) +{ + CaptureCard *card = &cards[card_index]; + if (card->capture == nullptr) { + // Should never be called for inactive cards, but OK. + char buf[256]; + snprintf(buf, sizeof(buf), "Inactive capture card %u", card_index); + return buf; + } + if (card->type != CardType::FFMPEG_INPUT) { + char buf[256]; + snprintf(buf, sizeof(buf), "Capture card %u (%s)", card_index, card->capture->get_description().c_str()); + return buf; + } + + // Number (non-SRT) FFmpeg inputs from zero, separately from the capture cards, + // since it's not too obvious for the user that they are “cards”. + unsigned ffmpeg_index = 0; + for (unsigned i = 0; i < card_index; ++i) { + CaptureCard *other_card = &cards[i]; + if (other_card->type == CardType::FFMPEG_INPUT && !is_srt_card(other_card)) { + ++ffmpeg_index; + } + } + char buf[256]; + snprintf(buf, sizeof(buf), "Video input %u (%s)", ffmpeg_index, card->capture->get_description().c_str()); + return buf; +} + +bool Mixer::is_srt_card(const Mixer::CaptureCard *card) +{ +#ifdef HAVE_SRT + if (card->type == CardType::FFMPEG_INPUT) { + int srt_sock = static_cast(card->capture.get())->get_srt_sock(); + return srt_sock != -1; + } +#endif + return false; +} + mutex RefCountedGLsync::fence_lock;