From 9821f303f6d295ae85521aaa83c275c6305b644c Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Fri, 10 Feb 2017 01:07:24 +0100 Subject: [PATCH] Refactor QueueLengthPolicy. In particular, dropped frames are now counted as part of the queue length. --- mixer.cpp | 87 ++++++++++++++++++++++++++++++++++++++----------------- mixer.h | 16 ++++++---- 2 files changed, 70 insertions(+), 33 deletions(-) diff --git a/mixer.cpp b/mixer.cpp index f82650e..093e63e 100644 --- a/mixer.cpp +++ b/mixer.cpp @@ -78,25 +78,25 @@ void insert_new_frame(RefCountedFrame frame, unsigned field_num, bool interlaced } // namespace -void QueueLengthPolicy::update_policy(int queue_length) +void QueueLengthPolicy::update_policy(unsigned queue_length) { - if (queue_length < 0) { // Starvation. - if (been_at_safe_point_since_last_starvation && safe_queue_length < 5) { + if (queue_length == 0) { // Starvation. + if (been_at_safe_point_since_last_starvation && safe_queue_length < 6) { ++safe_queue_length; - fprintf(stderr, "Card %u: Starvation, increasing safe limit to %u frames\n", + fprintf(stderr, "Card %u: Starvation, increasing safe limit to %u frame(s)\n", card_index, safe_queue_length); } frames_with_at_least_one = 0; been_at_safe_point_since_last_starvation = false; return; } - if (queue_length > 0) { - if (queue_length >= int(safe_queue_length)) { + if (queue_length >= 1) { + if (queue_length >= safe_queue_length) { been_at_safe_point_since_last_starvation = true; } - if (++frames_with_at_least_one >= 1000 && safe_queue_length > 0) { + if (++frames_with_at_least_one >= 1000 && safe_queue_length > 1) { --safe_queue_length; - fprintf(stderr, "Card %u: Spare frames for more than 1000 frames, reducing safe limit to %u frames\n", + fprintf(stderr, "Card %u: Spare frames for more than 1000 frames, reducing safe limit to %u frame(s)\n", card_index, safe_queue_length); frames_with_at_least_one = 0; } @@ -249,7 +249,7 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, bool if (card->surface == nullptr) { card->surface = create_surface_with_same_format(mixer_surface); } - while (!card->new_frames.empty()) card->new_frames.pop(); + while (!card->new_frames.empty()) card->new_frames.pop_front(); card->last_timecode = -1; card->capture->configure_card(); @@ -413,7 +413,7 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, new_frame.length = frame_length; new_frame.interlaced = false; new_frame.dropped_frames = dropped_frames; - card->new_frames.push(move(new_frame)); + card->new_frames.push_back(move(new_frame)); card->new_frames_changed.notify_all(); } return; @@ -552,7 +552,7 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, new_frame.upload_func = upload_func; new_frame.dropped_frames = dropped_frames; new_frame.received_timestamp = video_frame.received_timestamp; // Ignore the audio timestamp. - card->new_frames.push(move(new_frame)); + card->new_frames.push_back(move(new_frame)); card->new_frames_changed.notify_all(); } } @@ -739,6 +739,47 @@ bool Mixer::input_card_is_master_clock(unsigned card_index, unsigned master_card return (card_index == master_card_index); } +void Mixer::trim_queue(CaptureCard *card, unsigned card_index) +{ + // Count the number of frames in the queue, including any frames + // we dropped. It's hard to know exactly how we should deal with + // dropped (corrupted) input frames; they don't help our goal of + // avoiding starvation, but they still add to the problem of latency. + // Since dropped frames is going to mean a bump in the signal anyway, + // we err on the side of having more stable latency instead. + unsigned queue_length = 0; + for (const CaptureCard::NewFrame &frame : card->new_frames) { + queue_length += frame.dropped_frames + 1; + } + card->queue_length_policy.update_policy(queue_length); + + // If needed, drop frames until the queue is below the safe limit. + // We prefer to drop from the head, because all else being equal, + // we'd like more recent frames (less latency). + unsigned dropped_frames = 0; + while (queue_length > card->queue_length_policy.get_safe_queue_length()) { + assert(!card->new_frames.empty()); + assert(queue_length > card->new_frames.front().dropped_frames); + queue_length -= card->new_frames.front().dropped_frames; + + if (queue_length <= card->queue_length_policy.get_safe_queue_length()) { + // No need to drop anything. + break; + } + + card->new_frames.pop_front(); + card->new_frames_changed.notify_all(); + --queue_length; + ++dropped_frames; + } + + if (dropped_frames > 0) { + fprintf(stderr, "Card %u dropped %u frame(s) to keep latency down.\n", + card_index, dropped_frames); + } +} + + Mixer::OutputFrameInfo Mixer::get_one_frame_from_each_card(unsigned master_card_index, bool master_card_is_output, CaptureCard::NewFrame new_frames[MAX_VIDEO_CARDS], bool has_new_frame[MAX_VIDEO_CARDS]) { OutputFrameInfo output_frame_info; @@ -773,28 +814,20 @@ start: for (unsigned card_index = 0; card_index < num_cards; ++card_index) { CaptureCard *card = &cards[card_index]; - if (card->new_frames.empty()) { - assert(!input_card_is_master_clock(card_index, master_card_index)); - card->queue_length_policy.update_policy(-1); - continue; - } - new_frames[card_index] = move(card->new_frames.front()); - has_new_frame[card_index] = true; - card->new_frames.pop(); - card->new_frames_changed.notify_all(); - if (input_card_is_master_clock(card_index, master_card_index)) { // We don't use the queue length policy for the master card, // but we will if it stops being the master. Thus, clear out // the policy in case we switch in the future. card->queue_length_policy.reset(card_index); + assert(!card->new_frames.empty()); } else { - // If we have excess frames compared to the policy for this card, - // drop frames from the head. - card->queue_length_policy.update_policy(card->new_frames.size()); - while (card->new_frames.size() > card->queue_length_policy.get_safe_queue_length()) { - card->new_frames.pop(); - } + trim_queue(card, card_index); + } + if (!card->new_frames.empty()) { + new_frames[card_index] = move(card->new_frames.front()); + has_new_frame[card_index] = true; + card->new_frames.pop_front(); + card->new_frames_changed.notify_all(); } } diff --git a/mixer.h b/mixer.h index bf388b1..191ea3d 100644 --- a/mixer.h +++ b/mixer.h @@ -68,8 +68,7 @@ class ResourcePool; // // N is reduced as follows: If the queue has had at least one spare frame for // at least 50 (master) frames (ie., it's been too conservative for a second), -// we reduce N by 1 and reset the timers. TODO: Only do this if N ever actually -// touched the limit. +// we reduce N by 1 and reset the timers. // // Whenever the queue is starved (we needed a frame but there was none), // and we've been at N since the last starvation, N was obviously too low, @@ -79,17 +78,17 @@ public: QueueLengthPolicy() {} void reset(unsigned card_index) { this->card_index = card_index; - safe_queue_length = 0; + safe_queue_length = 1; frames_with_at_least_one = 0; been_at_safe_point_since_last_starvation = false; } - void update_policy(int queue_length); // Give in -1 for starvation. + void update_policy(unsigned queue_length); // Call before picking out a frame, so 0 means starvation. unsigned get_safe_queue_length() const { return safe_queue_length; } private: unsigned card_index; // For debugging only. - unsigned safe_queue_length = 0; // Called N in the comments. + unsigned safe_queue_length = 1; // Called N in the comments. Can never go below 1. unsigned frames_with_at_least_one = 0; bool been_at_safe_point_since_last_starvation = false; }; @@ -323,6 +322,8 @@ public: } private: + struct CaptureCard; + void configure_card(unsigned card_index, bmusb::CaptureInterface *capture, bool is_fake_capture, DeckLinkOutput *output); void set_output_card_internal(int card_index); // Should only be called from the mixer thread. void bm_frame(unsigned card_index, uint16_t timecode, @@ -338,6 +339,9 @@ private: void audio_thread_func(); void release_display_frame(DisplayFrame *frame); double pts() { return double(pts_int) / TIMEBASE; } + // Call this _before_ trying to pull out a frame from a capture card; + // it will update the policy and drop the right amount of frames for you. + void trim_queue(CaptureCard *card, unsigned card_index); HTTPD httpd; unsigned num_cards; @@ -399,7 +403,7 @@ private: unsigned dropped_frames = 0; // Number of dropped frames before this one. std::chrono::steady_clock::time_point received_timestamp = std::chrono::steady_clock::time_point::min(); }; - std::queue new_frames; + std::deque new_frames; bool should_quit = false; std::condition_variable new_frames_changed; // Set whenever new_frames (or should_quit) is changed. -- 2.39.2