]> git.sesse.net Git - nageru/blobdiff - mixer.cpp
Rework the queue drop algorithm again.
[nageru] / mixer.cpp
index 9f549e968b7eb173f872d55fc316f7c1982ccc3e..3ce49c18cf9bda311a17604d025607b647c8310c 100644 (file)
--- a/mixer.cpp
+++ b/mixer.cpp
@@ -198,39 +198,95 @@ void upload_texture(GLuint tex, GLuint width, GLuint height, GLuint stride, bool
 
 }  // namespace
 
-void QueueLengthPolicy::register_metrics(const vector<pair<string, string>> &labels)
+void JitterHistory::register_metrics(const vector<pair<string, string>> &labels)
 {
-       global_metrics.add("input_queue_length_frames", labels, &metric_input_queue_length_frames, Metrics::TYPE_GAUGE);
-       global_metrics.add("input_queue_safe_length_frames", labels, &metric_input_queue_safe_length_frames, Metrics::TYPE_GAUGE);
-       global_metrics.add("input_queue_duped_frames", labels, &metric_input_duped_frames);
+       global_metrics.add("input_underestimated_jitter_frames", labels, &metric_input_underestimated_jitter_frames);
+       global_metrics.add("input_estimated_max_jitter_seconds", labels, &metric_input_estimated_max_jitter_seconds, Metrics::TYPE_GAUGE);
+}
+
+void JitterHistory::unregister_metrics(const vector<pair<string, string>> &labels)
+{
+       global_metrics.remove("input_underestimated_jitter_frames", labels);
+       global_metrics.remove("input_estimated_max_jitter_seconds", labels);
 }
 
