]> git.sesse.net Git - nageru/commitdiff
Add a queue of frames going into VideoStream.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 26 Oct 2018 16:10:38 +0000 (18:10 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 26 Oct 2018 16:10:38 +0000 (18:10 +0200)
Hopefully this smooths out disk jitter a fair bit, as we essentially
get a precalculation/preload process.

player.cpp
player.h
video_stream.cpp
video_stream.h

index a6a8056b53290b9746abac5beec0f45d38f8135d..0d8593878f67445e3dc867dfc0ea2d0719593082 100644 (file)
@@ -73,7 +73,7 @@ wait_for_clip:
 
                if (!clip_ready) {
                        if (video_stream != nullptr) {
-                               video_stream->schedule_refresh_frame(pts, /*display_func=*/nullptr);
+                               video_stream->schedule_refresh_frame(steady_clock::now(), pts, /*display_func=*/nullptr);
                        }
                        continue;
                }
@@ -85,7 +85,7 @@ wait_for_clip:
                        clip = current_clip;
                        stream_idx = current_stream_idx;
                }
-               steady_clock::time_point origin = steady_clock::now();
+               steady_clock::time_point origin = steady_clock::now();  // TODO: Add a 100 ms buffer for ramp-up?
                int64_t in_pts_origin = clip.pts_in;
 got_clip:
                int64_t out_pts_origin = pts;
@@ -180,13 +180,21 @@ got_clip:
                                break;
                        }
 
