From: Steinar H. Gunderson Date: Fri, 26 Oct 2018 16:10:38 +0000 (+0200) Subject: Add a queue of frames going into VideoStream. X-Git-Tag: 1.8.0~76^2~39 X-Git-Url: https://git.sesse.net/?a=commitdiff_plain;h=fca89bd21985639735df96a527d86f8aa05b61fe;p=nageru Add a queue of frames going into VideoStream. Hopefully this smooths out disk jitter a fair bit, as we essentially get a precalculation/preload process. --- diff --git a/player.cpp b/player.cpp index a6a8056..0d85938 100644 --- a/player.cpp +++ b/player.cpp @@ -73,7 +73,7 @@ wait_for_clip: if (!clip_ready) { if (video_stream != nullptr) { - video_stream->schedule_refresh_frame(pts, /*display_func=*/nullptr); + video_stream->schedule_refresh_frame(steady_clock::now(), pts, /*display_func=*/nullptr); } continue; } @@ -85,7 +85,7 @@ wait_for_clip: clip = current_clip; stream_idx = current_stream_idx; } - steady_clock::time_point origin = steady_clock::now(); + steady_clock::time_point origin = steady_clock::now(); // TODO: Add a 100 ms buffer for ramp-up? int64_t in_pts_origin = clip.pts_in; got_clip: int64_t out_pts_origin = pts; @@ -180,13 +180,21 @@ got_clip: break; } - // Sleep until the next frame start, or until there's a new clip we're supposed to play. + // If the queue is full (which is really the state we'd like to be in), + // wait until there's room for one more frame (ie., one was output from + // VideoStream), or until or until there's a new clip we're supposed to play. { unique_lock lock(queue_state_mu); - new_clip_changed.wait_until(lock, next_frame_start, [this]{ + new_clip_changed.wait(lock, [this]{ + if (video_stream != nullptr && num_queued_frames < max_queued_frames) { + return true; + } return new_clip_ready || override_stream_idx != -1; }); if (new_clip_ready) { + if (video_stream != nullptr) { + video_stream->clear_queue(); + } goto wait_for_clip; } if (override_stream_idx != -1) { @@ -199,14 +207,21 @@ got_clip: if (in_pts_lower == in_pts_upper) { auto display_func = [this, primary_stream_idx, in_pts_lower, secondary_stream_idx, secondary_pts, fade_alpha]{ destination->setFrame(primary_stream_idx, in_pts_lower, /*interpolated=*/false, secondary_stream_idx, secondary_pts, fade_alpha); + unique_lock lock(queue_state_mu); + assert(num_queued_frames > 0); + --num_queued_frames; }; if (video_stream == nullptr) { display_func(); } else { if (secondary_stream_idx == -1) { - video_stream->schedule_original_frame(pts, display_func, primary_stream_idx, in_pts_lower); + video_stream->schedule_original_frame(next_frame_start, pts, display_func, primary_stream_idx, in_pts_lower); + unique_lock lock(queue_state_mu); + ++num_queued_frames; } else { - video_stream->schedule_faded_frame(pts, display_func, primary_stream_idx, in_pts_lower, secondary_stream_idx, secondary_pts, fade_alpha); + bool ok = video_stream->schedule_faded_frame(next_frame_start, pts, display_func, primary_stream_idx, in_pts_lower, secondary_stream_idx, secondary_pts, fade_alpha); + unique_lock lock(queue_state_mu); + if (ok) ++num_queued_frames; } } continue; @@ -221,14 +236,21 @@ got_clip: if (fabs(snap_pts_as_frameno - frameno) < 0.01) { auto display_func = [this, primary_stream_idx, snap_pts, secondary_stream_idx, secondary_pts, fade_alpha]{ destination->setFrame(primary_stream_idx, snap_pts, /*interpolated=*/false, secondary_stream_idx, secondary_pts, fade_alpha); + unique_lock lock(queue_state_mu); + assert(num_queued_frames > 0); + --num_queued_frames; }; if (video_stream == nullptr) { display_func(); } else { if (secondary_stream_idx == -1) { - video_stream->schedule_original_frame(pts, display_func, primary_stream_idx, snap_pts); + video_stream->schedule_original_frame(next_frame_start, pts, display_func, primary_stream_idx, snap_pts); + unique_lock lock(queue_state_mu); + ++num_queued_frames; } else { - video_stream->schedule_faded_frame(pts, display_func, primary_stream_idx, snap_pts, secondary_stream_idx, secondary_pts, fade_alpha); + bool ok = video_stream->schedule_faded_frame(next_frame_start, pts, display_func, primary_stream_idx, snap_pts, secondary_stream_idx, secondary_pts, fade_alpha); + unique_lock lock(queue_state_mu); + if (ok) ++num_queued_frames; } } in_pts_origin += snap_pts - in_pts; @@ -253,12 +275,14 @@ got_clip: assert(secondary_stream_idx == -1); destination->setFrame(primary_stream_idx, in_pts_lower, /*interpolated=*/false); } else { - // Calculate the interpolated frame. When it's done, the destination - // will be unblocked. auto display_func = [this, primary_stream_idx, pts, secondary_stream_idx, secondary_pts, fade_alpha]{ destination->setFrame(primary_stream_idx, pts, /*interpolated=*/true, secondary_stream_idx, secondary_pts, fade_alpha); + assert(num_queued_frames > 0); + --num_queued_frames; }; - video_stream->schedule_interpolated_frame(pts, display_func, primary_stream_idx, in_pts_lower, in_pts_upper, alpha, secondary_stream_idx, secondary_pts, fade_alpha); + bool ok = video_stream->schedule_interpolated_frame(next_frame_start, pts, display_func, primary_stream_idx, in_pts_lower, in_pts_upper, alpha, secondary_stream_idx, secondary_pts, fade_alpha); + unique_lock lock(queue_state_mu); + if (ok) ++num_queued_frames; } } diff --git a/player.h b/player.h index 63d4871..f5adcf7 100644 --- a/player.h +++ b/player.h @@ -64,6 +64,11 @@ private: int override_stream_idx = -1; // Under queue_state_mu. std::unique_ptr video_stream; // Can be nullptr. + + // under queue_state_mu. Part of this instead of VideoStream so that we own + // its lock and can sleep on it. + size_t num_queued_frames = 0; + static constexpr size_t max_queued_frames = 10; }; #endif // !defined(_PLAYER_H) diff --git a/video_stream.cpp b/video_stream.cpp index 91b5df0..39f633c 100644 --- a/video_stream.cpp +++ b/video_stream.cpp @@ -22,6 +22,7 @@ extern "C" { #include using namespace std; +using namespace std::chrono; extern HTTPD *global_httpd; @@ -156,12 +157,12 @@ VideoStream::VideoStream() GLuint fade_y_output_tex[num_interpolate_slots], fade_cbcr_output_tex[num_interpolate_slots]; GLuint cb_tex[num_interpolate_slots], cr_tex[num_interpolate_slots]; - glCreateTextures(GL_TEXTURE_2D_ARRAY, 10, input_tex); - glCreateTextures(GL_TEXTURE_2D_ARRAY, 10, gray_tex); - glCreateTextures(GL_TEXTURE_2D, 10, fade_y_output_tex); - glCreateTextures(GL_TEXTURE_2D, 10, fade_cbcr_output_tex); - glCreateTextures(GL_TEXTURE_2D, 10, cb_tex); - glCreateTextures(GL_TEXTURE_2D, 10, cr_tex); + glCreateTextures(GL_TEXTURE_2D_ARRAY, num_interpolate_slots, input_tex); + glCreateTextures(GL_TEXTURE_2D_ARRAY, num_interpolate_slots, gray_tex); + glCreateTextures(GL_TEXTURE_2D, num_interpolate_slots, fade_y_output_tex); + glCreateTextures(GL_TEXTURE_2D, num_interpolate_slots, fade_cbcr_output_tex); + glCreateTextures(GL_TEXTURE_2D, num_interpolate_slots, cb_tex); + glCreateTextures(GL_TEXTURE_2D, num_interpolate_slots, cr_tex); check_error(); constexpr size_t width = 1280, height = 720; // FIXME: adjustable width, height @@ -281,11 +282,18 @@ void VideoStream::stop() encode_thread.join(); } -void VideoStream::schedule_original_frame(int64_t output_pts, function &&display_func, unsigned stream_idx, int64_t input_pts) +void VideoStream::clear_queue() +{ + unique_lock lock(queue_lock); + frame_queue.clear(); +} + +void VideoStream::schedule_original_frame(steady_clock::time_point local_pts, int64_t output_pts, function &&display_func, unsigned stream_idx, int64_t input_pts) { fprintf(stderr, "output_pts=%ld original input_pts=%ld\n", output_pts, input_pts); QueuedFrame qf; + qf.local_pts = local_pts; qf.type = QueuedFrame::ORIGINAL; qf.output_pts = output_pts; qf.stream_idx = stream_idx; @@ -294,10 +302,10 @@ void VideoStream::schedule_original_frame(int64_t output_pts, function & unique_lock lock(queue_lock); frame_queue.push_back(qf); - queue_nonempty.notify_all(); + queue_changed.notify_all(); } -void VideoStream::schedule_faded_frame(int64_t output_pts, function &&display_func, unsigned stream_idx, int64_t input_pts, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha) +bool VideoStream::schedule_faded_frame(steady_clock::time_point local_pts, int64_t output_pts, function &&display_func, unsigned stream_idx, int64_t input_pts, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha) { fprintf(stderr, "output_pts=%ld faded input_pts=%ld,%ld fade_alpha=%.2f\n", output_pts, input_pts, secondary_input_pts, fade_alpha); @@ -310,7 +318,7 @@ void VideoStream::schedule_faded_frame(int64_t output_pts, function &&di unique_lock lock(queue_lock); if (interpolate_resources.empty()) { fprintf(stderr, "WARNING: Too many interpolated frames already in transit; dropping one.\n"); - return; + return false; } resources = interpolate_resources.front(); interpolate_resources.pop_front(); @@ -333,6 +341,7 @@ void VideoStream::schedule_faded_frame(int64_t output_pts, function &&di ycbcr_semiplanar_converter->prepare_chain_for_fade(frame1, frame2, fade_alpha)->render_to_fbo(resources.fade_fbo, 1280, 720); QueuedFrame qf; + qf.local_pts = local_pts; qf.type = QueuedFrame::FADED; qf.output_pts = output_pts; qf.stream_idx = stream_idx; @@ -366,10 +375,11 @@ void VideoStream::schedule_faded_frame(int64_t output_pts, function &&di unique_lock lock(queue_lock); frame_queue.push_back(qf); - queue_nonempty.notify_all(); + queue_changed.notify_all(); + return true; } -void VideoStream::schedule_interpolated_frame(int64_t output_pts, function &&display_func, unsigned stream_idx, int64_t input_first_pts, int64_t input_second_pts, float alpha, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha) +bool VideoStream::schedule_interpolated_frame(steady_clock::time_point local_pts, int64_t output_pts, function &&display_func, unsigned stream_idx, int64_t input_first_pts, int64_t input_second_pts, float alpha, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha) { if (secondary_stream_idx != -1) { fprintf(stderr, "output_pts=%ld interpolated input_pts1=%ld input_pts2=%ld alpha=%.3f secondary_pts=%ld fade_alpha=%.2f\n", output_pts, input_first_pts, input_second_pts, alpha, secondary_input_pts, fade_alpha); @@ -390,7 +400,7 @@ void VideoStream::schedule_interpolated_frame(int64_t output_pts, function lock(queue_lock); if (interpolate_resources.empty()) { fprintf(stderr, "WARNING: Too many interpolated frames already in transit; dropping one.\n"); - return; + return false; } resources = interpolate_resources.front(); interpolate_resources.pop_front(); @@ -480,10 +490,11 @@ void VideoStream::schedule_interpolated_frame(int64_t output_pts, function lock(queue_lock); frame_queue.push_back(qf); - queue_nonempty.notify_all(); + queue_changed.notify_all(); + return true; } -void VideoStream::schedule_refresh_frame(int64_t output_pts, function &&display_func) +void VideoStream::schedule_refresh_frame(steady_clock::time_point local_pts, int64_t output_pts, function &&display_func) { QueuedFrame qf; qf.type = QueuedFrame::REFRESH; @@ -492,7 +503,7 @@ void VideoStream::schedule_refresh_frame(int64_t output_pts, function && unique_lock lock(queue_lock); frame_queue.push_back(qf); - queue_nonempty.notify_all(); + queue_changed.notify_all(); } namespace { @@ -541,9 +552,22 @@ void VideoStream::encode_thread_func() QueuedFrame qf; { unique_lock lock(queue_lock); - queue_nonempty.wait(lock, [this]{ + + // Wait until we have a frame to play. + queue_changed.wait(lock, [this]{ return !frame_queue.empty(); }); + steady_clock::time_point frame_start = frame_queue.front().local_pts; + + // Now sleep until the frame is supposed to start (the usual case), + // _or_ clear_queue() happened. + bool aborted = queue_changed.wait_until(lock, frame_start, [this, frame_start]{ + return frame_queue.empty() || frame_queue.front().local_pts != frame_start; + }); + if (aborted) { + // clear_queue() happened, so don't play this frame after all. + continue; + } qf = frame_queue.front(); frame_queue.pop_front(); } diff --git a/video_stream.h b/video_stream.h index c1b808e..64f72d8 100644 --- a/video_stream.h +++ b/video_stream.h @@ -11,6 +11,7 @@ extern "C" { #include "jpeg_frame_view.h" #include "ref_counted_gl_sync.h" +#include #include #include #include @@ -35,13 +36,15 @@ public: ~VideoStream(); void start(); void stop(); + void clear_queue(); // “display_func” is called after the frame has been calculated (if needed) - // and has gone out to the stream. - void schedule_original_frame(int64_t output_pts, std::function &&display_func, unsigned stream_idx, int64_t input_pts); - void schedule_faded_frame(int64_t output_pts, std::function &&display_func, unsigned stream_idx, int64_t input_pts, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha); - void schedule_interpolated_frame(int64_t output_pts, std::function &&display_func, unsigned stream_idx, int64_t input_first_pts, int64_t input_second_pts, float alpha, int secondary_stream_idx = -1, int64_t secondary_inputs_pts = -1, float fade_alpha = 0.0f); // -1 = no secondary frame. - void schedule_refresh_frame(int64_t output_pts, std::function &&display_func); + // and has gone out to the stream. Returns false on failure (ie., couldn't + // schedule the frame due to lack of resources). + void schedule_original_frame(std::chrono::steady_clock::time_point, int64_t output_pts, std::function &&display_func, unsigned stream_idx, int64_t input_pts); + bool schedule_faded_frame(std::chrono::steady_clock::time_point, int64_t output_pts, std::function &&display_func, unsigned stream_idx, int64_t input_pts, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha); + bool schedule_interpolated_frame(std::chrono::steady_clock::time_point, int64_t output_pts, std::function &&display_func, unsigned stream_idx, int64_t input_first_pts, int64_t input_second_pts, float alpha, int secondary_stream_idx = -1, int64_t secondary_inputs_pts = -1, float fade_alpha = 0.0f); // -1 = no secondary frame. + void schedule_refresh_frame(std::chrono::steady_clock::time_point, int64_t output_pts, std::function &&display_func); private: void encode_thread_func(); @@ -67,9 +70,11 @@ private: void *pbo_contents; // Persistently mapped. }; std::deque interpolate_resources; // Under . - static constexpr size_t num_interpolate_slots = 10; + static constexpr size_t num_interpolate_slots = 15; // Should be larger than Player::max_queued_frames, or we risk mass-dropping frames. struct QueuedFrame { + std::chrono::steady_clock::time_point local_pts; + int64_t output_pts; enum Type { ORIGINAL, FADED, INTERPOLATED, FADED_INTERPOLATED, REFRESH } type; unsigned stream_idx; @@ -91,7 +96,7 @@ private: }; std::deque frame_queue; // Under . std::mutex queue_lock; - std::condition_variable queue_nonempty; + std::condition_variable queue_changed; std::unique_ptr stream_mux; // To HTTP. std::string stream_mux_header;