-void QueueLengthPolicy::update_policy(unsigned queue_length)
+void JitterHistory::frame_arrived(steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames)
 {
-       if (queue_length == 0) {  // Starvation.
-               if (been_at_safe_point_since_last_starvation && safe_queue_length < unsigned(global_flags.max_input_queue_frames)) {
-                       ++safe_queue_length;
-                       fprintf(stderr, "Card %u: Starvation, increasing safe limit to %u frame(s)\n",
-                               card_index, safe_queue_length);
+       if (expected_timestamp > steady_clock::time_point::min()) {
+               expected_timestamp += dropped_frames * nanoseconds(frame_duration * 1000000000 / TIMEBASE);
+               double jitter_seconds = fabs(duration<double>(expected_timestamp - now).count());
+               history.push_back(orders.insert(jitter_seconds));
+               if (jitter_seconds > estimate_max_jitter()) {
+                       ++metric_input_underestimated_jitter_frames;
                }
-               frames_with_at_least_one = 0;
-               been_at_safe_point_since_last_starvation = false;
-               ++metric_input_duped_frames;
-               metric_input_queue_safe_length_frames = safe_queue_length;
-               metric_input_queue_length_frames = 0;
-               return;
+
+               metric_input_estimated_max_jitter_seconds = estimate_max_jitter();
+
+               if (history.size() > history_length) {
+                       orders.erase(history.front());
+                       history.pop_front();
+               }
+               assert(history.size() <= history_length);
+       }
+       expected_timestamp = now + nanoseconds(frame_duration * 1000000000 / TIMEBASE);
+}
+
+double JitterHistory::estimate_max_jitter() const
+{
+       if (orders.empty()) {
+               return 0.0;
        }
-       if (queue_length >= safe_queue_length) {
-               been_at_safe_point_since_last_starvation = true;
+       size_t elem_idx = lrint((orders.size() - 1) * percentile);
+       if (percentile <= 0.5) {
+               return *next(orders.begin(), elem_idx) * multiplier;
+       } else {
+               return *prev(orders.end(), elem_idx + 1) * multiplier;
        }
-       if (++frames_with_at_least_one >= 1000 && safe_queue_length > 1) {
-               --safe_queue_length;
-               metric_input_queue_safe_length_frames = safe_queue_length;
-               fprintf(stderr, "Card %u: Spare frames for more than 1000 frames, reducing safe limit to %u frame(s)\n",
-                       card_index, safe_queue_length);
-               frames_with_at_least_one = 0;
+}
+
+void QueueLengthPolicy::register_metrics(const vector<pair<string, string>> &labels)
+{
+       global_metrics.add("input_queue_safe_length_frames", labels, &metric_input_queue_safe_length_frames, Metrics::TYPE_GAUGE);
+}
+
+void QueueLengthPolicy::unregister_metrics(const vector<pair<string, string>> &labels)
+{
+       global_metrics.remove("input_queue_safe_length_frames", labels);
+}
+
+void QueueLengthPolicy::update_policy(steady_clock::time_point now,
+                                      steady_clock::time_point expected_next_frame,
+                                      int64_t master_frame_duration,
+                                      double max_input_card_jitter_seconds,
+                                      double max_master_card_jitter_seconds)
+{
+       double master_frame_duration_seconds = master_frame_duration / double(TIMEBASE);
+
+       // Figure out when we can expect the next frame for this card, assuming
+       // worst-case jitter (ie., the frame is maximally late).
+       double seconds_until_next_frame = max(duration<double>(expected_next_frame - now).count() + max_input_card_jitter_seconds, 0.0);
+
+       // How many times are the master card expected to tick in that time?
+       // We assume the master clock has worst-case jitter but not any rate
+       // discrepancy, ie., it ticks as early as possible every time, but not
+       // cumulatively.
+       double frames_needed = (seconds_until_next_frame + max_master_card_jitter_seconds) / master_frame_duration_seconds;
+
+       // As a special case, if the master card ticks faster than the input card,
+       // we expect the queue to drain by itself even without dropping. But if
+       // the difference is small (e.g. 60 Hz master and 59.94 input), it would
+       // go slowly enough that the effect wouldn't really be appreciable.
+       // We account for this by looking at the situation five frames ahead,
+       // assuming everything else is the same.
+       double frames_allowed;
+       if (max_master_card_jitter_seconds < max_input_card_jitter_seconds) {
+               frames_allowed = frames_needed + 5 * (max_input_card_jitter_seconds - max_master_card_jitter_seconds) / master_frame_duration_seconds;
+       } else {
+               frames_allowed = frames_needed;
        }
-       metric_input_queue_length_frames = min(queue_length, safe_queue_length);  // The caller will drop frames for us if needed.
+
+       safe_queue_length = max<int>(floor(frames_allowed), 0);
+       metric_input_queue_safe_length_frames = safe_queue_length;
 }
 
 Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards)
@@ -404,6 +460,7 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards)
 
        metric_start_time_seconds = get_timestamp_for_metrics();
 
+       output_jitter_history.register_metrics({{ "card", "output" }});
        global_metrics.add("frames_output_total", &metric_frames_output_total);
        global_metrics.add("frames_output_dropped", &metric_frames_output_dropped);
        global_metrics.add("start_time_seconds", &metric_start_time_seconds, Metrics::TYPE_GAUGE);
@@ -478,10 +535,14 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT
        // Unregister old metrics, if any.
        if (!card->labels.empty()) {
                const vector<pair<string, string>> &labels = card->labels;
+               card->jitter_history.unregister_metrics(labels);
+               card->queue_length_policy.unregister_metrics(labels);
                global_metrics.remove("input_received_frames", labels);
                global_metrics.remove("input_dropped_frames_jitter", labels);
                global_metrics.remove("input_dropped_frames_error", labels);
                global_metrics.remove("input_dropped_frames_resets", labels);
+               global_metrics.remove("input_queue_length_frames", labels);
+               global_metrics.remove("input_queue_duped_frames", labels);
 
                global_metrics.remove("input_has_signal_bool", labels);
                global_metrics.remove("input_is_connected_bool", labels);
@@ -512,11 +573,14 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT
        default:
                assert(false);
        }
+       card->jitter_history.register_metrics(labels);
        card->queue_length_policy.register_metrics(labels);
        global_metrics.add("input_received_frames", labels, &card->metric_input_received_frames);
        global_metrics.add("input_dropped_frames_jitter", labels, &card->metric_input_dropped_frames_jitter);
        global_metrics.add("input_dropped_frames_error", labels, &card->metric_input_dropped_frames_error);
        global_metrics.add("input_dropped_frames_resets", labels, &card->metric_input_resets);
+       global_metrics.add("input_queue_length_frames", labels, &card->metric_input_queue_length_frames, Metrics::TYPE_GAUGE);
+       global_metrics.add("input_queue_duped_frames", labels, &card->metric_input_duped_frames);
 
        global_metrics.add("input_has_signal_bool", labels, &card->metric_input_has_signal_bool, Metrics::TYPE_GAUGE);
        global_metrics.add("input_is_connected_bool", labels, &card->metric_input_is_connected_bool, Metrics::TYPE_GAUGE);
@@ -547,7 +611,7 @@ void Mixer::set_output_card_internal(int card_index)
                lock.unlock();
                fake_capture->stop_dequeue_thread();
                lock.lock();
-               old_card->capture = move(old_card->parked_capture);
+               old_card->capture = move(old_card->parked_capture);  // TODO: reset the metrics
                old_card->is_fake_capture = false;
                old_card->capture->start_bm_capture();
        }
@@ -569,6 +633,7 @@ void Mixer::set_output_card_internal(int card_index)
                card->output->start_output(desired_output_video_mode, pts_int);
        }
        output_card_index = card_index;