-                       // Sleep until the next frame start, or until there's a new clip we're supposed to play.
+                       // If the queue is full (which is really the state we'd like to be in),
+                       // wait until there's room for one more frame (ie., one was output from
+                       // VideoStream), or until or until there's a new clip we're supposed to play.
                        {
                                unique_lock<mutex> lock(queue_state_mu);
-                               new_clip_changed.wait_until(lock, next_frame_start, [this]{
+                               new_clip_changed.wait(lock, [this]{
+                                       if (video_stream != nullptr && num_queued_frames < max_queued_frames) {
+                                               return true;
+                                       }
                                        return new_clip_ready || override_stream_idx != -1;
                                });
                                if (new_clip_ready) {
+                                       if (video_stream != nullptr) {
+                                               video_stream->clear_queue();
+                                       }
                                        goto wait_for_clip;
                                }
                                if (override_stream_idx != -1) {
@@ -199,14 +207,21 @@ got_clip:
                        if (in_pts_lower == in_pts_upper) {
                                auto display_func = [this, primary_stream_idx, in_pts_lower, secondary_stream_idx, secondary_pts, fade_alpha]{
                                        destination->setFrame(primary_stream_idx, in_pts_lower, /*interpolated=*/false, secondary_stream_idx, secondary_pts, fade_alpha);
+                                       unique_lock<mutex> lock(queue_state_mu);
+                                       assert(num_queued_frames > 0);
+                                       --num_queued_frames;
                                };
                                if (video_stream == nullptr) {
                                        display_func();
                                } else {
                                        if (secondary_stream_idx == -1) {
-                                               video_stream->schedule_original_frame(pts, display_func, primary_stream_idx, in_pts_lower);
+                                               video_stream->schedule_original_frame(next_frame_start, pts, display_func, primary_stream_idx, in_pts_lower);
+                                               unique_lock<mutex> lock(queue_state_mu);
+                                               ++num_queued_frames;
                                        } else {
-                                               video_stream->schedule_faded_frame(pts, display_func, primary_stream_idx, in_pts_lower, secondary_stream_idx, secondary_pts, fade_alpha);
+                                               bool ok = video_stream->schedule_faded_frame(next_frame_start, pts, display_func, primary_stream_idx, in_pts_lower, secondary_stream_idx, secondary_pts, fade_alpha);
+                                               unique_lock<mutex> lock(queue_state_mu);
+                                               if (ok) ++num_queued_frames;
                                        }
                                }
                                continue;
@@ -221,14 +236,21 @@ got_clip:
                                if (fabs(snap_pts_as_frameno - frameno) < 0.01) {
                                        auto display_func = [this, primary_stream_idx, snap_pts, secondary_stream_idx, secondary_pts, fade_alpha]{
                                                destination->setFrame(primary_stream_idx, snap_pts, /*interpolated=*/false, secondary_stream_idx, secondary_pts, fade_alpha);
+                                               unique_lock<mutex> lock(queue_state_mu);
+                                               assert(num_queued_frames > 0);
+                                               --num_queued_frames;
                                        };
                                        if (video_stream == nullptr) {
                                                display_func();
                                        } else {
                                                if (secondary_stream_idx == -1) {
-                                                       video_stream->schedule_original_frame(pts, display_func, primary_stream_idx, snap_pts);
+                                                       video_stream->schedule_original_frame(next_frame_start, pts, display_func, primary_stream_idx, snap_pts);
+                                                       unique_lock<mutex> lock(queue_state_mu);
+                                                       ++num_queued_frames;
                                                } else {
-                                                       video_stream->schedule_faded_frame(pts, display_func, primary_stream_idx, snap_pts, secondary_stream_idx, secondary_pts, fade_alpha);
+                                                       bool ok = video_stream->schedule_faded_frame(next_frame_start, pts, display_func, primary_stream_idx, snap_pts, secondary_stream_idx, secondary_pts, fade_alpha);
+                                                       unique_lock<mutex> lock(queue_state_mu);
+                                                       if (ok) ++num_queued_frames;
                                                }
                                        }
                                        in_pts_origin += snap_pts - in_pts;
@@ -253,12 +275,14 @@ got_clip:
                                assert(secondary_stream_idx == -1);
                                destination->setFrame(primary_stream_idx, in_pts_lower, /*interpolated=*/false);
                        } else {
-                               // Calculate the interpolated frame. When it's done, the destination
-                               // will be unblocked.
                                auto display_func = [this, primary_stream_idx, pts, secondary_stream_idx, secondary_pts, fade_alpha]{
                                        destination->setFrame(primary_stream_idx, pts, /*interpolated=*/true, secondary_stream_idx, secondary_pts, fade_alpha);
+                                       assert(num_queued_frames > 0);
+                                       --num_queued_frames;
                                };
-                               video_stream->schedule_interpolated_frame(pts, display_func, primary_stream_idx, in_pts_lower, in_pts_upper, alpha, secondary_stream_idx, secondary_pts, fade_alpha);
+                               bool ok = video_stream->schedule_interpolated_frame(next_frame_start, pts, display_func, primary_stream_idx, in_pts_lower, in_pts_upper, alpha, secondary_stream_idx, secondary_pts, fade_alpha);
+                               unique_lock<mutex> lock(queue_state_mu);
+                               if (ok) ++num_queued_frames;
                        }
                }
 
index 63d4871c8c653ebdbe02bb1ef10259d93ea46b01..f5adcf7a505dfed1636466bb06cf689c788bba5a 100644 (file)
--- a/player.h
+++ b/player.h
@@ -64,6 +64,11 @@ private:
        int override_stream_idx = -1;  // Under queue_state_mu.
 
        std::unique_ptr<VideoStream> video_stream;  // Can be nullptr.
+
+       // under queue_state_mu. Part of this instead of VideoStream so that we own
+       // its lock and can sleep on it.
+       size_t num_queued_frames = 0;
+       static constexpr size_t max_queued_frames = 10;
 };
 
 #endif  // !defined(_PLAYER_H)
index 91b5df0c4ee26ee3166a79683fc37b22e77570e8..39f633cc72f1232fc60f09cbaf09d4df406c0e80 100644 (file)
@@ -22,6 +22,7 @@ extern "C" {
 #include <unistd.h>
 
 using namespace std;
+using namespace std::chrono;
 
 extern HTTPD *global_httpd;
 
@@ -156,12 +157,12 @@ VideoStream::VideoStream()
        GLuint fade_y_output_tex[num_interpolate_slots], fade_cbcr_output_tex[num_interpolate_slots];
        GLuint cb_tex[num_interpolate_slots], cr_tex[num_interpolate_slots];
 
-       glCreateTextures(GL_TEXTURE_2D_ARRAY, 10, input_tex);
-       glCreateTextures(GL_TEXTURE_2D_ARRAY, 10, gray_tex);
-       glCreateTextures(GL_TEXTURE_2D, 10, fade_y_output_tex);
-       glCreateTextures(GL_TEXTURE_2D, 10, fade_cbcr_output_tex);
-       glCreateTextures(GL_TEXTURE_2D, 10, cb_tex);
-       glCreateTextures(GL_TEXTURE_2D, 10, cr_tex);
+       glCreateTextures(GL_TEXTURE_2D_ARRAY, num_interpolate_slots, input_tex);
+       glCreateTextures(GL_TEXTURE_2D_ARRAY, num_interpolate_slots, gray_tex);
+       glCreateTextures(GL_TEXTURE_2D, num_interpolate_slots, fade_y_output_tex);
+       glCreateTextures(GL_TEXTURE_2D, num_interpolate_slots, fade_cbcr_output_tex);
+       glCreateTextures(GL_TEXTURE_2D, num_interpolate_slots, cb_tex);
+       glCreateTextures(GL_TEXTURE_2D, num_interpolate_slots, cr_tex);
        check_error();
 
        constexpr size_t width = 1280, height = 720;  // FIXME: adjustable width, height
@@ -281,11 +282,18 @@ void VideoStream::stop()
        encode_thread.join();
 }
 
-void VideoStream::schedule_original_frame(int64_t output_pts, function<void()> &&display_func, unsigned stream_idx, int64_t input_pts)
+void VideoStream::clear_queue()
+{
+       unique_lock<mutex> lock(queue_lock);
+       frame_queue.clear();
+}
+
+void VideoStream::schedule_original_frame(steady_clock::time_point local_pts, int64_t output_pts, function<void()> &&display_func, unsigned stream_idx, int64_t input_pts)
 {
        fprintf(stderr, "output_pts=%ld  original      input_pts=%ld\n", output_pts, input_pts);
 
        QueuedFrame qf;
+       qf.local_pts = local_pts;
        qf.type = QueuedFrame::ORIGINAL;
        qf.output_pts = output_pts;
        qf.stream_idx = stream_idx;
@@ -294,10 +302,10 @@ void VideoStream::schedule_original_frame(int64_t output_pts, function<void()> &
 
        unique_lock<mutex> lock(queue_lock);
        frame_queue.push_back(qf);
-       queue_nonempty.notify_all();
+       queue_changed.notify_all();
 }
 
-void VideoStream::schedule_faded_frame(int64_t output_pts, function<void()> &&display_func, unsigned stream_idx, int64_t input_pts, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha)
+bool VideoStream::schedule_faded_frame(steady_clock::time_point local_pts, int64_t output_pts, function<void()> &&display_func, unsigned stream_idx, int64_t input_pts, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha)
 {
        fprintf(stderr, "output_pts=%ld  faded         input_pts=%ld,%ld  fade_alpha=%.2f\n", output_pts, input_pts, secondary_input_pts, fade_alpha);
 
@@ -310,7 +318,7 @@ void VideoStream::schedule_faded_frame(int64_t output_pts, function<void()> &&di
                unique_lock<mutex> lock(queue_lock);
                if (interpolate_resources.empty()) {
                        fprintf(stderr, "WARNING: Too many interpolated frames already in transit; dropping one.\n");
-                       return;
+                       return false;
                }
                resources = interpolate_resources.front();
                interpolate_resources.pop_front();
@@ -333,6 +341,7 @@ void VideoStream::schedule_faded_frame(int64_t output_pts, function<void()> &&di
        ycbcr_semiplanar_converter->prepare_chain_for_fade(frame1, frame2, fade_alpha)->render_to_fbo(resources.fade_fbo, 1280, 720);
 
        QueuedFrame qf;
+       qf.local_pts = local_pts;
        qf.type = QueuedFrame::FADED;
        qf.output_pts = output_pts;
        qf.stream_idx = stream_idx;
@@ -366,10 +375,11 @@ void VideoStream::schedule_faded_frame(int64_t output_pts, function<void()> &&di
 
        unique_lock<mutex> lock(queue_lock);
        frame_queue.push_back(qf);
-       queue_nonempty.notify_all();
+       queue_changed.notify_all();
+       return true;
 }
 
-void VideoStream::schedule_interpolated_frame(int64_t output_pts, function<void()> &&display_func, unsigned stream_idx, int64_t input_first_pts, int64_t input_second_pts, float alpha, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha)
+bool VideoStream::schedule_interpolated_frame(steady_clock::time_point local_pts, int64_t output_pts, function<void()> &&display_func, unsigned stream_idx, int64_t input_first_pts, int64_t input_second_pts, float alpha, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha)
 {
        if (secondary_stream_idx != -1) {
                fprintf(stderr, "output_pts=%ld  interpolated  input_pts1=%ld input_pts2=%ld alpha=%.3f  secondary_pts=%ld  fade_alpha=%.2f\n", output_pts, input_first_pts, input_second_pts, alpha, secondary_input_pts, fade_alpha);
@@ -390,7 +400,7 @@ void VideoStream::schedule_interpolated_frame(int64_t output_pts, function<void(
                unique_lock<mutex> lock(queue_lock);
                if (interpolate_resources.empty()) {
                        fprintf(stderr, "WARNING: Too many interpolated frames already in transit; dropping one.\n");
-                       return;
+                       return false;
                }
                resources = interpolate_resources.front();
                interpolate_resources.pop_front();
@@ -480,10 +490,11 @@ void VideoStream::schedule_interpolated_frame(int64_t output_pts, function<void(
 
        unique_lock<mutex> lock(queue_lock);
        frame_queue.push_back(qf);
-       queue_nonempty.notify_all();
+       queue_changed.notify_all();
+       return true;
 }
 
-void VideoStream::schedule_refresh_frame(int64_t output_pts, function<void()> &&display_func)
+void VideoStream::schedule_refresh_frame(steady_clock::time_point local_pts, int64_t output_pts, function<void()> &&display_func)
 {
        QueuedFrame qf;
        qf.type = QueuedFrame::REFRESH;
@@ -492,7 +503,7 @@ void VideoStream::schedule_refresh_frame(int64_t output_pts, function<void()> &&
 
        unique_lock<mutex> lock(queue_lock);
        frame_queue.push_back(qf);
-       queue_nonempty.notify_all();
+       queue_changed.notify_all();
 }
 
 namespace {
@@ -541,9 +552,22 @@ void VideoStream::encode_thread_func()
                QueuedFrame qf;
                {
                        unique_lock<mutex> lock(queue_lock);
-                       queue_nonempty.wait(lock, [this]{
+
+                       // Wait until we have a frame to play.
+                       queue_changed.wait(lock, [this]{
                                return !frame_queue.empty();
                        });
+                       steady_clock::time_point frame_start = frame_queue.front().local_pts;
+
+                       // Now sleep until the frame is supposed to start (the usual case),
+                       // _or_ clear_queue() happened.
+                       bool aborted = queue_changed.wait_until(lock, frame_start, [this, frame_start]{
+                               return frame_queue.empty() || frame_queue.front().local_pts != frame_start;
+                       });
+                       if (aborted) {
+                               // clear_queue() happened, so don't play this frame after all.
+                               continue;
+                       }
                        qf = frame_queue.front();
                        frame_queue.pop_front();
                }
index c1b808e9baf069e0b6d65258fb6fe063ea1e60f2..64f72d8104d6aef219a6747e251b501bc4b29d16 100644 (file)
@@ -11,6 +11,7 @@ extern "C" {
 #include "jpeg_frame_view.h"
 #include "ref_counted_gl_sync.h"
 
+#include <chrono>
 #include <condition_variable>
 #include <deque>
 #include <functional>
@@ -35,13 +36,15 @@ public:
        ~VideoStream();
        void start();
        void stop();
+       void clear_queue();
 
        // “display_func” is called after the frame has been calculated (if needed)
-       // and has gone out to the stream.
-       void schedule_original_frame(int64_t output_pts, std::function<void()> &&display_func, unsigned stream_idx, int64_t input_pts);
-       void schedule_faded_frame(int64_t output_pts, std::function<void()> &&display_func, unsigned stream_idx, int64_t input_pts, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha);
-       void schedule_interpolated_frame(int64_t output_pts, std::function<void()> &&display_func, unsigned stream_idx, int64_t input_first_pts, int64_t input_second_pts, float alpha, int secondary_stream_idx = -1, int64_t secondary_inputs_pts = -1, float fade_alpha = 0.0f);  // -1 = no secondary frame.
-       void schedule_refresh_frame(int64_t output_pts, std::function<void()> &&display_func);
+       // and has gone out to the stream. Returns false on failure (ie., couldn't
+       // schedule the frame due to lack of resources).
+       void schedule_original_frame(std::chrono::steady_clock::time_point, int64_t output_pts, std::function<void()> &&display_func, unsigned stream_idx, int64_t input_pts);
+       bool schedule_faded_frame(std::chrono::steady_clock::time_point, int64_t output_pts, std::function<void()> &&display_func, unsigned stream_idx, int64_t input_pts, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha);
+       bool schedule_interpolated_frame(std::chrono::steady_clock::time_point, int64_t output_pts, std::function<void()> &&display_func, unsigned stream_idx, int64_t input_first_pts, int64_t input_second_pts, float alpha, int secondary_stream_idx = -1, int64_t secondary_inputs_pts = -1, float fade_alpha = 0.0f);  // -1 = no secondary frame.
+       void schedule_refresh_frame(std::chrono::steady_clock::time_point, int64_t output_pts, std::function<void()> &&display_func);
 
 private:
        void encode_thread_func();
@@ -67,9 +70,11 @@ private:
                void *pbo_contents;  // Persistently mapped.
        };
        std::deque<InterpolatedFrameResources> interpolate_resources;  // Under <queue_lock>.
-       static constexpr size_t num_interpolate_slots = 10;
+       static constexpr size_t num_interpolate_slots = 15;  // Should be larger than Player::max_queued_frames, or we risk mass-dropping frames.
 
        struct QueuedFrame {
+               std::chrono::steady_clock::time_point local_pts;
+
                int64_t output_pts;
                enum Type { ORIGINAL, FADED, INTERPOLATED, FADED_INTERPOLATED, REFRESH } type;
                unsigned stream_idx;
@@ -91,7 +96,7 @@ private:
        };
        std::deque<QueuedFrame> frame_queue;  // Under <queue_lock>.
        std::mutex queue_lock;
-       std::condition_variable queue_nonempty;
+       std::condition_variable queue_changed;
 
        std::unique_ptr<Mux> stream_mux;  // To HTTP.
        std::string stream_mux_header;