From 896b8bb8e44409738aef56ba7d7b7f300c22d2ce Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Mon, 5 Apr 2021 18:18:21 +0200 Subject: [PATCH] WIP patch for async output. --- nageru/bmusb | 2 +- nageru/decklink_output.cpp | 152 ++++++++++++++++++++++++++++++++--- nageru/decklink_output.h | 19 +++-- nageru/mixer.cpp | 38 +++++++-- nageru/mixer.h | 1 + nageru/queue_length_policy.h | 4 +- 6 files changed, 189 insertions(+), 27 deletions(-) diff --git a/nageru/bmusb b/nageru/bmusb index 0c0182f..327dca2 160000 --- a/nageru/bmusb +++ b/nageru/bmusb @@ -1 +1 @@ -Subproject commit 0c0182f453e8bc26e630615ea6d2a2f05e868fde +Subproject commit 327dca2d848e4c4656be1bfb54a2edf2e6587a71 diff --git a/nageru/decklink_output.cpp b/nageru/decklink_output.cpp index da172af..f061766 100644 --- a/nageru/decklink_output.cpp +++ b/nageru/decklink_output.cpp @@ -126,7 +126,7 @@ bool DeckLinkOutput::set_device(IDeckLink *decklink) return true; } -void DeckLinkOutput::start_output(uint32_t mode, int64_t base_pts) +void DeckLinkOutput::start_output(uint32_t mode, int64_t base_pts, bool is_master_card) { assert(output); assert(!playback_initiated); @@ -139,7 +139,11 @@ void DeckLinkOutput::start_output(uint32_t mode, int64_t base_pts) should_quit.unquit(); playback_initiated = true; playback_started = false; - this->base_pts = base_pts; + if (is_master_card) { + this->base_pts = base_pts; + } else { + this->next_output_pts = 0; + } IDeckLinkConfiguration *config = nullptr; if (output->QueryInterface(IID_IDeckLinkConfiguration, (void**)&config) != S_OK) { @@ -219,9 +223,17 @@ void DeckLinkOutput::start_output(uint32_t mode, int64_t base_pts) fprintf(stderr, "Couldn't enable audio output\n"); abort(); } - if (output->BeginAudioPreroll() != S_OK) { - fprintf(stderr, "Couldn't begin audio preroll\n"); - abort(); + if (is_master_card) { + if (output->BeginAudioPreroll() != S_OK) { + fprintf(stderr, "Couldn't begin audio preroll\n"); + abort(); + } + } else { + if (output->StartScheduledPlayback(/*base_pts=*/0, TIMEBASE, 1.0) != S_OK) { + fprintf(stderr, "Could not start playback\n"); + abort(); // TODO + } + playback_started = true; } present_thread = thread([this]{ @@ -275,6 +287,8 @@ void DeckLinkOutput::send_frame(GLuint y_tex, GLuint cbcr_tex, YCbCrLumaCoeffici { assert(!should_quit.should_quit()); + input_jitter_history.frame_arrived(steady_clock::now(), duration, /*dropped_frames=*/0, true); + if ((current_mode_flags & bmdDisplayModeColorspaceRec601) && output_ycbcr_coefficients == YCBCR_REC_709) { if (!last_frame_had_mode_mismatch) { fprintf(stderr, "WARNING: Chosen output mode expects Rec. 601 Y'CbCr coefficients.\n"); @@ -346,6 +360,8 @@ void DeckLinkOutput::send_frame(GLuint y_tex, GLuint cbcr_tex, YCbCrLumaCoeffici void DeckLinkOutput::send_audio(int64_t pts, const std::vector &samples) { + return; // FIXME we may need to map pts or something? + unique_ptr int_samples(new int32_t[samples.size()]); for (size_t i = 0; i < samples.size(); ++i) { int_samples[i] = lrintf(samples[i] * 2147483648.0f); @@ -439,7 +455,7 @@ uint32_t DeckLinkOutput::pick_video_mode(uint32_t mode) const } // Prioritize 59.94 > 60 > 29.97. If none of those are found, then pick the highest one. - for (const pair &desired : vector>{ { 60000, 1001 }, { 60, 1 }, { 30000, 1001 } }) { + for (const pair &desired : vector>{ { 50, 1 }, { 60000, 1001 }, { 60, 1 }, { 30000, 1001 } }) { for (const auto &it : video_modes) { if (it.second.frame_rate_num * desired.second == desired.first * it.second.frame_rate_den) { return it.first; @@ -473,22 +489,70 @@ YCbCrLumaCoefficients DeckLinkOutput::preferred_ycbcr_coefficients() const HRESULT DeckLinkOutput::ScheduledFrameCompleted(/* in */ IDeckLinkVideoFrame *completedFrame, /* in */ BMDOutputFrameCompletionResult result) { Frame *frame = static_cast(completedFrame); + + BMDTimeValue stream_frame_time, played_at_time; + BMDTimeValue hardwareTime, timeInFrame, ticksPerFrame; + double playback_speed; + output->GetFrameCompletionReferenceTimestamp(frame, TIMEBASE, &played_at_time); + output->GetScheduledStreamTime(TIMEBASE, &stream_frame_time, &playback_speed); + output->GetHardwareReferenceClock(TIMEBASE, &hardwareTime, &timeInFrame, &ticksPerFrame); + + steady_clock::time_point now = steady_clock::now(); + int frame_delay = (stream_frame_time - frame->pts) / frame_duration - 1; + map status = { + { bmdOutputFrameCompleted, "played" }, + { bmdOutputFrameDisplayedLate, "DELAYED" }, + { bmdOutputFrameDropped, "DROPPED" }, + { bmdOutputFrameFlushed, "FLUSHED" } + }; + + if ((result == bmdOutputFrameCompleted || result == bmdOutputFrameDisplayedLate) && false) { + fprintf(stderr, "now=%ld / %.2f: frame with pts=%ld (%ld ago, %d delay) / %.2f was %s at time %ld (%ld ago)\n", + stream_frame_time, PTSToTime(stream_frame_time), + frame->pts, stream_frame_time - frame->pts, frame_delay, PTSToTime(frame->pts), + status[result].c_str(), + played_at_time, hardwareTime - played_at_time); + } else if (result == bmdOutputFrameDisplayedLate) { + fprintf(stderr, "now=%ld / %.2f: frame with pts=%ld (%ld ago, %d delay) / %.2f was %s to %.2f\n", + stream_frame_time, PTSToTime(stream_frame_time), + frame->pts, stream_frame_time - frame->pts, frame_delay, PTSToTime(frame->pts), + status[result].c_str(), PTSToTime(frame->pts) + frame_delay); + } else { + fprintf(stderr, "now=%ld / %.2f: frame with pts=%ld (%ld ago, %d delay) / %.2f was %s\n", + stream_frame_time, PTSToTime(stream_frame_time), + frame->pts, stream_frame_time - frame->pts, frame_delay, PTSToTime(frame->pts), + status[result].c_str()); + } + if (frame_delay < 0) { + fprintf(stderr, "ERROR: Frame went backwards in time (scheduled to start at pts=%ld, ended at or before pts=%ld), something is strange.\n", + frame->pts, stream_frame_time); + frame_delay = 0; + } + switch (result) { case bmdOutputFrameCompleted: ++metric_decklink_output_completed_frames_completed; + if (frame_delay != 0) { + fprintf(stderr, "ERROR: Frame was reportedly completed without delay, but was delayed nevertheless.\n"); + // Our callback _might_ be delayed 1+ frame for other reasons, + // so ignore this. It's a pity GetFrameCompletionReferenceTimestamp() + // cannot give us a timestamp on the same time scale as + // GetScheduledStreamTime(); it would be more robust. + frame_delay = 0; + } break; case bmdOutputFrameDisplayedLate: - fprintf(stderr, "Output frame displayed late (pts=%" PRId64 ")\n", frame->pts); - fprintf(stderr, "Consider increasing --output-buffer-frames if this persists.\n"); + // fprintf(stderr, "Output frame displayed late (pts=%" PRId64 ")\n", frame->pts); + //fprintf(stderr, "Consider increasing --output-buffer-frames if this persists.\n"); ++metric_decklink_output_completed_frames_late; break; case bmdOutputFrameDropped: - fprintf(stderr, "Output frame was dropped (pts=%" PRId64 ")\n", frame->pts); - fprintf(stderr, "Consider increasing --output-buffer-frames if this persists.\n"); + // fprintf(stderr, "Output frame was dropped (pts=%" PRId64 ")\n", frame->pts); + //fprintf(stderr, "Consider increasing --output-buffer-frames if this persists.\n"); ++metric_decklink_output_completed_frames_dropped; break; case bmdOutputFrameFlushed: - fprintf(stderr, "Output frame was flushed (pts=%" PRId64 ")\n", frame->pts); + // fprintf(stderr, "Output frame was flushed (pts=%" PRId64 ")\n", frame->pts); ++metric_decklink_output_completed_frames_flushed; break; default: @@ -503,9 +567,55 @@ HRESULT DeckLinkOutput::ScheduledFrameCompleted(/* in */ IDeckLinkVideoFrame *co { lock_guard lock(frame_queue_mutex); frame_freelist.push(unique_ptr(frame)); - assert(scheduled_frames.front() == frame); - scheduled_frames.pop_front(); + + // Dropped frames can come out-of-order, so we can't just look at the front; + // we need to go and find it in the list. + auto it = find(scheduled_frames.begin(), scheduled_frames.end(), frame); + assert(it != scheduled_frames.end()); + scheduled_frames.erase(it); --metric_decklink_output_inflight_frames; + + if (frame_delay > 0 && result == bmdOutputFrameDisplayedLate) { + // All frames that were queued earlier will be delayed, + // so update so that we don't double-count the delay. + int64_t prev_frame_pts = frame->pts + frame_delay * frame_duration; + for (Frame *other_frame : scheduled_frames) { + int64_t old_pts = other_frame->pts; + other_frame->pts = std::max(other_frame->pts, prev_frame_pts + frame_duration); + fprintf(stderr, " - moving frame from pts=%ld (%.3f) to pts=%ld (%.3f)\n", + old_pts, PTSToTime(old_pts), other_frame->pts, PTSToTime(other_frame->pts)); + prev_frame_pts = other_frame->pts; + } + + int64_t earliest_next_frame = (stream_frame_time + frame_duration - 1) / frame_duration * frame_duration; + earliest_next_frame = std::max(earliest_next_frame, prev_frame_pts + frame_duration); + if (next_output_pts < earliest_next_frame) { + // In effect, duplicate a frame. FIXME write something about this + // FIXME is this really right now? but perhaps we're forced + // and it messes up the queue length calculation temporarily, we need something else there + //fprintf(stderr, "Duplicating frame %d times due to starvation!\n", frame_delay); + + // FIXME this causes us to believe in output jitter? + fprintf(stderr, " - moving output pointer from pts=%ld (%.3f) to pts=%ld (%.3f)\n", + next_output_pts, PTSToTime(next_output_pts), earliest_next_frame, PTSToTime(earliest_next_frame)); + next_output_pts = earliest_next_frame; + // FIXME metric + } + } + + if (result == bmdOutputFrameCompleted || result == bmdOutputFrameDisplayedLate) { + //output_jitter_history.frame_arrived(now, frame_duration, /*dropped_frames=*/frame_delay, true); + // TODO: backdate now, and possibly also get_expected_next_frame()? + queue_length_policy.update_policy( + now, + input_jitter_history.get_expected_next_frame(), + frame->duration, frame_duration, + input_jitter_history.estimate_max_jitter(), + // output_jitter_history.estimate_max_jitter(), true); + 0.0, true); + num_safe_frames = queue_length_policy.get_safe_queue_length(); + } + fprintf(stderr, "%zu frames in flight, safe queue length = %u (starv=%d)\n", scheduled_frames.size(), queue_length_policy.get_safe_queue_length(), frame_delay); } return S_OK; @@ -574,6 +684,17 @@ void DeckLinkOutput::present_thread_func() } frame = move(pending_video_frames.front()); pending_video_frames.pop(); + + if (scheduled_frames.size() > num_safe_frames) { // FIXME check off-by-one here + fprintf(stderr, "Dropping frame to keep latency down!\n"); + // FIXME metric + continue; + } + + // Overwrite the pts given by the client; it doesn't own our clock. + // TODO: Write something about what semi-unsynchronized really means. + frame->pts = next_output_pts; + next_output_pts += frame_duration; } for ( ;; ) { @@ -612,6 +733,11 @@ void DeckLinkOutput::present_thread_func() } } +double DeckLinkOutput::PTSToTime(int64_t pts) +{ + return double(pts) / frame_duration; +} + HRESULT STDMETHODCALLTYPE DeckLinkOutput::QueryInterface(REFIID, LPVOID *) { return E_NOINTERFACE; diff --git a/nageru/decklink_output.h b/nageru/decklink_output.h index f1b40f4..9c994a8 100644 --- a/nageru/decklink_output.h +++ b/nageru/decklink_output.h @@ -20,6 +20,7 @@ #include "shared/context.h" #include "print_latency.h" +#include "queue_length_policy.h" #include "quittable_sleeper.h" #include "ref_counted_frame.h" #include "shared/ref_counted_gl_sync.h" @@ -40,12 +41,14 @@ public: DeckLinkOutput(movit::ResourcePool *resource_pool, QSurface *surface, unsigned width, unsigned height, unsigned card_index); bool set_device(IDeckLink *output); - void start_output(uint32_t mode, int64_t base_pts); // Mode comes from get_available_video_modes(). + void start_output(uint32_t mode, int64_t base_pts, bool is_master_card); // Mode comes from get_available_video_modes(). void end_output(); void send_frame(GLuint y_tex, GLuint cbcr_tex, movit::YCbCrLumaCoefficients ycbcr_coefficients, const std::vector &input_frames, int64_t pts, int64_t duration); void send_audio(int64_t pts, const std::vector &samples); + // Only makes sense if is_master_card is true. + // // NOTE: The returned timestamp is undefined for preroll. // Otherwise, it is the timestamp of the output frame as it should have been, // even if we're overshooting. E.g. at 50 fps (0.02 spf), assuming the @@ -122,6 +125,7 @@ private: void create_uyvy(GLuint y_tex, GLuint cbcr_tex, GLuint dst_tex); void present_thread_func(); + double PTSToTime(int64_t pts); std::atomic refcount{1}; @@ -131,10 +135,15 @@ private: std::thread present_thread; QuittableSleeper should_quit; - std::mutex frame_queue_mutex; - std::queue> pending_video_frames; // Under . - std::queue> frame_freelist; // Under . - std::deque scheduled_frames; // Owned by the driver, so no unique_ptr. Under . + std::mutex frame_queue_mutex; // Protects all members in this block. + std::queue> pending_video_frames; + std::queue> frame_freelist; + std::deque scheduled_frames; // Owned by the driver, so no unique_ptr. + unsigned num_safe_frames = 1; + int64_t next_output_pts = 0; + JitterHistory input_jitter_history, output_jitter_history; + QueueLengthPolicy queue_length_policy; + // End of variables protected by frame_queue_mutex. std::condition_variable frame_queues_changed; bool playback_initiated = false, playback_started = false; diff --git a/nageru/mixer.cpp b/nageru/mixer.cpp index ce8a732..2286742 100644 --- a/nageru/mixer.cpp +++ b/nageru/mixer.cpp @@ -232,7 +232,7 @@ void JitterHistory::unregister_metrics(const vector> &label global_metrics.remove("input_estimated_max_jitter_seconds", labels); } -void JitterHistory::frame_arrived(steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames) +void JitterHistory::frame_arrived(steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames, bool verbose) { if (frame_duration != last_duration) { // If the frame rate changed, the input clock is also going to change, @@ -243,10 +243,23 @@ void JitterHistory::frame_arrived(steady_clock::time_point now, int64_t frame_du // rather keep the history so that we take jitter they may introduce into account.) clear(); last_duration = frame_duration; + if (verbose) { + fprintf(stderr, "JITTER %p: clearing due to format change\n", this); + } } if (expected_timestamp > steady_clock::time_point::min()) { expected_timestamp += dropped_frames * nanoseconds(frame_duration * 1000000000 / TIMEBASE); double jitter_seconds = fabs(duration(expected_timestamp - now).count()); + if (verbose) { + fprintf(stderr, "JITTER %p: expected_ts=%.6f (after adding %.1f ms for %zu dropped frames, duration=%ld), now=%.6f => %.1f ms late\n", + this, + duration(expected_timestamp.time_since_epoch()).count(), + 1e3 * dropped_frames * duration(nanoseconds(frame_duration * 1000000000 / TIMEBASE)).count(), + dropped_frames, + frame_duration, + duration(now.time_since_epoch()).count(), + 1e3 * duration(now - expected_timestamp).count()); + } history.push_back(orders.insert(jitter_seconds)); if (jitter_seconds > estimate_max_jitter()) { ++metric_input_underestimated_jitter_frames; @@ -259,6 +272,12 @@ void JitterHistory::frame_arrived(steady_clock::time_point now, int64_t frame_du history.pop_front(); } assert(history.size() <= history_length); + } else if (verbose) { + fprintf(stderr, "JITTER %p: now=%.6f, expected=%.6f duration=%ld [initial]\n", + this, + duration(now.time_since_epoch()).count(), + duration((now + nanoseconds(frame_duration * 1000000000 / TIMEBASE)).time_since_epoch()).count(), + frame_duration); } expected_timestamp = now + nanoseconds(frame_duration * 1000000000 / TIMEBASE); } @@ -291,7 +310,7 @@ void QueueLengthPolicy::update_policy(steady_clock::time_point now, int64_t input_frame_duration, int64_t master_frame_duration, double max_input_card_jitter_seconds, - double max_master_card_jitter_seconds) + double max_master_card_jitter_seconds, bool verbose) { double input_frame_duration_seconds = input_frame_duration / double(TIMEBASE); double master_frame_duration_seconds = master_frame_duration / double(TIMEBASE); @@ -318,6 +337,13 @@ void QueueLengthPolicy::update_policy(steady_clock::time_point now, } else { frames_allowed = frames_needed; } + if (verbose) { + fprintf(stderr, "secs_until_next_frame = %.1f ms, input jitter = %.1f ms, master jitter = %.1f ms, frames_allowed = %.3f\n", + 1e3 * duration(expected_next_input_frame - now).count(), + 1e3 * max_input_card_jitter_seconds, + 1e3 * max_master_card_jitter_seconds, + frames_allowed); + } safe_queue_length = max(floor(frames_allowed), 0); metric_input_queue_safe_length_frames = safe_queue_length; @@ -899,7 +925,7 @@ void Mixer::set_output_card_internal(int card_index) card->jitter_history.clear(); card->capture->start_bm_capture(); desired_output_video_mode = output_video_mode = card->output->pick_video_mode(desired_output_video_mode); - card->output->start_output(desired_output_video_mode, pts_int); + card->output->start_output(desired_output_video_mode, pts_int, /*is_master_card=*/slave_to_output); } output_card_index = card_index; output_jitter_history.clear(); @@ -1286,7 +1312,7 @@ void Mixer::thread_func() DeckLinkOutput *output = cards[output_card_index].output.get(); output->end_output(); desired_output_video_mode = output_video_mode = output->pick_video_mode(desired_output_video_mode); - output->start_output(desired_output_video_mode, pts_int); + output->start_output(desired_output_video_mode, pts_int, /*is_master_card=*/slave_to_output); } { @@ -1299,7 +1325,7 @@ void Mixer::thread_func() bool master_card_is_output; unsigned master_card_index; - if (output_card_index != -1) { + if (output_card_index != -1 && slave_to_output) { master_card_is_output = true; master_card_index = output_card_index; } else { @@ -1406,7 +1432,7 @@ void Mixer::thread_func() bool Mixer::input_card_is_master_clock(unsigned card_index, unsigned master_card_index) const { - if (output_card_index != -1) { + if (output_card_index != -1 && slave_to_output) { // The output card (ie., cards[output_card_index].output) is the master clock, // so no input card (ie., cards[card_index].capture) is. return false; diff --git a/nageru/mixer.h b/nageru/mixer.h index 6e9da90..63c7f8e 100644 --- a/nageru/mixer.h +++ b/nageru/mixer.h @@ -399,6 +399,7 @@ private: std::atomic master_clock_channel{0}; // Gets overridden by if set. int output_card_index = -1; // -1 for none. uint32_t output_video_mode = -1; + bool slave_to_output = false; // Only relevant if output_card_index != -1. // The mechanics of changing the output card and modes are so intricately connected // with the work the mixer thread is doing. Thus, we don't change it directly, diff --git a/nageru/queue_length_policy.h b/nageru/queue_length_policy.h index 0b8a420..d1ee786 100644 --- a/nageru/queue_length_policy.h +++ b/nageru/queue_length_policy.h @@ -37,7 +37,7 @@ public: orders.clear(); expected_timestamp = std::chrono::steady_clock::time_point::min(); } - void frame_arrived(std::chrono::steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames); + void frame_arrived(std::chrono::steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames, bool verbose = false); std::chrono::steady_clock::time_point get_expected_next_frame() const { return expected_timestamp; } double estimate_max_jitter() const; @@ -94,7 +94,7 @@ public: int64_t input_frame_duration, int64_t master_frame_duration, double max_input_card_jitter_seconds, - double max_master_card_jitter_seconds); + double max_master_card_jitter_seconds, bool verbose=false); unsigned get_safe_queue_length() const { return safe_queue_length; } private: -- 2.39.2