+       output_jitter_history.clear();
 }
 
 namespace {
@@ -673,6 +738,11 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode,
 
        card->last_timecode = timecode;
 
+       // Calculate jitter for this card here. We do it on arrival so that we
+       // make sure every frame counts, even the dropped ones -- and it will also
+       // make sure the jitter number is as recent as possible, should it change.
+       card->jitter_history.frame_arrived(video_frame.received_timestamp, frame_length, dropped_frames);
+
        PBOFrameAllocator::Userdata *userdata = (PBOFrameAllocator::Userdata *)video_frame.userdata;
 
        size_t cbcr_width, cbcr_height, cbcr_offset, y_offset;
@@ -1034,7 +1104,7 @@ bool Mixer::input_card_is_master_clock(unsigned card_index, unsigned master_card
        return (card_index == master_card_index);
 }
 
-void Mixer::trim_queue(CaptureCard *card, unsigned card_index)
+void Mixer::trim_queue(CaptureCard *card, size_t safe_queue_length)
 {
        // Count the number of frames in the queue, including any frames
        // we dropped. It's hard to know exactly how we should deal with
@@ -1046,18 +1116,17 @@ void Mixer::trim_queue(CaptureCard *card, unsigned card_index)
        for (const CaptureCard::NewFrame &frame : card->new_frames) {
                queue_length += frame.dropped_frames + 1;
        }
-       card->queue_length_policy.update_policy(queue_length);
 
        // If needed, drop frames until the queue is below the safe limit.
        // We prefer to drop from the head, because all else being equal,
        // we'd like more recent frames (less latency).
        unsigned dropped_frames = 0;
-       while (queue_length > card->queue_length_policy.get_safe_queue_length()) {
+       while (queue_length > safe_queue_length) {
                assert(!card->new_frames.empty());
                assert(queue_length > card->new_frames.front().dropped_frames);
                queue_length -= card->new_frames.front().dropped_frames;
 
-               if (queue_length <= card->queue_length_policy.get_safe_queue_length()) {
+               if (queue_length <= safe_queue_length) {
                        // No need to drop anything.
                        break;
                }
@@ -1069,6 +1138,7 @@ void Mixer::trim_queue(CaptureCard *card, unsigned card_index)
        }
 
        card->metric_input_dropped_frames_jitter += dropped_frames;
+       card->metric_input_queue_length_frames = queue_length;
 
 #if 0
        if (dropped_frames > 0) {
@@ -1107,23 +1177,11 @@ start:
                goto start;
        }
 
-       if (!master_card_is_output) {
-               output_frame_info.frame_timestamp =
-                       cards[master_card_index].new_frames.front().received_timestamp;
-       }
-
        for (unsigned card_index = 0; card_index < num_cards + num_video_inputs; ++card_index) {
                CaptureCard *card = &cards[card_index];
-               if (input_card_is_master_clock(card_index, master_card_index)) {
-                       // We don't use the queue length policy for the master card,
-                       // but we will if it stops being the master. Thus, clear out
-                       // the policy in case we switch in the future.
-                       card->queue_length_policy.reset(card_index);
-                       assert(!card->new_frames.empty());
+               if (card->new_frames.empty()) {  // Starvation.
+                       ++card->metric_input_duped_frames;
                } else {
-                       trim_queue(card, card_index);
-               }
-               if (!card->new_frames.empty()) {
                        new_frames[card_index] = move(card->new_frames.front());
                        has_new_frame[card_index] = true;
                        card->new_frames.pop_front();
@@ -1132,10 +1190,31 @@ start:
        }
 
        if (!master_card_is_output) {
+               output_frame_info.frame_timestamp = new_frames[master_card_index].received_timestamp;
                output_frame_info.dropped_frames = new_frames[master_card_index].dropped_frames;
                output_frame_info.frame_duration = new_frames[master_card_index].length;
        }
 
+       if (!output_frame_info.is_preroll) {
+               output_jitter_history.frame_arrived(output_frame_info.frame_timestamp, output_frame_info.frame_duration, output_frame_info.dropped_frames);
+       }
+
+       for (unsigned card_index = 0; card_index < num_cards + num_video_inputs; ++card_index) {
+               CaptureCard *card = &cards[card_index];
+               if (has_new_frame[card_index] &&
+                   !input_card_is_master_clock(card_index, master_card_index) &&
+                   !output_frame_info.is_preroll) {
+                       card->queue_length_policy.update_policy(
+                               output_frame_info.frame_timestamp,
+                               card->jitter_history.get_expected_next_frame(),
+                               output_frame_info.frame_duration,
+                               card->jitter_history.estimate_max_jitter(),
+                               output_jitter_history.estimate_max_jitter());
+                       trim_queue(card, min<int>(global_flags.max_input_queue_frames,
+                                                 card->queue_length_policy.get_safe_queue_length()));
+               }
+       }
+
        // This might get off by a fractional sample when changing master card
        // between ones with different frame rates, but that's fine.
        int num_samples_times_timebase = OUTPUT_FREQUENCY * output_frame_info.frame_duration + fractional_samples;