X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=mixer.cpp;h=f45880d321be6d2a1c17c5a61b7ba0babc28e846;hb=fa54f2630c56a1df0046923d6a77b1bd58abf240;hp=6ed62113ff4f775edfb3e110be77db8f7a136bf5;hpb=7ac4f511034b3a69be8cf1344fd8c2693eeeee4c;p=nageru diff --git a/mixer.cpp b/mixer.cpp index 6ed6211..f45880d 100644 --- a/mixer.cpp +++ b/mixer.cpp @@ -14,7 +14,6 @@ #include #include #include -#include #include #include #include @@ -31,6 +30,7 @@ #include "DeckLinkAPI.h" #include "LinuxCOM.h" #include "alsa_output.h" +#include "basic_stats.h" #include "bmusb/bmusb.h" #include "bmusb/fake_capture.h" #include "chroma_subsampler.h" @@ -61,7 +61,6 @@ using namespace std::placeholders; using namespace bmusb; Mixer *global_mixer = nullptr; -bool uses_mlock = false; namespace { @@ -198,39 +197,97 @@ void upload_texture(GLuint tex, GLuint width, GLuint height, GLuint stride, bool } // namespace -void QueueLengthPolicy::register_metrics(const vector> &labels) +void JitterHistory::register_metrics(const vector> &labels) { - global_metrics.add("input_queue_length_frames", labels, &metric_input_queue_length_frames, Metrics::TYPE_GAUGE); - global_metrics.add("input_queue_safe_length_frames", labels, &metric_input_queue_safe_length_frames, Metrics::TYPE_GAUGE); - global_metrics.add("input_queue_duped_frames", labels, &metric_input_duped_frames); + global_metrics.add("input_underestimated_jitter_frames", labels, &metric_input_underestimated_jitter_frames); + global_metrics.add("input_estimated_max_jitter_seconds", labels, &metric_input_estimated_max_jitter_seconds, Metrics::TYPE_GAUGE); +} + +void JitterHistory::unregister_metrics(const vector> &labels) +{ + global_metrics.remove("input_underestimated_jitter_frames", labels); + global_metrics.remove("input_estimated_max_jitter_seconds", labels); } -void QueueLengthPolicy::update_policy(unsigned queue_length) +void JitterHistory::frame_arrived(steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames) { - if (queue_length == 0) { // Starvation. - if (been_at_safe_point_since_last_starvation && safe_queue_length < unsigned(global_flags.max_input_queue_frames)) { - ++safe_queue_length; - fprintf(stderr, "Card %u: Starvation, increasing safe limit to %u frame(s)\n", - card_index, safe_queue_length); + if (expected_timestamp > steady_clock::time_point::min()) { + expected_timestamp += dropped_frames * nanoseconds(frame_duration * 1000000000 / TIMEBASE); + double jitter_seconds = fabs(duration(expected_timestamp - now).count()); + history.push_back(orders.insert(jitter_seconds)); + if (jitter_seconds > estimate_max_jitter()) { + ++metric_input_underestimated_jitter_frames; } - frames_with_at_least_one = 0; - been_at_safe_point_since_last_starvation = false; - ++metric_input_duped_frames; - metric_input_queue_safe_length_frames = safe_queue_length; - metric_input_queue_length_frames = 0; - return; + + metric_input_estimated_max_jitter_seconds = estimate_max_jitter(); + + if (history.size() > history_length) { + orders.erase(history.front()); + history.pop_front(); + } + assert(history.size() <= history_length); } - if (queue_length >= safe_queue_length) { - been_at_safe_point_since_last_starvation = true; + expected_timestamp = now + nanoseconds(frame_duration * 1000000000 / TIMEBASE); +} + +double JitterHistory::estimate_max_jitter() const +{ + if (orders.empty()) { + return 0.0; } - if (++frames_with_at_least_one >= 1000 && safe_queue_length > 1) { - --safe_queue_length; - metric_input_queue_safe_length_frames = safe_queue_length; - fprintf(stderr, "Card %u: Spare frames for more than 1000 frames, reducing safe limit to %u frame(s)\n", - card_index, safe_queue_length); - frames_with_at_least_one = 0; + size_t elem_idx = lrint((orders.size() - 1) * percentile); + if (percentile <= 0.5) { + return *next(orders.begin(), elem_idx) * multiplier; + } else { + return *prev(orders.end(), elem_idx + 1) * multiplier; } - metric_input_queue_length_frames = min(queue_length, safe_queue_length); // The caller will drop frames for us if needed. +} + +void QueueLengthPolicy::register_metrics(const vector> &labels) +{ + global_metrics.add("input_queue_safe_length_frames", labels, &metric_input_queue_safe_length_frames, Metrics::TYPE_GAUGE); +} + +void QueueLengthPolicy::unregister_metrics(const vector> &labels) +{ + global_metrics.remove("input_queue_safe_length_frames", labels); +} + +void QueueLengthPolicy::update_policy(steady_clock::time_point now, + steady_clock::time_point expected_next_frame, + int64_t input_frame_duration, + int64_t master_frame_duration, + double max_input_card_jitter_seconds, + double max_master_card_jitter_seconds) +{ + double input_frame_duration_seconds = input_frame_duration / double(TIMEBASE); + double master_frame_duration_seconds = master_frame_duration / double(TIMEBASE); + + // Figure out when we can expect the next frame for this card, assuming + // worst-case jitter (ie., the frame is maximally late). + double seconds_until_next_frame = max(duration(expected_next_frame - now).count() + max_input_card_jitter_seconds, 0.0); + + // How many times are the master card expected to tick in that time? + // We assume the master clock has worst-case jitter but not any rate + // discrepancy, ie., it ticks as early as possible every time, but not + // cumulatively. + double frames_needed = (seconds_until_next_frame + max_master_card_jitter_seconds) / master_frame_duration_seconds; + + // As a special case, if the master card ticks faster than the input card, + // we expect the queue to drain by itself even without dropping. But if + // the difference is small (e.g. 60 Hz master and 59.94 input), it would + // go slowly enough that the effect wouldn't really be appreciable. + // We account for this by looking at the situation five frames ahead, + // assuming everything else is the same. + double frames_allowed; + if (master_frame_duration < input_frame_duration) { + frames_allowed = frames_needed + 5 * (input_frame_duration_seconds - master_frame_duration_seconds) / master_frame_duration_seconds; + } else { + frames_allowed = frames_needed; + } + + safe_queue_length = max(floor(frames_allowed), 0); + metric_input_queue_safe_length_frames = safe_queue_length; } Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) @@ -296,7 +353,7 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) theme.reset(new Theme(global_flags.theme_filename, global_flags.theme_dirs, resource_pool.get(), num_cards)); // Start listening for clients only once VideoEncoder has written its header, if any. - httpd.start(9095); + httpd.start(global_flags.http_port); // First try initializing the then PCI devices, then USB, then // fill up with fake cards until we have the desired number of cards. @@ -314,7 +371,10 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) DeckLinkCapture *capture = new DeckLinkCapture(decklink, card_index); DeckLinkOutput *output = new DeckLinkOutput(resource_pool.get(), decklink_output_surface, global_flags.width, global_flags.height, card_index); - output->set_device(decklink); + if (!output->set_device(decklink)) { + delete output; + output = nullptr; + } configure_card(card_index, capture, CardType::LIVE_CARD, output); ++num_pci_devices; } @@ -402,13 +462,7 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) set_output_card_internal(global_flags.output_card); } - metric_start_time_seconds = get_timestamp_for_metrics(); - - global_metrics.add("frames_output_total", &metric_frames_output_total); - global_metrics.add("frames_output_dropped", &metric_frames_output_dropped); - global_metrics.add("start_time_seconds", &metric_start_time_seconds, Metrics::TYPE_GAUGE); - global_metrics.add("memory_used_bytes", &metrics_memory_used_bytes); - global_metrics.add("memory_locked_limit_bytes", &metrics_memory_locked_limit_bytes); + output_jitter_history.register_metrics({{ "card", "output" }}); } Mixer::~Mixer() @@ -475,6 +529,28 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT audio_mixer.set_display_name(device, card->capture->get_description()); audio_mixer.trigger_state_changed_callback(); + // Unregister old metrics, if any. + if (!card->labels.empty()) { + const vector> &labels = card->labels; + card->jitter_history.unregister_metrics(labels); + card->queue_length_policy.unregister_metrics(labels); + global_metrics.remove("input_received_frames", labels); + global_metrics.remove("input_dropped_frames_jitter", labels); + global_metrics.remove("input_dropped_frames_error", labels); + global_metrics.remove("input_dropped_frames_resets", labels); + global_metrics.remove("input_queue_length_frames", labels); + global_metrics.remove("input_queue_duped_frames", labels); + + global_metrics.remove("input_has_signal_bool", labels); + global_metrics.remove("input_is_connected_bool", labels); + global_metrics.remove("input_interlaced_bool", labels); + global_metrics.remove("input_width_pixels", labels); + global_metrics.remove("input_height_pixels", labels); + global_metrics.remove("input_frame_rate_nom", labels); + global_metrics.remove("input_frame_rate_den", labels); + global_metrics.remove("input_sample_rate_hz", labels); + } + // Register metrics. vector> labels; char card_name[64]; @@ -494,11 +570,14 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT default: assert(false); } + card->jitter_history.register_metrics(labels); card->queue_length_policy.register_metrics(labels); global_metrics.add("input_received_frames", labels, &card->metric_input_received_frames); global_metrics.add("input_dropped_frames_jitter", labels, &card->metric_input_dropped_frames_jitter); global_metrics.add("input_dropped_frames_error", labels, &card->metric_input_dropped_frames_error); global_metrics.add("input_dropped_frames_resets", labels, &card->metric_input_resets); + global_metrics.add("input_queue_length_frames", labels, &card->metric_input_queue_length_frames, Metrics::TYPE_GAUGE); + global_metrics.add("input_queue_duped_frames", labels, &card->metric_input_duped_frames); global_metrics.add("input_has_signal_bool", labels, &card->metric_input_has_signal_bool, Metrics::TYPE_GAUGE); global_metrics.add("input_is_connected_bool", labels, &card->metric_input_is_connected_bool, Metrics::TYPE_GAUGE); @@ -508,6 +587,7 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT global_metrics.add("input_frame_rate_nom", labels, &card->metric_input_frame_rate_nom, Metrics::TYPE_GAUGE); global_metrics.add("input_frame_rate_den", labels, &card->metric_input_frame_rate_den, Metrics::TYPE_GAUGE); global_metrics.add("input_sample_rate_hz", labels, &card->metric_input_sample_rate_hz, Metrics::TYPE_GAUGE); + card->labels = labels; } void Mixer::set_output_card_internal(int card_index) @@ -528,7 +608,7 @@ void Mixer::set_output_card_internal(int card_index) lock.unlock(); fake_capture->stop_dequeue_thread(); lock.lock(); - old_card->capture = move(old_card->parked_capture); + old_card->capture = move(old_card->parked_capture); // TODO: reset the metrics old_card->is_fake_capture = false; old_card->capture->start_bm_capture(); } @@ -550,6 +630,7 @@ void Mixer::set_output_card_internal(int card_index) card->output->start_output(desired_output_video_mode, pts_int); } output_card_index = card_index; + output_jitter_history.clear(); } namespace { @@ -645,7 +726,9 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, } while (!success); } - audio_mixer.add_audio(device, audio_frame.data + audio_offset, num_samples, audio_format, frame_length, audio_frame.received_timestamp); + if (num_samples > 0) { + audio_mixer.add_audio(device, audio_frame.data + audio_offset, num_samples, audio_format, frame_length, audio_frame.received_timestamp); + } // Done with the audio, so release it. if (audio_frame.owner) { @@ -697,8 +780,9 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, new_frame.dropped_frames = dropped_frames; new_frame.received_timestamp = video_frame.received_timestamp; card->new_frames.push_back(move(new_frame)); - card->new_frames_changed.notify_all(); + card->jitter_history.frame_arrived(video_frame.received_timestamp, frame_length, dropped_frames); } + card->new_frames_changed.notify_all(); return; } @@ -822,8 +906,9 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, new_frame.dropped_frames = dropped_frames; new_frame.received_timestamp = video_frame.received_timestamp; // Ignore the audio timestamp. card->new_frames.push_back(move(new_frame)); - card->new_frames_changed.notify_all(); + card->jitter_history.frame_arrived(video_frame.received_timestamp, frame_length, dropped_frames); } + card->new_frames_changed.notify_all(); } } @@ -857,9 +942,7 @@ void Mixer::thread_func() } } - steady_clock::time_point start, now; - start = steady_clock::now(); - + BasicStats basic_stats(/*verbose=*/true); int stats_dropped_frames = 0; while (!should_quit) { @@ -936,54 +1019,8 @@ void Mixer::thread_func() ++frame_num; pts_int += frame_duration; - now = steady_clock::now(); - double elapsed = duration(now - start).count(); - - metric_frames_output_total = frame_num; - metric_frames_output_dropped = stats_dropped_frames; - - if (frame_num % 100 == 0) { - printf("%d frames (%d dropped) in %.3f seconds = %.1f fps (%.1f ms/frame)", - frame_num, stats_dropped_frames, elapsed, frame_num / elapsed, - 1e3 * elapsed / frame_num); - // chain->print_phase_timing(); - - // Check our memory usage, to see if we are close to our mlockall() - // limit (if at all set). - rusage used; - if (getrusage(RUSAGE_SELF, &used) == -1) { - perror("getrusage(RUSAGE_SELF)"); - assert(false); - } - - if (uses_mlock) { - rlimit limit; - if (getrlimit(RLIMIT_MEMLOCK, &limit) == -1) { - perror("getrlimit(RLIMIT_MEMLOCK)"); - assert(false); - } - - if (limit.rlim_cur == 0) { - printf(", using %ld MB memory (locked)", - long(used.ru_maxrss / 1024)); - } else { - printf(", using %ld / %ld MB lockable memory (%.1f%%)", - long(used.ru_maxrss / 1024), - long(limit.rlim_cur / 1048576), - float(100.0 * (used.ru_maxrss * 1024.0) / limit.rlim_cur)); - } - metrics_memory_locked_limit_bytes = limit.rlim_cur; - } else { - printf(", using %ld MB memory (not locked)", - long(used.ru_maxrss / 1024)); - metrics_memory_locked_limit_bytes = 0.0 / 0.0; - } - - printf("\n"); - - metrics_memory_used_bytes = used.ru_maxrss * 1024; - } - + basic_stats.update(frame_num, stats_dropped_frames); + // if (frame_num % 100 == 0) chain->print_phase_timing(); if (should_cut.exchange(false)) { // Test and clear. video_encoder->do_cut(frame_num); @@ -1015,7 +1052,7 @@ bool Mixer::input_card_is_master_clock(unsigned card_index, unsigned master_card return (card_index == master_card_index); } -void Mixer::trim_queue(CaptureCard *card, unsigned card_index) +void Mixer::trim_queue(CaptureCard *card, size_t safe_queue_length) { // Count the number of frames in the queue, including any frames // we dropped. It's hard to know exactly how we should deal with @@ -1027,18 +1064,17 @@ void Mixer::trim_queue(CaptureCard *card, unsigned card_index) for (const CaptureCard::NewFrame &frame : card->new_frames) { queue_length += frame.dropped_frames + 1; } - card->queue_length_policy.update_policy(queue_length); // If needed, drop frames until the queue is below the safe limit. // We prefer to drop from the head, because all else being equal, // we'd like more recent frames (less latency). unsigned dropped_frames = 0; - while (queue_length > card->queue_length_policy.get_safe_queue_length()) { + while (queue_length > safe_queue_length) { assert(!card->new_frames.empty()); assert(queue_length > card->new_frames.front().dropped_frames); queue_length -= card->new_frames.front().dropped_frames; - if (queue_length <= card->queue_length_policy.get_safe_queue_length()) { + if (queue_length <= safe_queue_length) { // No need to drop anything. break; } @@ -1050,6 +1086,7 @@ void Mixer::trim_queue(CaptureCard *card, unsigned card_index) } card->metric_input_dropped_frames_jitter += dropped_frames; + card->metric_input_queue_length_frames = queue_length; #if 0 if (dropped_frames > 0) { @@ -1084,26 +1121,15 @@ start: // and then restart. assert(cards[master_card_index].capture->get_disconnected()); handle_hotplugged_cards(); + lock.unlock(); goto start; } - if (!master_card_is_output) { - output_frame_info.frame_timestamp = - cards[master_card_index].new_frames.front().received_timestamp; - } - for (unsigned card_index = 0; card_index < num_cards + num_video_inputs; ++card_index) { CaptureCard *card = &cards[card_index]; - if (input_card_is_master_clock(card_index, master_card_index)) { - // We don't use the queue length policy for the master card, - // but we will if it stops being the master. Thus, clear out - // the policy in case we switch in the future. - card->queue_length_policy.reset(card_index); - assert(!card->new_frames.empty()); + if (card->new_frames.empty()) { // Starvation. + ++card->metric_input_duped_frames; } else { - trim_queue(card, card_index); - } - if (!card->new_frames.empty()) { new_frames[card_index] = move(card->new_frames.front()); has_new_frame[card_index] = true; card->new_frames.pop_front(); @@ -1112,10 +1138,32 @@ start: } if (!master_card_is_output) { + output_frame_info.frame_timestamp = new_frames[master_card_index].received_timestamp; output_frame_info.dropped_frames = new_frames[master_card_index].dropped_frames; output_frame_info.frame_duration = new_frames[master_card_index].length; } + if (!output_frame_info.is_preroll) { + output_jitter_history.frame_arrived(output_frame_info.frame_timestamp, output_frame_info.frame_duration, output_frame_info.dropped_frames); + } + + for (unsigned card_index = 0; card_index < num_cards + num_video_inputs; ++card_index) { + CaptureCard *card = &cards[card_index]; + if (has_new_frame[card_index] && + !input_card_is_master_clock(card_index, master_card_index) && + !output_frame_info.is_preroll) { + card->queue_length_policy.update_policy( + output_frame_info.frame_timestamp, + card->jitter_history.get_expected_next_frame(), + new_frames[master_card_index].length, + output_frame_info.frame_duration, + card->jitter_history.estimate_max_jitter(), + output_jitter_history.estimate_max_jitter()); + trim_queue(card, min(global_flags.max_input_queue_frames, + card->queue_length_policy.get_safe_queue_length())); + } + } + // This might get off by a fractional sample when changing master card // between ones with different frame rates, but that's fine. int num_samples_times_timebase = OUTPUT_FREQUENCY * output_frame_info.frame_duration + fractional_samples; @@ -1235,7 +1283,7 @@ void Mixer::render_one_frame(int64_t duration) // The theme can't (or at least shouldn't!) call connect_signal() on // each FFmpeg input, so we'll do it here. for (const pair &conn : theme->get_signal_connections()) { - conn.first->connect_signal_raw(conn.second->get_card_index()); + conn.first->connect_signal_raw(conn.second->get_card_index(), input_state); } // If HDMI/SDI output is active and the user has requested auto mode,