]> git.sesse.net Git - casparcg/commitdiff
Added support for delaying frames to layer_producer and channel_producer.
authorHelge Norberg <helge.norberg@svt.se>
Fri, 8 Jan 2016 18:08:53 +0000 (19:08 +0100)
committerHelge Norberg <helge.norberg@svt.se>
Fri, 8 Jan 2016 18:08:53 +0000 (19:08 +0100)
common/semaphore.h
modules/reroute/producer/channel_producer.cpp
modules/reroute/producer/channel_producer.h
modules/reroute/producer/layer_producer.cpp
modules/reroute/producer/layer_producer.h
modules/reroute/producer/reroute_producer.cpp

index a8563ea20078786132deb67d2bbca76d39fc51eb..04e36426045c48c9603807a9c7f0a90d1806c9f8 100644 (file)
@@ -125,6 +125,44 @@ public:
                }
        }
 
+       /**
+        * Acquire a number of permits. Will block until the given number of
+        * permits has been acquired if not enough permits are currently available
+        * or the timeout has passed.
+        *
+        * @param permits The number of permits to acquire.
+        * @param timeout The timeout (will be used for each permit).
+        *
+        * @return whether successfully acquired within timeout or not.
+        */
+       template <typename Rep, typename Period>
+       bool try_acquire(unsigned int permits, const boost::chrono::duration<Rep, Period>& timeout)
+       {
+               boost::unique_lock<boost::mutex> lock(mutex_);
+               auto num_acquired = 0u;
+
+               while (true)
+               {
+                       auto num_wanted = permits - num_acquired;
+                       auto to_drain = std::min(num_wanted, permits_);
+
+                       permits_ -= to_drain;
+                       num_acquired += to_drain;
+
+                       if (num_acquired == permits)
+                               break;
+
+                       if (permits_available_.wait_for(lock, timeout) == boost::cv_status::timeout)
+                       {
+                               lock.unlock();
+                               release(num_acquired);
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+
        /**
         * Acquire one permits if permits are currently available. Does not block
         * until one is available, but returns immediately if unavailable.
index 335211307cc3083309065ccae5e9f6a84ef6ea55..ca1d802e7584d92ff3b5f92afe8a2530babca512 100644 (file)
@@ -43,8 +43,9 @@
 
 #include <common/except.h>
 #include <common/memory.h>
-#include <common/future.h>
 #include <common/memcpy.h>
+#include <common/semaphore.h>
+#include <common/future.h>
 
 #include <tbb/concurrent_queue.h>
 
@@ -64,19 +65,17 @@ class channel_consumer : public core::frame_consumer
        int                                                                                                     consumer_index_;
        tbb::atomic<bool>                                                                       is_running_;
        tbb::atomic<int64_t>                                                            current_age_;
-       std::promise<void>                                                                      first_frame_promise_;
-       std::future<void>                                                                       first_frame_available_;
-       bool                                                                                            first_frame_reported_;
+       semaphore                                                                                       frames_available_ { 0 };
+       int                                                                                                     frames_delay_;
 
 public:
-       channel_consumer()
+       channel_consumer(int frames_delay)
                : consumer_index_(next_consumer_index())
-               , first_frame_available_(first_frame_promise_.get_future())
-               , first_frame_reported_(false)
+               , frames_delay_(frames_delay)
        {
                is_running_ = true;
                current_age_ = 0;
-               frame_buffer_.set_capacity(3);
+               frame_buffer_.set_capacity(3 + frames_delay);
        }
 
        static int next_consumer_index()
@@ -102,11 +101,8 @@ public:
        {
                bool pushed = frame_buffer_.try_push(frame);
 
-               if (pushed && !first_frame_reported_)
-               {
-                       first_frame_promise_.set_value();
-                       first_frame_reported_ = true;
-               }
+               if (pushed)
+                       frames_available_.release();
 
                return make_ready_future(is_running_.load());
        }
@@ -178,7 +174,7 @@ public:
 
        void block_until_first_frame_available()
        {
-               if (first_frame_available_.wait_for(std::chrono::seconds(2)) == std::future_status::timeout)
+               if (!frames_available_.try_acquire(1 + frames_delay_, boost::chrono::seconds(2)))
                        CASPAR_LOG(warning)
                                        << print() << L" Timed out while waiting for first frame";
        }
@@ -215,9 +211,10 @@ class channel_producer : public core::frame_producer_base
        uint64_t                                                                        frame_number_;
 
 public:
-       explicit channel_producer(const core::frame_producer_dependencies& dependecies, const spl::shared_ptr<core::video_channel>& channel
+       explicit channel_producer(const core::frame_producer_dependencies& dependecies, const spl::shared_ptr<core::video_channel>& channel, int frames_delay)
                : frame_factory_(dependecies.frame_factory)
                , output_format_desc_(dependecies.format_desc)
+               , consumer_(spl::make_shared<channel_consumer>(frames_delay))
                , frame_number_(0)
        {
                pixel_constraints_.width.set(output_format_desc_.width);
@@ -311,9 +308,10 @@ public:
 
 spl::shared_ptr<core::frame_producer> create_channel_producer(
                const core::frame_producer_dependencies& dependencies,
-               const spl::shared_ptr<core::video_channel>& channel)
+               const spl::shared_ptr<core::video_channel>& channel,
+               int frames_delay)
 {
-       return spl::make_shared<channel_producer>(dependencies, channel);
+       return spl::make_shared<channel_producer>(dependencies, channel, frames_delay);
 }
 
-}}
\ No newline at end of file
+}}
index e8faf312c5907d75540ee5a7b3a5bfc4b1782a58..c4ddfc4d0739bc1c79117c869c132934bddb30a1 100644 (file)
@@ -29,6 +29,7 @@ namespace caspar { namespace reroute {
 
 spl::shared_ptr<core::frame_producer> create_channel_producer(
                const core::frame_producer_dependencies& dependencies,
-               const spl::shared_ptr<core::video_channel>& channel);
+               const spl::shared_ptr<core::video_channel>& channel,
+               int frames_delay);
 
 }}
index 81bf198f12edfb1738c3981b06b7b0b0fb99acf5..53a9b40c32a0b237da6853b9e2a4543aacae3581 100644 (file)
@@ -32,7 +32,7 @@
 #include <core/producer/stage.h>
 
 #include <common/except.h>
-#include <common/future.h>
+#include <common/semaphore.h>
 
 #include <boost/format.hpp>
 
@@ -43,16 +43,14 @@ namespace caspar { namespace reroute {
 class layer_consumer : public core::write_frame_consumer
 {      
        tbb::concurrent_bounded_queue<core::draw_frame> frame_buffer_;
-       std::promise<void>                                                              first_frame_promise_;
-       std::future<void>                                                               first_frame_available_;
-       bool                                                                                    first_frame_reported_;
+       semaphore                                                                               frames_available_ { 0 };
+       int                                                                                             frames_delay_;
 
 public:
-       layer_consumer()
-               : first_frame_available_(first_frame_promise_.get_future())
-               , first_frame_reported_(false)
+       layer_consumer(int frames_delay)
+               : frames_delay_(frames_delay)
        {
-               frame_buffer_.set_capacity(2);
+               frame_buffer_.set_capacity(2 + frames_delay);
        }
 
        ~layer_consumer()
@@ -65,11 +63,8 @@ public:
        {
                bool pushed = frame_buffer_.try_push(src_frame);
 
-               if (pushed && !first_frame_reported_)
-               {
-                       first_frame_promise_.set_value();
-                       first_frame_reported_ = true;
-               }
+               if (pushed)
+                       frames_available_.release();
        }
 
        std::wstring print() const override
@@ -89,7 +84,7 @@ public:
 
        void block_until_first_frame_available()
        {
-               if (first_frame_available_.wait_for(std::chrono::seconds(2)) == std::future_status::timeout)
+               if (!frames_available_.try_acquire(1 + frames_delay_, boost::chrono::seconds(2)))
                        CASPAR_LOG(warning)
                                        << print() << L" Timed out while waiting for first frame";
        }
@@ -109,8 +104,9 @@ class layer_producer : public core::frame_producer_base
        core::constraints                                                       pixel_constraints_;
 
 public:
-       explicit layer_producer(const spl::shared_ptr<core::video_channel>& channel, int layer
+       explicit layer_producer(const spl::shared_ptr<core::video_channel>& channel, int layer, int frames_delay)
                : layer_(layer)
+               , consumer_(spl::make_shared<layer_consumer>(frames_delay))
                , channel_(channel)
                , last_frame_(core::draw_frame::empty())
                , frame_number_(0)
@@ -168,9 +164,9 @@ public:
        }
 };
 
-spl::shared_ptr<core::frame_producer> create_layer_producer(const spl::shared_ptr<core::video_channel>& channel, int layer)
+spl::shared_ptr<core::frame_producer> create_layer_producer(const spl::shared_ptr<core::video_channel>& channel, int layer, int frames_delay)
 {
-       return spl::make_shared<layer_producer>(channel, layer);
+       return spl::make_shared<layer_producer>(channel, layer, frames_delay);
 }
 
-}}
\ No newline at end of file
+}}
index 7f47974027a7d369001c3017016a1a33651cb9d2..463d78937dcd09f186eaaf161b060780c7f444eb 100644 (file)
@@ -27,6 +27,6 @@
 
 namespace caspar { namespace reroute {
 
-spl::shared_ptr<core::frame_producer> create_layer_producer(const spl::shared_ptr<core::video_channel>& channel, int layer);
+spl::shared_ptr<core::frame_producer> create_layer_producer(const spl::shared_ptr<core::video_channel>& channel, int layer, int frames_delay);
 
 }}
index 54614d428acdd437fe90080dd590be6a37c385a5..e34a43018423a6d36c767c064312f4646d03d5fe 100644 (file)
@@ -25,6 +25,8 @@
 #include "layer_producer.h"
 #include "channel_producer.h"
 
+#include <common/param.h>
+
 #include <core/producer/frame_producer.h>
 #include <core/video_channel.h>
 #include <core/help/help_sink.h>
@@ -38,11 +40,14 @@ namespace caspar { namespace reroute {
 void describe_producer(core::help_sink& sink, const core::help_repository& repository)
 {
        sink.short_description(L"Reroutes a complete channel or a layer to another layer.");
-       sink.syntax(L"route://[source_channel:int]{-[source_layer:int]}");
-       sink.para()->text(L"Reroutes the composited video of a channel or the untransformed video of a layer .");
+       sink.syntax(L"route://[source_channel:int]{-[source_layer:int]} {FRAMES_DELAY [frames_delay:int]}");
+       sink.para()->text(L"Reroutes the composited video of a channel or the untransformed video of a layer.");
        sink.para()
                ->text(L"If ")->code(L"source_layer")->text(L" is specified, only the video of the source layer is rerouted. ")
                ->text(L"If on the other hand only ")->code(L"source_channel")->text(L" is specified, the video of the complete channel is rerouted.");
+       sink.para()
+               ->text(L"An optional additional delay can be specified with the ")->code(L"frames_delay")
+               ->text(L" parameter.");
        sink.para()->text(L"Examples:");
        sink.example(L">> PLAY 1-10 route://1-11", L"Play the contents of layer 1-11 on layer 1-10 as well.");
        sink.example(L">> PLAY 1-10 route://2", L"Play the composited contents of channel 2 on layer 1-10 as well.");
@@ -50,7 +55,10 @@ void describe_producer(core::help_sink& sink, const core::help_repository& repos
                L">> MIXER 1-10 FILL 0.02 0.01 0.9 0.9\n"
                L">> PLAY 1-10 route://1\n"
                L">> PLAY 1-9 AMB LOOP", L"Play the composited contents of channel 1 on layer 1-10. Since the source and destination channel is the same, an \"infinity\" effect is created.");
-       sink.para()->text(L"Always expect a few frames delay on the routed-to layer.");
+       sink.example(L">> PLAY 1-10 route://1-11 FRAMES_DELAY 10", L"Play the contents of layer 1-11 on layer 1-10 as well with an added 10 frames delay.");
+       sink.para()
+               ->text(L"Always expect a few frames delay on the routed-to layer in addition to the optionally specified ")
+               ->code(L"frames_delay")->text(L" parameter.");
 }
 
 spl::shared_ptr<core::frame_producer> create_producer(
@@ -83,16 +91,21 @@ spl::shared_ptr<core::frame_producer> create_producer(
        if (found_channel == dependencies.channels.end())
                CASPAR_THROW_EXCEPTION(user_error() << msg_info(L"No channel with id " + boost::lexical_cast<std::wstring>(channel_id)));
 
+       auto params2 = params;
+       params2.erase(params2.begin());
+
+       auto frames_delay = get_param(L"FRAMES_DELAY", params2, 0);
+
        if (has_layer_spec)
        {
                auto layer = boost::lexical_cast<int>(channel_layer_spec.substr(dash + 1));
 
-               return create_layer_producer(*found_channel, layer);
+               return create_layer_producer(*found_channel, layer, frames_delay);
        }
        else
        {
-               return create_channel_producer(dependencies, *found_channel);
+               return create_channel_producer(dependencies, *found_channel, frames_delay);
        }
 }
 
-}}
\ No newline at end of file
+}}