X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=nageru%2Fmixer.h;h=6e9da90c3c0e1a27bc830a18acd1407cf6cfc1f0;hb=c4497a099d6a01ff2eedb483c2589ef70e7c657c;hp=69b28f58826bb7c4603107d3f7efa668b97e196c;hpb=f8da8feaff269b75480625e1384951c20c3a529d;p=nageru diff --git a/nageru/mixer.h b/nageru/mixer.h index 69b28f5..6e9da90 100644 --- a/nageru/mixer.h +++ b/nageru/mixer.h @@ -23,15 +23,18 @@ #include #include +#include #include #include "audio_mixer.h" #include "bmusb/bmusb.h" #include "defs.h" +#include "ffmpeg_capture.h" #include "shared/httpd.h" #include "input_state.h" #include "libusb.h" #include "pbo_frame_allocator.h" +#include "queue_length_policy.h" #include "ref_counted_frame.h" #include "shared/ref_counted_gl_sync.h" #include "theme.h" @@ -55,109 +58,10 @@ class ResourcePool; class YCbCrInput; } // namespace movit -// A class to estimate the future jitter. Used in QueueLengthPolicy (see below). -// -// There are many ways to estimate jitter; I've tested a few ones (and also -// some algorithms that don't explicitly model jitter) with different -// parameters on some real-life data in experiments/queue_drop_policy.cpp. -// This is one based on simple order statistics where I've added some margin in -// the number of starvation events; I believe that about one every hour would -// probably be acceptable, but this one typically goes lower than that, at the -// cost of 2–3 ms extra latency. (If the queue is hard-limited to one frame, it's -// possible to get ~10 ms further down, but this would mean framedrops every -// second or so.) The general strategy is: Take the 99.9-percentile jitter over -// last 5000 frames, multiply by two, and that's our worst-case jitter -// estimate. The fact that we're not using the max value means that we could -// actually even throw away very late frames immediately, which means we only -// get one user-visible event instead of seeing something both when the frame -// arrives late (duplicate frame) and then again when we drop. -class JitterHistory { -private: - static constexpr size_t history_length = 5000; - static constexpr double percentile = 0.999; - static constexpr double multiplier = 2.0; - -public: - void register_metrics(const std::vector> &labels); - void unregister_metrics(const std::vector> &labels); - - void clear() { - history.clear(); - orders.clear(); - } - void frame_arrived(std::chrono::steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames); - std::chrono::steady_clock::time_point get_expected_next_frame() const { return expected_timestamp; } - double estimate_max_jitter() const; - -private: - // A simple O(k) based algorithm for getting the k-th largest or - // smallest element from our window; we simply keep the multiset - // ordered (insertions and deletions are O(n) as always) and then - // iterate from one of the sides. If we had larger values of k, - // we could go for a more complicated setup with two sets or heaps - // (one increasing and one decreasing) that we keep balanced around - // the point, or it is possible to reimplement std::set with - // counts in each node. However, since k=5, we don't need this. - std::multiset orders; - std::deque::iterator> history; - - std::chrono::steady_clock::time_point expected_timestamp = std::chrono::steady_clock::time_point::min(); - - // Metrics. There are no direct summaries for jitter, since we already have latency summaries. - std::atomic metric_input_underestimated_jitter_frames{0}; - std::atomic metric_input_estimated_max_jitter_seconds{0.0 / 0.0}; -}; - -// For any card that's not the master (where we pick out the frames as they -// come, as fast as we can process), there's going to be a queue. The question -// is when we should drop frames from that queue (apart from the obvious -// dropping if the 16-frame queue should become full), especially given that -// the frame rate could be lower or higher than the master (either subtly or -// dramatically). We have two (conflicting) demands: -// -// 1. We want to avoid starving the queue. -// 2. We don't want to add more delay than is needed. -// -// Our general strategy is to drop as many frames as we can (helping for #2) -// that we think is safe for #1 given jitter. To this end, we measure the -// deviation from the expected arrival time for all cards, and use that for -// continuous jitter estimation. -// -// We then drop everything from the queue that we're sure we won't need to -// serve the output in the time before the next frame arrives. Typically, -// this means the queue will contain 0 or 1 frames, although more is also -// possible if the jitter is very high. -class QueueLengthPolicy { -public: - QueueLengthPolicy() {} - void reset(unsigned card_index) { - this->card_index = card_index; - } - - void register_metrics(const std::vector> &labels); - void unregister_metrics(const std::vector> &labels); - - // Call after picking out a frame, so 0 means starvation. - void update_policy(std::chrono::steady_clock::time_point now, - std::chrono::steady_clock::time_point expected_next_frame, - int64_t input_frame_duration, - int64_t master_frame_duration, - double max_input_card_jitter_seconds, - double max_master_card_jitter_seconds); - unsigned get_safe_queue_length() const { return safe_queue_length; } - -private: - unsigned card_index; // For debugging and metrics only. - unsigned safe_queue_length = 0; // Can never go below zero. - - // Metrics. - std::atomic metric_input_queue_safe_length_frames{1}; -}; - class Mixer { public: // The surface format is used for offscreen destinations for OpenGL contexts we need. - Mixer(const QSurfaceFormat &format, unsigned num_cards); + Mixer(const QSurfaceFormat &format); ~Mixer(); void start(); void quit(); @@ -250,14 +154,14 @@ public: return theme->get_channel_color(channel); } - int get_channel_signal(unsigned channel) const + int map_channel_to_signal(unsigned channel) const { - return theme->get_channel_signal(channel); + return theme->map_channel_to_signal(channel); } - int map_signal(unsigned channel) + int map_signal_to_card(int signal) { - return theme->map_signal(channel); + return theme->map_signal_to_card(signal); } unsigned get_master_clock() const @@ -288,6 +192,11 @@ public: theme->set_wb(channel, r, g, b); } + std::string format_status_line(const std::string &disk_space_left_text, double file_length_seconds) + { + return theme->format_status_line(disk_space_left_text, file_length_seconds); + } + // Note: You can also get this through the global variable global_audio_mixer. AudioMixer *get_audio_mixer() { return audio_mixer.get(); } const AudioMixer *get_audio_mixer() const { return audio_mixer.get(); } @@ -297,10 +206,8 @@ public: should_cut = true; } - unsigned get_num_cards() const { return num_cards; } - std::string get_card_description(unsigned card_index) const { - assert(card_index < num_cards); + assert(card_index < MAX_VIDEO_CARDS); return cards[card_index].capture->get_description(); } @@ -310,7 +217,7 @@ public: // the card's actual name. std::string get_output_card_description(unsigned card_index) const { assert(card_can_be_used_as_output(card_index)); - assert(card_index < num_cards); + assert(card_index < MAX_VIDEO_CARDS); if (cards[card_index].parked_capture) { return cards[card_index].parked_capture->get_description(); } else { @@ -319,59 +226,87 @@ public: } bool card_can_be_used_as_output(unsigned card_index) const { - assert(card_index < num_cards); - return cards[card_index].output != nullptr; + assert(card_index < MAX_VIDEO_CARDS); + return cards[card_index].output != nullptr && cards[card_index].capture != nullptr; + } + + bool card_is_cef(unsigned card_index) const { + assert(card_index < MAX_VIDEO_CARDS); + return cards[card_index].type == CardType::CEF_INPUT; } bool card_is_ffmpeg(unsigned card_index) const { - assert(card_index < num_cards + num_video_inputs); - return cards[card_index].type == CardType::FFMPEG_INPUT; + assert(card_index < MAX_VIDEO_CARDS); + if (cards[card_index].type != CardType::FFMPEG_INPUT) { + return false; + } +#ifdef HAVE_SRT + // SRT inputs are more like regular inputs than FFmpeg inputs, + // so show them as such. (This allows the user to right-click + // to select a different input.) + return static_cast(cards[card_index].capture.get())->get_srt_sock() == -1; +#else + return true; +#endif + } + + bool card_is_active(unsigned card_index) const { + assert(card_index < MAX_VIDEO_CARDS); + std::lock_guard lock(card_mutex); + return cards[card_index].capture != nullptr; + } + + void force_card_active(unsigned card_index) + { + // handle_hotplugged_cards() will pick this up. + std::lock_guard lock(card_mutex); + cards[card_index].force_active = true; } std::map get_available_video_modes(unsigned card_index) const { - assert(card_index < num_cards); + assert(card_index < MAX_VIDEO_CARDS); return cards[card_index].capture->get_available_video_modes(); } uint32_t get_current_video_mode(unsigned card_index) const { - assert(card_index < num_cards); + assert(card_index < MAX_VIDEO_CARDS); return cards[card_index].capture->get_current_video_mode(); } void set_video_mode(unsigned card_index, uint32_t mode) { - assert(card_index < num_cards); + assert(card_index < MAX_VIDEO_CARDS); cards[card_index].capture->set_video_mode(mode); } void start_mode_scanning(unsigned card_index); std::map get_available_video_inputs(unsigned card_index) const { - assert(card_index < num_cards); + assert(card_index < MAX_VIDEO_CARDS); return cards[card_index].capture->get_available_video_inputs(); } uint32_t get_current_video_input(unsigned card_index) const { - assert(card_index < num_cards); + assert(card_index < MAX_VIDEO_CARDS); return cards[card_index].capture->get_current_video_input(); } void set_video_input(unsigned card_index, uint32_t input) { - assert(card_index < num_cards); + assert(card_index < MAX_VIDEO_CARDS); cards[card_index].capture->set_video_input(input); } std::map get_available_audio_inputs(unsigned card_index) const { - assert(card_index < num_cards); + assert(card_index < MAX_VIDEO_CARDS); return cards[card_index].capture->get_available_audio_inputs(); } uint32_t get_current_audio_input(unsigned card_index) const { - assert(card_index < num_cards); + assert(card_index < MAX_VIDEO_CARDS); return cards[card_index].capture->get_current_audio_input(); } void set_audio_input(unsigned card_index, uint32_t input) { - assert(card_index < num_cards); + assert(card_index < MAX_VIDEO_CARDS); cards[card_index].capture->set_audio_input(input); } @@ -427,17 +362,15 @@ public: private: struct CaptureCard; - enum class CardType { - LIVE_CARD, - FAKE_CAPTURE, - FFMPEG_INPUT, - CEF_INPUT, - }; - void configure_card(unsigned card_index, bmusb::CaptureInterface *capture, CardType card_type, DeckLinkOutput *output); + void configure_card(unsigned card_index, bmusb::CaptureInterface *capture, CardType card_type, DeckLinkOutput *output, bool is_srt_card); void set_output_card_internal(int card_index); // Should only be called from the mixer thread. void bm_frame(unsigned card_index, uint16_t timecode, bmusb::FrameAllocator::Frame video_frame, size_t video_offset, bmusb::VideoFormat video_format, bmusb::FrameAllocator::Frame audio_frame, size_t audio_offset, bmusb::AudioFormat audio_format); + void upload_texture_for_frame( + int field, bmusb::VideoFormat video_format, + size_t y_offset, size_t cbcr_offset, size_t video_offset, + PBOFrameAllocator::Userdata *userdata); void bm_hotplug_add(libusb_device *dev); void bm_hotplug_remove(unsigned card_index); void place_rectangle(movit::Effect *resample_effect, movit::Effect *padding_effect, float x0, float y0, float x1, float y1); @@ -448,13 +381,16 @@ private: void render_one_frame(int64_t duration); void audio_thread_func(); void release_display_frame(DisplayFrame *frame); +#ifdef HAVE_SRT + void start_srt(); +#endif double pts() { return double(pts_int) / TIMEBASE; } void trim_queue(CaptureCard *card, size_t safe_queue_length); std::pair get_channels_json(); std::pair get_channel_color_http(unsigned channel_idx); HTTPD httpd; - unsigned num_cards, num_video_inputs, num_html_inputs = 0; + unsigned num_video_inputs, num_html_inputs = 0; QSurface *mixer_surface, *h264_encoder_surface, *decklink_output_surface, *image_update_surface; std::unique_ptr resource_pool; @@ -495,11 +431,24 @@ private: // frame rate is integer, will always stay zero. unsigned fractional_samples = 0; + // Monotonic counter that lets us know which slot was last turned into + // a fake capture. Used for SRT re-plugging. + unsigned fake_capture_counter = 0; + mutable std::mutex card_mutex; bool has_bmusb_thread = false; struct CaptureCard { + // If nullptr, the card is inactive, and will be hidden in the UI. + // Only fake capture cards can be inactive. std::unique_ptr capture; + // If true, card must always be active (typically because it's one of the + // first cards, or because the theme has explicitly asked for it). + bool force_active = false; bool is_fake_capture; + // If is_fake_capture is true, contains a monotonic timer value for when + // it was last changed. Otherwise undefined. Used for SRT re-plugging. + int fake_capture_counter; + std::string last_srt_stream_id = ""; // Used for SRT re-plugging. CardType type; std::unique_ptr output; @@ -529,15 +478,15 @@ private: int64_t length; // In TIMEBASE units. bool interlaced; unsigned field; // Which field (0 or 1) of the frame to use. Always 0 for progressive. - std::function upload_func; // Needs to be called to actually upload the texture to OpenGL. + bool texture_uploaded = false; unsigned dropped_frames = 0; // Number of dropped frames before this one. std::chrono::steady_clock::time_point received_timestamp = std::chrono::steady_clock::time_point::min(); + movit::RGBTriplet neutral_color{1.0f, 1.0f, 1.0f}; - // Used for MJPEG encoding. (upload_func packs everything it needs - // into the functor, but would otherwise also use these.) + // Used for MJPEG encoding, and texture upload. // width=0 or height=0 means a broken frame, ie., do not upload. bmusb::VideoFormat video_format; - size_t y_offset, cbcr_offset; + size_t video_offset, y_offset, cbcr_offset; }; std::deque new_frames; std::condition_variable new_frames_changed; // Set whenever new_frames is changed. @@ -566,10 +515,58 @@ private: std::atomic metric_input_frame_rate_nom{-1}; std::atomic metric_input_frame_rate_den{-1}; std::atomic metric_input_sample_rate_hz{-1}; + + // SRT metrics. + std::atomic metric_srt_uptime_seconds{0.0 / 0.0}; + std::atomic metric_srt_send_duration_seconds{0.0 / 0.0}; + std::atomic metric_srt_sent_bytes{-1}; + std::atomic metric_srt_received_bytes{-1}; + std::atomic metric_srt_sent_packets_normal{-1}; + std::atomic metric_srt_received_packets_normal{-1}; + std::atomic metric_srt_sent_packets_lost{-1}; + std::atomic metric_srt_received_packets_lost{-1}; + std::atomic metric_srt_sent_packets_retransmitted{-1}; + std::atomic metric_srt_sent_bytes_retransmitted{-1}; + std::atomic metric_srt_sent_packets_ack{-1}; + std::atomic metric_srt_received_packets_ack{-1}; + std::atomic metric_srt_sent_packets_nak{-1}; + std::atomic metric_srt_received_packets_nak{-1}; + std::atomic metric_srt_sent_packets_dropped{-1}; + std::atomic metric_srt_received_packets_dropped{-1}; + std::atomic metric_srt_sent_bytes_dropped{-1}; + std::atomic metric_srt_received_bytes_dropped{-1}; + std::atomic metric_srt_received_packets_undecryptable{-1}; + std::atomic metric_srt_received_bytes_undecryptable{-1}; + + std::atomic metric_srt_filter_received_extra_packets{-1}; + std::atomic metric_srt_filter_received_rebuilt_packets{-1}; + std::atomic metric_srt_filter_received_lost_packets{-1}; + + std::atomic metric_srt_packet_sending_period_seconds{0.0 / 0.0}; + std::atomic metric_srt_flow_window_packets{-1}; + std::atomic metric_srt_congestion_window_packets{-1}; + std::atomic metric_srt_flight_size_packets{-1}; + std::atomic metric_srt_rtt_seconds{0.0 / 0.0}; + std::atomic metric_srt_estimated_bandwidth_bits_per_second{0.0 / 0.0}; + std::atomic metric_srt_bandwidth_ceiling_bits_per_second{0.0 / 0.0}; + std::atomic metric_srt_send_buffer_available_bytes{-1}; + std::atomic metric_srt_receive_buffer_available_bytes{-1}; + std::atomic metric_srt_mss_bytes{-1}; + std::atomic metric_srt_sender_unacked_packets{-1}; + std::atomic metric_srt_sender_unacked_bytes{-1}; + std::atomic metric_srt_sender_unacked_timespan_seconds{0.0 / 0.0}; + std::atomic metric_srt_sender_delivery_delay_seconds{0.0 / 0.0}; + std::atomic metric_srt_receiver_unacked_packets{-1}; + std::atomic metric_srt_receiver_unacked_bytes{-1}; + std::atomic metric_srt_receiver_unacked_timespan_seconds{0.0 / 0.0}; + std::atomic metric_srt_receiver_delivery_delay_seconds{0.0 / 0.0}; + std::atomic metric_srt_filter_sent_packets{-1}; + }; JitterHistory output_jitter_history; CaptureCard cards[MAX_VIDEO_CARDS]; // Protected by . YCbCrInterpretation ycbcr_interpretation[MAX_VIDEO_CARDS]; // Protected by . + movit::RGBTriplet last_received_neutral_color[MAX_VIDEO_CARDS]; // Used by the mixer thread only. Constructor-initialiezd. std::unique_ptr audio_mixer; // Same as global_audio_mixer (see audio_mixer.h). bool input_card_is_master_clock(unsigned card_index, unsigned master_card_index) const; struct OutputFrameInfo { @@ -581,12 +578,22 @@ private: }; OutputFrameInfo get_one_frame_from_each_card(unsigned master_card_index, bool master_card_is_output, CaptureCard::NewFrame new_frames[MAX_VIDEO_CARDS], bool has_new_frame[MAX_VIDEO_CARDS], std::vector raw_audio[MAX_VIDEO_CARDS]); +#ifdef HAVE_SRT + void update_srt_stats(int srt_sock, Mixer::CaptureCard *card); +#endif + + std::string description_for_card(unsigned card_index); + static bool is_srt_card(const CaptureCard *card); + InputState input_state; // Cards we have been noticed about being hotplugged, but haven't tried adding yet. // Protected by its own mutex. std::mutex hotplug_mutex; std::vector hotplugged_cards; +#ifdef HAVE_SRT + std::vector hotplugged_srt_cards; +#endif class OutputChannel { public: @@ -619,6 +626,9 @@ private: std::thread mixer_thread; std::thread audio_thread; +#ifdef HAVE_SRT + std::thread srt_thread; +#endif std::atomic should_quit{false}; std::atomic should_cut{false};