X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=mixer.cpp;h=3ce49c18cf9bda311a17604d025607b647c8310c;hb=afe163c940b7239cab0577fa390796175cdc94a6;hp=6ed62113ff4f775edfb3e110be77db8f7a136bf5;hpb=7ac4f511034b3a69be8cf1344fd8c2693eeeee4c;p=nageru diff --git a/mixer.cpp b/mixer.cpp index 6ed6211..3ce49c1 100644 --- a/mixer.cpp +++ b/mixer.cpp @@ -198,39 +198,95 @@ void upload_texture(GLuint tex, GLuint width, GLuint height, GLuint stride, bool } // namespace -void QueueLengthPolicy::register_metrics(const vector> &labels) +void JitterHistory::register_metrics(const vector> &labels) { - global_metrics.add("input_queue_length_frames", labels, &metric_input_queue_length_frames, Metrics::TYPE_GAUGE); - global_metrics.add("input_queue_safe_length_frames", labels, &metric_input_queue_safe_length_frames, Metrics::TYPE_GAUGE); - global_metrics.add("input_queue_duped_frames", labels, &metric_input_duped_frames); + global_metrics.add("input_underestimated_jitter_frames", labels, &metric_input_underestimated_jitter_frames); + global_metrics.add("input_estimated_max_jitter_seconds", labels, &metric_input_estimated_max_jitter_seconds, Metrics::TYPE_GAUGE); +} + +void JitterHistory::unregister_metrics(const vector> &labels) +{ + global_metrics.remove("input_underestimated_jitter_frames", labels); + global_metrics.remove("input_estimated_max_jitter_seconds", labels); } -void QueueLengthPolicy::update_policy(unsigned queue_length) +void JitterHistory::frame_arrived(steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames) { - if (queue_length == 0) { // Starvation. - if (been_at_safe_point_since_last_starvation && safe_queue_length < unsigned(global_flags.max_input_queue_frames)) { - ++safe_queue_length; - fprintf(stderr, "Card %u: Starvation, increasing safe limit to %u frame(s)\n", - card_index, safe_queue_length); + if (expected_timestamp > steady_clock::time_point::min()) { + expected_timestamp += dropped_frames * nanoseconds(frame_duration * 1000000000 / TIMEBASE); + double jitter_seconds = fabs(duration(expected_timestamp - now).count()); + history.push_back(orders.insert(jitter_seconds)); + if (jitter_seconds > estimate_max_jitter()) { + ++metric_input_underestimated_jitter_frames; } - frames_with_at_least_one = 0; - been_at_safe_point_since_last_starvation = false; - ++metric_input_duped_frames; - metric_input_queue_safe_length_frames = safe_queue_length; - metric_input_queue_length_frames = 0; - return; + + metric_input_estimated_max_jitter_seconds = estimate_max_jitter(); + + if (history.size() > history_length) { + orders.erase(history.front()); + history.pop_front(); + } + assert(history.size() <= history_length); } - if (queue_length >= safe_queue_length) { - been_at_safe_point_since_last_starvation = true; + expected_timestamp = now + nanoseconds(frame_duration * 1000000000 / TIMEBASE); +} + +double JitterHistory::estimate_max_jitter() const +{ + if (orders.empty()) { + return 0.0; } - if (++frames_with_at_least_one >= 1000 && safe_queue_length > 1) { - --safe_queue_length; - metric_input_queue_safe_length_frames = safe_queue_length; - fprintf(stderr, "Card %u: Spare frames for more than 1000 frames, reducing safe limit to %u frame(s)\n", - card_index, safe_queue_length); - frames_with_at_least_one = 0; + size_t elem_idx = lrint((orders.size() - 1) * percentile); + if (percentile <= 0.5) { + return *next(orders.begin(), elem_idx) * multiplier; + } else { + return *prev(orders.end(), elem_idx + 1) * multiplier; } - metric_input_queue_length_frames = min(queue_length, safe_queue_length); // The caller will drop frames for us if needed. +} + +void QueueLengthPolicy::register_metrics(const vector> &labels) +{ + global_metrics.add("input_queue_safe_length_frames", labels, &metric_input_queue_safe_length_frames, Metrics::TYPE_GAUGE); +} + +void QueueLengthPolicy::unregister_metrics(const vector> &labels) +{ + global_metrics.remove("input_queue_safe_length_frames", labels); +} + +void QueueLengthPolicy::update_policy(steady_clock::time_point now, + steady_clock::time_point expected_next_frame, + int64_t master_frame_duration, + double max_input_card_jitter_seconds, + double max_master_card_jitter_seconds) +{ + double master_frame_duration_seconds = master_frame_duration / double(TIMEBASE); + + // Figure out when we can expect the next frame for this card, assuming + // worst-case jitter (ie., the frame is maximally late). + double seconds_until_next_frame = max(duration(expected_next_frame - now).count() + max_input_card_jitter_seconds, 0.0); + + // How many times are the master card expected to tick in that time? + // We assume the master clock has worst-case jitter but not any rate + // discrepancy, ie., it ticks as early as possible every time, but not + // cumulatively. + double frames_needed = (seconds_until_next_frame + max_master_card_jitter_seconds) / master_frame_duration_seconds; + + // As a special case, if the master card ticks faster than the input card, + // we expect the queue to drain by itself even without dropping. But if + // the difference is small (e.g. 60 Hz master and 59.94 input), it would + // go slowly enough that the effect wouldn't really be appreciable. + // We account for this by looking at the situation five frames ahead, + // assuming everything else is the same. + double frames_allowed; + if (max_master_card_jitter_seconds < max_input_card_jitter_seconds) { + frames_allowed = frames_needed + 5 * (max_input_card_jitter_seconds - max_master_card_jitter_seconds) / master_frame_duration_seconds; + } else { + frames_allowed = frames_needed; + } + + safe_queue_length = max(floor(frames_allowed), 0); + metric_input_queue_safe_length_frames = safe_queue_length; } Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) @@ -404,6 +460,7 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards) metric_start_time_seconds = get_timestamp_for_metrics(); + output_jitter_history.register_metrics({{ "card", "output" }}); global_metrics.add("frames_output_total", &metric_frames_output_total); global_metrics.add("frames_output_dropped", &metric_frames_output_dropped); global_metrics.add("start_time_seconds", &metric_start_time_seconds, Metrics::TYPE_GAUGE); @@ -475,6 +532,28 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT audio_mixer.set_display_name(device, card->capture->get_description()); audio_mixer.trigger_state_changed_callback(); + // Unregister old metrics, if any. + if (!card->labels.empty()) { + const vector> &labels = card->labels; + card->jitter_history.unregister_metrics(labels); + card->queue_length_policy.unregister_metrics(labels); + global_metrics.remove("input_received_frames", labels); + global_metrics.remove("input_dropped_frames_jitter", labels); + global_metrics.remove("input_dropped_frames_error", labels); + global_metrics.remove("input_dropped_frames_resets", labels); + global_metrics.remove("input_queue_length_frames", labels); + global_metrics.remove("input_queue_duped_frames", labels); + + global_metrics.remove("input_has_signal_bool", labels); + global_metrics.remove("input_is_connected_bool", labels); + global_metrics.remove("input_interlaced_bool", labels); + global_metrics.remove("input_width_pixels", labels); + global_metrics.remove("input_height_pixels", labels); + global_metrics.remove("input_frame_rate_nom", labels); + global_metrics.remove("input_frame_rate_den", labels); + global_metrics.remove("input_sample_rate_hz", labels); + } + // Register metrics. vector> labels; char card_name[64]; @@ -494,11 +573,14 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT default: assert(false); } + card->jitter_history.register_metrics(labels); card->queue_length_policy.register_metrics(labels); global_metrics.add("input_received_frames", labels, &card->metric_input_received_frames); global_metrics.add("input_dropped_frames_jitter", labels, &card->metric_input_dropped_frames_jitter); global_metrics.add("input_dropped_frames_error", labels, &card->metric_input_dropped_frames_error); global_metrics.add("input_dropped_frames_resets", labels, &card->metric_input_resets); + global_metrics.add("input_queue_length_frames", labels, &card->metric_input_queue_length_frames, Metrics::TYPE_GAUGE); + global_metrics.add("input_queue_duped_frames", labels, &card->metric_input_duped_frames); global_metrics.add("input_has_signal_bool", labels, &card->metric_input_has_signal_bool, Metrics::TYPE_GAUGE); global_metrics.add("input_is_connected_bool", labels, &card->metric_input_is_connected_bool, Metrics::TYPE_GAUGE); @@ -508,6 +590,7 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT global_metrics.add("input_frame_rate_nom", labels, &card->metric_input_frame_rate_nom, Metrics::TYPE_GAUGE); global_metrics.add("input_frame_rate_den", labels, &card->metric_input_frame_rate_den, Metrics::TYPE_GAUGE); global_metrics.add("input_sample_rate_hz", labels, &card->metric_input_sample_rate_hz, Metrics::TYPE_GAUGE); + card->labels = labels; } void Mixer::set_output_card_internal(int card_index) @@ -528,7 +611,7 @@ void Mixer::set_output_card_internal(int card_index) lock.unlock(); fake_capture->stop_dequeue_thread(); lock.lock(); - old_card->capture = move(old_card->parked_capture); + old_card->capture = move(old_card->parked_capture); // TODO: reset the metrics old_card->is_fake_capture = false; old_card->capture->start_bm_capture(); } @@ -550,6 +633,7 @@ void Mixer::set_output_card_internal(int card_index) card->output->start_output(desired_output_video_mode, pts_int); } output_card_index = card_index; + output_jitter_history.clear(); } namespace { @@ -654,6 +738,11 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, card->last_timecode = timecode; + // Calculate jitter for this card here. We do it on arrival so that we + // make sure every frame counts, even the dropped ones -- and it will also + // make sure the jitter number is as recent as possible, should it change. + card->jitter_history.frame_arrived(video_frame.received_timestamp, frame_length, dropped_frames); + PBOFrameAllocator::Userdata *userdata = (PBOFrameAllocator::Userdata *)video_frame.userdata; size_t cbcr_width, cbcr_height, cbcr_offset, y_offset; @@ -1015,7 +1104,7 @@ bool Mixer::input_card_is_master_clock(unsigned card_index, unsigned master_card return (card_index == master_card_index); } -void Mixer::trim_queue(CaptureCard *card, unsigned card_index) +void Mixer::trim_queue(CaptureCard *card, size_t safe_queue_length) { // Count the number of frames in the queue, including any frames // we dropped. It's hard to know exactly how we should deal with @@ -1027,18 +1116,17 @@ void Mixer::trim_queue(CaptureCard *card, unsigned card_index) for (const CaptureCard::NewFrame &frame : card->new_frames) { queue_length += frame.dropped_frames + 1; } - card->queue_length_policy.update_policy(queue_length); // If needed, drop frames until the queue is below the safe limit. // We prefer to drop from the head, because all else being equal, // we'd like more recent frames (less latency). unsigned dropped_frames = 0; - while (queue_length > card->queue_length_policy.get_safe_queue_length()) { + while (queue_length > safe_queue_length) { assert(!card->new_frames.empty()); assert(queue_length > card->new_frames.front().dropped_frames); queue_length -= card->new_frames.front().dropped_frames; - if (queue_length <= card->queue_length_policy.get_safe_queue_length()) { + if (queue_length <= safe_queue_length) { // No need to drop anything. break; } @@ -1050,6 +1138,7 @@ void Mixer::trim_queue(CaptureCard *card, unsigned card_index) } card->metric_input_dropped_frames_jitter += dropped_frames; + card->metric_input_queue_length_frames = queue_length; #if 0 if (dropped_frames > 0) { @@ -1084,26 +1173,15 @@ start: // and then restart. assert(cards[master_card_index].capture->get_disconnected()); handle_hotplugged_cards(); + lock.unlock(); goto start; } - if (!master_card_is_output) { - output_frame_info.frame_timestamp = - cards[master_card_index].new_frames.front().received_timestamp; - } - for (unsigned card_index = 0; card_index < num_cards + num_video_inputs; ++card_index) { CaptureCard *card = &cards[card_index]; - if (input_card_is_master_clock(card_index, master_card_index)) { - // We don't use the queue length policy for the master card, - // but we will if it stops being the master. Thus, clear out - // the policy in case we switch in the future. - card->queue_length_policy.reset(card_index); - assert(!card->new_frames.empty()); + if (card->new_frames.empty()) { // Starvation. + ++card->metric_input_duped_frames; } else { - trim_queue(card, card_index); - } - if (!card->new_frames.empty()) { new_frames[card_index] = move(card->new_frames.front()); has_new_frame[card_index] = true; card->new_frames.pop_front(); @@ -1112,10 +1190,31 @@ start: } if (!master_card_is_output) { + output_frame_info.frame_timestamp = new_frames[master_card_index].received_timestamp; output_frame_info.dropped_frames = new_frames[master_card_index].dropped_frames; output_frame_info.frame_duration = new_frames[master_card_index].length; } + if (!output_frame_info.is_preroll) { + output_jitter_history.frame_arrived(output_frame_info.frame_timestamp, output_frame_info.frame_duration, output_frame_info.dropped_frames); + } + + for (unsigned card_index = 0; card_index < num_cards + num_video_inputs; ++card_index) { + CaptureCard *card = &cards[card_index]; + if (has_new_frame[card_index] && + !input_card_is_master_clock(card_index, master_card_index) && + !output_frame_info.is_preroll) { + card->queue_length_policy.update_policy( + output_frame_info.frame_timestamp, + card->jitter_history.get_expected_next_frame(), + output_frame_info.frame_duration, + card->jitter_history.estimate_max_jitter(), + output_jitter_history.estimate_max_jitter()); + trim_queue(card, min(global_flags.max_input_queue_frames, + card->queue_length_policy.get_safe_queue_length())); + } + } + // This might get off by a fractional sample when changing master card // between ones with different frame rates, but that's fine. int num_samples_times_timebase = OUTPUT_FREQUENCY * output_frame_info.frame_duration + fractional_samples;