if (!clip_ready) {
if (video_stream != nullptr) {
- video_stream->schedule_refresh_frame(pts, /*display_func=*/nullptr);
+ video_stream->schedule_refresh_frame(steady_clock::now(), pts, /*display_func=*/nullptr);
}
continue;
}
clip = current_clip;
stream_idx = current_stream_idx;
}
- steady_clock::time_point origin = steady_clock::now();
+ steady_clock::time_point origin = steady_clock::now(); // TODO: Add a 100 ms buffer for ramp-up?
int64_t in_pts_origin = clip.pts_in;
got_clip:
int64_t out_pts_origin = pts;
break;
}
- // Sleep until the next frame start, or until there's a new clip we're supposed to play.
+ // If the queue is full (which is really the state we'd like to be in),
+ // wait until there's room for one more frame (ie., one was output from
+ // VideoStream), or until or until there's a new clip we're supposed to play.
{
unique_lock<mutex> lock(queue_state_mu);
- new_clip_changed.wait_until(lock, next_frame_start, [this]{
+ new_clip_changed.wait(lock, [this]{
+ if (video_stream != nullptr && num_queued_frames < max_queued_frames) {
+ return true;
+ }
return new_clip_ready || override_stream_idx != -1;
});
if (new_clip_ready) {
+ if (video_stream != nullptr) {
+ video_stream->clear_queue();
+ }
goto wait_for_clip;
}
if (override_stream_idx != -1) {
if (in_pts_lower == in_pts_upper) {
auto display_func = [this, primary_stream_idx, in_pts_lower, secondary_stream_idx, secondary_pts, fade_alpha]{
destination->setFrame(primary_stream_idx, in_pts_lower, /*interpolated=*/false, secondary_stream_idx, secondary_pts, fade_alpha);
+ unique_lock<mutex> lock(queue_state_mu);
+ assert(num_queued_frames > 0);
+ --num_queued_frames;
};
if (video_stream == nullptr) {
display_func();
} else {
if (secondary_stream_idx == -1) {
- video_stream->schedule_original_frame(pts, display_func, primary_stream_idx, in_pts_lower);
+ video_stream->schedule_original_frame(next_frame_start, pts, display_func, primary_stream_idx, in_pts_lower);
+ unique_lock<mutex> lock(queue_state_mu);
+ ++num_queued_frames;
} else {
- video_stream->schedule_faded_frame(pts, display_func, primary_stream_idx, in_pts_lower, secondary_stream_idx, secondary_pts, fade_alpha);
+ bool ok = video_stream->schedule_faded_frame(next_frame_start, pts, display_func, primary_stream_idx, in_pts_lower, secondary_stream_idx, secondary_pts, fade_alpha);
+ unique_lock<mutex> lock(queue_state_mu);
+ if (ok) ++num_queued_frames;
}
}
continue;
if (fabs(snap_pts_as_frameno - frameno) < 0.01) {
auto display_func = [this, primary_stream_idx, snap_pts, secondary_stream_idx, secondary_pts, fade_alpha]{
destination->setFrame(primary_stream_idx, snap_pts, /*interpolated=*/false, secondary_stream_idx, secondary_pts, fade_alpha);
+ unique_lock<mutex> lock(queue_state_mu);
+ assert(num_queued_frames > 0);
+ --num_queued_frames;
};
if (video_stream == nullptr) {
display_func();
} else {
if (secondary_stream_idx == -1) {
- video_stream->schedule_original_frame(pts, display_func, primary_stream_idx, snap_pts);
+ video_stream->schedule_original_frame(next_frame_start, pts, display_func, primary_stream_idx, snap_pts);
+ unique_lock<mutex> lock(queue_state_mu);
+ ++num_queued_frames;
} else {
- video_stream->schedule_faded_frame(pts, display_func, primary_stream_idx, snap_pts, secondary_stream_idx, secondary_pts, fade_alpha);
+ bool ok = video_stream->schedule_faded_frame(next_frame_start, pts, display_func, primary_stream_idx, snap_pts, secondary_stream_idx, secondary_pts, fade_alpha);
+ unique_lock<mutex> lock(queue_state_mu);
+ if (ok) ++num_queued_frames;
}
}
in_pts_origin += snap_pts - in_pts;
assert(secondary_stream_idx == -1);
destination->setFrame(primary_stream_idx, in_pts_lower, /*interpolated=*/false);
} else {
- // Calculate the interpolated frame. When it's done, the destination
- // will be unblocked.
auto display_func = [this, primary_stream_idx, pts, secondary_stream_idx, secondary_pts, fade_alpha]{
destination->setFrame(primary_stream_idx, pts, /*interpolated=*/true, secondary_stream_idx, secondary_pts, fade_alpha);
+ assert(num_queued_frames > 0);
+ --num_queued_frames;
};
- video_stream->schedule_interpolated_frame(pts, display_func, primary_stream_idx, in_pts_lower, in_pts_upper, alpha, secondary_stream_idx, secondary_pts, fade_alpha);
+ bool ok = video_stream->schedule_interpolated_frame(next_frame_start, pts, display_func, primary_stream_idx, in_pts_lower, in_pts_upper, alpha, secondary_stream_idx, secondary_pts, fade_alpha);
+ unique_lock<mutex> lock(queue_state_mu);
+ if (ok) ++num_queued_frames;
}
}
int override_stream_idx = -1; // Under queue_state_mu.
std::unique_ptr<VideoStream> video_stream; // Can be nullptr.
+
+ // under queue_state_mu. Part of this instead of VideoStream so that we own
+ // its lock and can sleep on it.
+ size_t num_queued_frames = 0;
+ static constexpr size_t max_queued_frames = 10;
};
#endif // !defined(_PLAYER_H)
#include <unistd.h>
using namespace std;
+using namespace std::chrono;
extern HTTPD *global_httpd;
GLuint fade_y_output_tex[num_interpolate_slots], fade_cbcr_output_tex[num_interpolate_slots];
GLuint cb_tex[num_interpolate_slots], cr_tex[num_interpolate_slots];
- glCreateTextures(GL_TEXTURE_2D_ARRAY, 10, input_tex);
- glCreateTextures(GL_TEXTURE_2D_ARRAY, 10, gray_tex);
- glCreateTextures(GL_TEXTURE_2D, 10, fade_y_output_tex);
- glCreateTextures(GL_TEXTURE_2D, 10, fade_cbcr_output_tex);
- glCreateTextures(GL_TEXTURE_2D, 10, cb_tex);
- glCreateTextures(GL_TEXTURE_2D, 10, cr_tex);
+ glCreateTextures(GL_TEXTURE_2D_ARRAY, num_interpolate_slots, input_tex);
+ glCreateTextures(GL_TEXTURE_2D_ARRAY, num_interpolate_slots, gray_tex);
+ glCreateTextures(GL_TEXTURE_2D, num_interpolate_slots, fade_y_output_tex);
+ glCreateTextures(GL_TEXTURE_2D, num_interpolate_slots, fade_cbcr_output_tex);
+ glCreateTextures(GL_TEXTURE_2D, num_interpolate_slots, cb_tex);
+ glCreateTextures(GL_TEXTURE_2D, num_interpolate_slots, cr_tex);
check_error();
constexpr size_t width = 1280, height = 720; // FIXME: adjustable width, height
encode_thread.join();
}
-void VideoStream::schedule_original_frame(int64_t output_pts, function<void()> &&display_func, unsigned stream_idx, int64_t input_pts)
+void VideoStream::clear_queue()
+{
+ unique_lock<mutex> lock(queue_lock);
+ frame_queue.clear();
+}
+
+void VideoStream::schedule_original_frame(steady_clock::time_point local_pts, int64_t output_pts, function<void()> &&display_func, unsigned stream_idx, int64_t input_pts)
{
fprintf(stderr, "output_pts=%ld original input_pts=%ld\n", output_pts, input_pts);
QueuedFrame qf;
+ qf.local_pts = local_pts;
qf.type = QueuedFrame::ORIGINAL;
qf.output_pts = output_pts;
qf.stream_idx = stream_idx;
unique_lock<mutex> lock(queue_lock);
frame_queue.push_back(qf);
- queue_nonempty.notify_all();
+ queue_changed.notify_all();
}
-void VideoStream::schedule_faded_frame(int64_t output_pts, function<void()> &&display_func, unsigned stream_idx, int64_t input_pts, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha)
+bool VideoStream::schedule_faded_frame(steady_clock::time_point local_pts, int64_t output_pts, function<void()> &&display_func, unsigned stream_idx, int64_t input_pts, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha)
{
fprintf(stderr, "output_pts=%ld faded input_pts=%ld,%ld fade_alpha=%.2f\n", output_pts, input_pts, secondary_input_pts, fade_alpha);
unique_lock<mutex> lock(queue_lock);
if (interpolate_resources.empty()) {
fprintf(stderr, "WARNING: Too many interpolated frames already in transit; dropping one.\n");
- return;
+ return false;
}
resources = interpolate_resources.front();
interpolate_resources.pop_front();
ycbcr_semiplanar_converter->prepare_chain_for_fade(frame1, frame2, fade_alpha)->render_to_fbo(resources.fade_fbo, 1280, 720);
QueuedFrame qf;
+ qf.local_pts = local_pts;
qf.type = QueuedFrame::FADED;
qf.output_pts = output_pts;
qf.stream_idx = stream_idx;
unique_lock<mutex> lock(queue_lock);
frame_queue.push_back(qf);
- queue_nonempty.notify_all();
+ queue_changed.notify_all();
+ return true;
}
-void VideoStream::schedule_interpolated_frame(int64_t output_pts, function<void()> &&display_func, unsigned stream_idx, int64_t input_first_pts, int64_t input_second_pts, float alpha, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha)
+bool VideoStream::schedule_interpolated_frame(steady_clock::time_point local_pts, int64_t output_pts, function<void()> &&display_func, unsigned stream_idx, int64_t input_first_pts, int64_t input_second_pts, float alpha, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha)
{
if (secondary_stream_idx != -1) {
fprintf(stderr, "output_pts=%ld interpolated input_pts1=%ld input_pts2=%ld alpha=%.3f secondary_pts=%ld fade_alpha=%.2f\n", output_pts, input_first_pts, input_second_pts, alpha, secondary_input_pts, fade_alpha);
unique_lock<mutex> lock(queue_lock);
if (interpolate_resources.empty()) {
fprintf(stderr, "WARNING: Too many interpolated frames already in transit; dropping one.\n");
- return;
+ return false;
}
resources = interpolate_resources.front();
interpolate_resources.pop_front();
unique_lock<mutex> lock(queue_lock);
frame_queue.push_back(qf);
- queue_nonempty.notify_all();
+ queue_changed.notify_all();
+ return true;
}
-void VideoStream::schedule_refresh_frame(int64_t output_pts, function<void()> &&display_func)
+void VideoStream::schedule_refresh_frame(steady_clock::time_point local_pts, int64_t output_pts, function<void()> &&display_func)
{
QueuedFrame qf;
qf.type = QueuedFrame::REFRESH;
unique_lock<mutex> lock(queue_lock);
frame_queue.push_back(qf);
- queue_nonempty.notify_all();
+ queue_changed.notify_all();
}
namespace {
QueuedFrame qf;
{
unique_lock<mutex> lock(queue_lock);
- queue_nonempty.wait(lock, [this]{
+
+ // Wait until we have a frame to play.
+ queue_changed.wait(lock, [this]{
return !frame_queue.empty();
});
+ steady_clock::time_point frame_start = frame_queue.front().local_pts;
+
+ // Now sleep until the frame is supposed to start (the usual case),
+ // _or_ clear_queue() happened.
+ bool aborted = queue_changed.wait_until(lock, frame_start, [this, frame_start]{
+ return frame_queue.empty() || frame_queue.front().local_pts != frame_start;
+ });
+ if (aborted) {
+ // clear_queue() happened, so don't play this frame after all.
+ continue;
+ }
qf = frame_queue.front();
frame_queue.pop_front();
}
#include "jpeg_frame_view.h"
#include "ref_counted_gl_sync.h"
+#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
~VideoStream();
void start();
void stop();
+ void clear_queue();
// “display_func” is called after the frame has been calculated (if needed)
- // and has gone out to the stream.
- void schedule_original_frame(int64_t output_pts, std::function<void()> &&display_func, unsigned stream_idx, int64_t input_pts);
- void schedule_faded_frame(int64_t output_pts, std::function<void()> &&display_func, unsigned stream_idx, int64_t input_pts, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha);
- void schedule_interpolated_frame(int64_t output_pts, std::function<void()> &&display_func, unsigned stream_idx, int64_t input_first_pts, int64_t input_second_pts, float alpha, int secondary_stream_idx = -1, int64_t secondary_inputs_pts = -1, float fade_alpha = 0.0f); // -1 = no secondary frame.
- void schedule_refresh_frame(int64_t output_pts, std::function<void()> &&display_func);
+ // and has gone out to the stream. Returns false on failure (ie., couldn't
+ // schedule the frame due to lack of resources).
+ void schedule_original_frame(std::chrono::steady_clock::time_point, int64_t output_pts, std::function<void()> &&display_func, unsigned stream_idx, int64_t input_pts);
+ bool schedule_faded_frame(std::chrono::steady_clock::time_point, int64_t output_pts, std::function<void()> &&display_func, unsigned stream_idx, int64_t input_pts, int secondary_stream_idx, int64_t secondary_input_pts, float fade_alpha);
+ bool schedule_interpolated_frame(std::chrono::steady_clock::time_point, int64_t output_pts, std::function<void()> &&display_func, unsigned stream_idx, int64_t input_first_pts, int64_t input_second_pts, float alpha, int secondary_stream_idx = -1, int64_t secondary_inputs_pts = -1, float fade_alpha = 0.0f); // -1 = no secondary frame.
+ void schedule_refresh_frame(std::chrono::steady_clock::time_point, int64_t output_pts, std::function<void()> &&display_func);
private:
void encode_thread_func();
void *pbo_contents; // Persistently mapped.
};
std::deque<InterpolatedFrameResources> interpolate_resources; // Under <queue_lock>.
- static constexpr size_t num_interpolate_slots = 10;
+ static constexpr size_t num_interpolate_slots = 15; // Should be larger than Player::max_queued_frames, or we risk mass-dropping frames.
struct QueuedFrame {
+ std::chrono::steady_clock::time_point local_pts;
+
int64_t output_pts;
enum Type { ORIGINAL, FADED, INTERPOLATED, FADED_INTERPOLATED, REFRESH } type;
unsigned stream_idx;
};
std::deque<QueuedFrame> frame_queue; // Under <queue_lock>.
std::mutex queue_lock;
- std::condition_variable queue_nonempty;
+ std::condition_variable queue_changed;
std::unique_ptr<Mux> stream_mux; // To HTTP.
std::string stream_mux_header;