From 6190c52329a90fbf8f8635b87ccca927ff7b52d3 Mon Sep 17 00:00:00 2001 From: Helge Norberg Date: Fri, 8 Jan 2016 19:08:53 +0100 Subject: [PATCH] Added support for delaying frames to layer_producer and channel_producer. --- common/semaphore.h | 38 +++++++++++++++++++ modules/reroute/producer/channel_producer.cpp | 34 ++++++++--------- modules/reroute/producer/channel_producer.h | 3 +- modules/reroute/producer/layer_producer.cpp | 32 +++++++--------- modules/reroute/producer/layer_producer.h | 2 +- modules/reroute/producer/reroute_producer.cpp | 25 +++++++++--- 6 files changed, 90 insertions(+), 44 deletions(-) diff --git a/common/semaphore.h b/common/semaphore.h index a8563ea20..04e364260 100644 --- a/common/semaphore.h +++ b/common/semaphore.h @@ -125,6 +125,44 @@ public: } } + /** + * Acquire a number of permits. Will block until the given number of + * permits has been acquired if not enough permits are currently available + * or the timeout has passed. + * + * @param permits The number of permits to acquire. + * @param timeout The timeout (will be used for each permit). + * + * @return whether successfully acquired within timeout or not. + */ + template + bool try_acquire(unsigned int permits, const boost::chrono::duration& timeout) + { + boost::unique_lock lock(mutex_); + auto num_acquired = 0u; + + while (true) + { + auto num_wanted = permits - num_acquired; + auto to_drain = std::min(num_wanted, permits_); + + permits_ -= to_drain; + num_acquired += to_drain; + + if (num_acquired == permits) + break; + + if (permits_available_.wait_for(lock, timeout) == boost::cv_status::timeout) + { + lock.unlock(); + release(num_acquired); + return false; + } + } + + return true; + } + /** * Acquire one permits if permits are currently available. Does not block * until one is available, but returns immediately if unavailable. diff --git a/modules/reroute/producer/channel_producer.cpp b/modules/reroute/producer/channel_producer.cpp index 335211307..ca1d802e7 100644 --- a/modules/reroute/producer/channel_producer.cpp +++ b/modules/reroute/producer/channel_producer.cpp @@ -43,8 +43,9 @@ #include #include -#include #include +#include +#include #include @@ -64,19 +65,17 @@ class channel_consumer : public core::frame_consumer int consumer_index_; tbb::atomic is_running_; tbb::atomic current_age_; - std::promise first_frame_promise_; - std::future first_frame_available_; - bool first_frame_reported_; + semaphore frames_available_ { 0 }; + int frames_delay_; public: - channel_consumer() + channel_consumer(int frames_delay) : consumer_index_(next_consumer_index()) - , first_frame_available_(first_frame_promise_.get_future()) - , first_frame_reported_(false) + , frames_delay_(frames_delay) { is_running_ = true; current_age_ = 0; - frame_buffer_.set_capacity(3); + frame_buffer_.set_capacity(3 + frames_delay); } static int next_consumer_index() @@ -102,11 +101,8 @@ public: { bool pushed = frame_buffer_.try_push(frame); - if (pushed && !first_frame_reported_) - { - first_frame_promise_.set_value(); - first_frame_reported_ = true; - } + if (pushed) + frames_available_.release(); return make_ready_future(is_running_.load()); } @@ -178,7 +174,7 @@ public: void block_until_first_frame_available() { - if (first_frame_available_.wait_for(std::chrono::seconds(2)) == std::future_status::timeout) + if (!frames_available_.try_acquire(1 + frames_delay_, boost::chrono::seconds(2))) CASPAR_LOG(warning) << print() << L" Timed out while waiting for first frame"; } @@ -215,9 +211,10 @@ class channel_producer : public core::frame_producer_base uint64_t frame_number_; public: - explicit channel_producer(const core::frame_producer_dependencies& dependecies, const spl::shared_ptr& channel) + explicit channel_producer(const core::frame_producer_dependencies& dependecies, const spl::shared_ptr& channel, int frames_delay) : frame_factory_(dependecies.frame_factory) , output_format_desc_(dependecies.format_desc) + , consumer_(spl::make_shared(frames_delay)) , frame_number_(0) { pixel_constraints_.width.set(output_format_desc_.width); @@ -311,9 +308,10 @@ public: spl::shared_ptr create_channel_producer( const core::frame_producer_dependencies& dependencies, - const spl::shared_ptr& channel) + const spl::shared_ptr& channel, + int frames_delay) { - return spl::make_shared(dependencies, channel); + return spl::make_shared(dependencies, channel, frames_delay); } -}} \ No newline at end of file +}} diff --git a/modules/reroute/producer/channel_producer.h b/modules/reroute/producer/channel_producer.h index e8faf312c..c4ddfc4d0 100644 --- a/modules/reroute/producer/channel_producer.h +++ b/modules/reroute/producer/channel_producer.h @@ -29,6 +29,7 @@ namespace caspar { namespace reroute { spl::shared_ptr create_channel_producer( const core::frame_producer_dependencies& dependencies, - const spl::shared_ptr& channel); + const spl::shared_ptr& channel, + int frames_delay); }} diff --git a/modules/reroute/producer/layer_producer.cpp b/modules/reroute/producer/layer_producer.cpp index 81bf198f1..53a9b40c3 100644 --- a/modules/reroute/producer/layer_producer.cpp +++ b/modules/reroute/producer/layer_producer.cpp @@ -32,7 +32,7 @@ #include #include -#include +#include #include @@ -43,16 +43,14 @@ namespace caspar { namespace reroute { class layer_consumer : public core::write_frame_consumer { tbb::concurrent_bounded_queue frame_buffer_; - std::promise first_frame_promise_; - std::future first_frame_available_; - bool first_frame_reported_; + semaphore frames_available_ { 0 }; + int frames_delay_; public: - layer_consumer() - : first_frame_available_(first_frame_promise_.get_future()) - , first_frame_reported_(false) + layer_consumer(int frames_delay) + : frames_delay_(frames_delay) { - frame_buffer_.set_capacity(2); + frame_buffer_.set_capacity(2 + frames_delay); } ~layer_consumer() @@ -65,11 +63,8 @@ public: { bool pushed = frame_buffer_.try_push(src_frame); - if (pushed && !first_frame_reported_) - { - first_frame_promise_.set_value(); - first_frame_reported_ = true; - } + if (pushed) + frames_available_.release(); } std::wstring print() const override @@ -89,7 +84,7 @@ public: void block_until_first_frame_available() { - if (first_frame_available_.wait_for(std::chrono::seconds(2)) == std::future_status::timeout) + if (!frames_available_.try_acquire(1 + frames_delay_, boost::chrono::seconds(2))) CASPAR_LOG(warning) << print() << L" Timed out while waiting for first frame"; } @@ -109,8 +104,9 @@ class layer_producer : public core::frame_producer_base core::constraints pixel_constraints_; public: - explicit layer_producer(const spl::shared_ptr& channel, int layer) + explicit layer_producer(const spl::shared_ptr& channel, int layer, int frames_delay) : layer_(layer) + , consumer_(spl::make_shared(frames_delay)) , channel_(channel) , last_frame_(core::draw_frame::empty()) , frame_number_(0) @@ -168,9 +164,9 @@ public: } }; -spl::shared_ptr create_layer_producer(const spl::shared_ptr& channel, int layer) +spl::shared_ptr create_layer_producer(const spl::shared_ptr& channel, int layer, int frames_delay) { - return spl::make_shared(channel, layer); + return spl::make_shared(channel, layer, frames_delay); } -}} \ No newline at end of file +}} diff --git a/modules/reroute/producer/layer_producer.h b/modules/reroute/producer/layer_producer.h index 7f4797402..463d78937 100644 --- a/modules/reroute/producer/layer_producer.h +++ b/modules/reroute/producer/layer_producer.h @@ -27,6 +27,6 @@ namespace caspar { namespace reroute { -spl::shared_ptr create_layer_producer(const spl::shared_ptr& channel, int layer); +spl::shared_ptr create_layer_producer(const spl::shared_ptr& channel, int layer, int frames_delay); }} diff --git a/modules/reroute/producer/reroute_producer.cpp b/modules/reroute/producer/reroute_producer.cpp index 54614d428..e34a43018 100644 --- a/modules/reroute/producer/reroute_producer.cpp +++ b/modules/reroute/producer/reroute_producer.cpp @@ -25,6 +25,8 @@ #include "layer_producer.h" #include "channel_producer.h" +#include + #include #include #include @@ -38,11 +40,14 @@ namespace caspar { namespace reroute { void describe_producer(core::help_sink& sink, const core::help_repository& repository) { sink.short_description(L"Reroutes a complete channel or a layer to another layer."); - sink.syntax(L"route://[source_channel:int]{-[source_layer:int]}"); - sink.para()->text(L"Reroutes the composited video of a channel or the untransformed video of a layer ."); + sink.syntax(L"route://[source_channel:int]{-[source_layer:int]} {FRAMES_DELAY [frames_delay:int]}"); + sink.para()->text(L"Reroutes the composited video of a channel or the untransformed video of a layer."); sink.para() ->text(L"If ")->code(L"source_layer")->text(L" is specified, only the video of the source layer is rerouted. ") ->text(L"If on the other hand only ")->code(L"source_channel")->text(L" is specified, the video of the complete channel is rerouted."); + sink.para() + ->text(L"An optional additional delay can be specified with the ")->code(L"frames_delay") + ->text(L" parameter."); sink.para()->text(L"Examples:"); sink.example(L">> PLAY 1-10 route://1-11", L"Play the contents of layer 1-11 on layer 1-10 as well."); sink.example(L">> PLAY 1-10 route://2", L"Play the composited contents of channel 2 on layer 1-10 as well."); @@ -50,7 +55,10 @@ void describe_producer(core::help_sink& sink, const core::help_repository& repos L">> MIXER 1-10 FILL 0.02 0.01 0.9 0.9\n" L">> PLAY 1-10 route://1\n" L">> PLAY 1-9 AMB LOOP", L"Play the composited contents of channel 1 on layer 1-10. Since the source and destination channel is the same, an \"infinity\" effect is created."); - sink.para()->text(L"Always expect a few frames delay on the routed-to layer."); + sink.example(L">> PLAY 1-10 route://1-11 FRAMES_DELAY 10", L"Play the contents of layer 1-11 on layer 1-10 as well with an added 10 frames delay."); + sink.para() + ->text(L"Always expect a few frames delay on the routed-to layer in addition to the optionally specified ") + ->code(L"frames_delay")->text(L" parameter."); } spl::shared_ptr create_producer( @@ -83,16 +91,21 @@ spl::shared_ptr create_producer( if (found_channel == dependencies.channels.end()) CASPAR_THROW_EXCEPTION(user_error() << msg_info(L"No channel with id " + boost::lexical_cast(channel_id))); + auto params2 = params; + params2.erase(params2.begin()); + + auto frames_delay = get_param(L"FRAMES_DELAY", params2, 0); + if (has_layer_spec) { auto layer = boost::lexical_cast(channel_layer_spec.substr(dash + 1)); - return create_layer_producer(*found_channel, layer); + return create_layer_producer(*found_channel, layer, frames_delay); } else { - return create_channel_producer(dependencies, *found_channel); + return create_channel_producer(dependencies, *found_channel, frames_delay); } } -}} \ No newline at end of file +}} -- 2.39.2