}
}
+ /**
+ * Acquire a number of permits. Will block until the given number of
+ * permits has been acquired if not enough permits are currently available
+ * or the timeout has passed.
+ *
+ * @param permits The number of permits to acquire.
+ * @param timeout The timeout (will be used for each permit).
+ *
+ * @return whether successfully acquired within timeout or not.
+ */
+ template <typename Rep, typename Period>
+ bool try_acquire(unsigned int permits, const boost::chrono::duration<Rep, Period>& timeout)
+ {
+ boost::unique_lock<boost::mutex> lock(mutex_);
+ auto num_acquired = 0u;
+
+ while (true)
+ {
+ auto num_wanted = permits - num_acquired;
+ auto to_drain = std::min(num_wanted, permits_);
+
+ permits_ -= to_drain;
+ num_acquired += to_drain;
+
+ if (num_acquired == permits)
+ break;
+
+ if (permits_available_.wait_for(lock, timeout) == boost::cv_status::timeout)
+ {
+ lock.unlock();
+ release(num_acquired);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
/**
* Acquire one permits if permits are currently available. Does not block
* until one is available, but returns immediately if unavailable.
#include <common/except.h>
#include <common/memory.h>
-#include <common/future.h>
#include <common/memcpy.h>
+#include <common/semaphore.h>
+#include <common/future.h>
#include <tbb/concurrent_queue.h>
int consumer_index_;
tbb::atomic<bool> is_running_;
tbb::atomic<int64_t> current_age_;
- std::promise<void> first_frame_promise_;
- std::future<void> first_frame_available_;
- bool first_frame_reported_;
+ semaphore frames_available_ { 0 };
+ int frames_delay_;
public:
- channel_consumer()
+ channel_consumer(int frames_delay)
: consumer_index_(next_consumer_index())
- , first_frame_available_(first_frame_promise_.get_future())
- , first_frame_reported_(false)
+ , frames_delay_(frames_delay)
{
is_running_ = true;
current_age_ = 0;
- frame_buffer_.set_capacity(3);
+ frame_buffer_.set_capacity(3 + frames_delay);
}
static int next_consumer_index()
{
bool pushed = frame_buffer_.try_push(frame);
- if (pushed && !first_frame_reported_)
- {
- first_frame_promise_.set_value();
- first_frame_reported_ = true;
- }
+ if (pushed)
+ frames_available_.release();
return make_ready_future(is_running_.load());
}
void block_until_first_frame_available()
{
- if (first_frame_available_.wait_for(std::chrono::seconds(2)) == std::future_status::timeout)
+ if (!frames_available_.try_acquire(1 + frames_delay_, boost::chrono::seconds(2)))
CASPAR_LOG(warning)
<< print() << L" Timed out while waiting for first frame";
}
uint64_t frame_number_;
public:
- explicit channel_producer(const core::frame_producer_dependencies& dependecies, const spl::shared_ptr<core::video_channel>& channel)
+ explicit channel_producer(const core::frame_producer_dependencies& dependecies, const spl::shared_ptr<core::video_channel>& channel, int frames_delay)
: frame_factory_(dependecies.frame_factory)
, output_format_desc_(dependecies.format_desc)
+ , consumer_(spl::make_shared<channel_consumer>(frames_delay))
, frame_number_(0)
{
pixel_constraints_.width.set(output_format_desc_.width);
spl::shared_ptr<core::frame_producer> create_channel_producer(
const core::frame_producer_dependencies& dependencies,
- const spl::shared_ptr<core::video_channel>& channel)
+ const spl::shared_ptr<core::video_channel>& channel,
+ int frames_delay)
{
- return spl::make_shared<channel_producer>(dependencies, channel);
+ return spl::make_shared<channel_producer>(dependencies, channel, frames_delay);
}
-}}
\ No newline at end of file
+}}
spl::shared_ptr<core::frame_producer> create_channel_producer(
const core::frame_producer_dependencies& dependencies,
- const spl::shared_ptr<core::video_channel>& channel);
+ const spl::shared_ptr<core::video_channel>& channel,
+ int frames_delay);
}}
#include <core/producer/stage.h>
#include <common/except.h>
-#include <common/future.h>
+#include <common/semaphore.h>
#include <boost/format.hpp>
class layer_consumer : public core::write_frame_consumer
{
tbb::concurrent_bounded_queue<core::draw_frame> frame_buffer_;
- std::promise<void> first_frame_promise_;
- std::future<void> first_frame_available_;
- bool first_frame_reported_;
+ semaphore frames_available_ { 0 };
+ int frames_delay_;
public:
- layer_consumer()
- : first_frame_available_(first_frame_promise_.get_future())
- , first_frame_reported_(false)
+ layer_consumer(int frames_delay)
+ : frames_delay_(frames_delay)
{
- frame_buffer_.set_capacity(2);
+ frame_buffer_.set_capacity(2 + frames_delay);
}
~layer_consumer()
{
bool pushed = frame_buffer_.try_push(src_frame);
- if (pushed && !first_frame_reported_)
- {
- first_frame_promise_.set_value();
- first_frame_reported_ = true;
- }
+ if (pushed)
+ frames_available_.release();
}
std::wstring print() const override
void block_until_first_frame_available()
{
- if (first_frame_available_.wait_for(std::chrono::seconds(2)) == std::future_status::timeout)
+ if (!frames_available_.try_acquire(1 + frames_delay_, boost::chrono::seconds(2)))
CASPAR_LOG(warning)
<< print() << L" Timed out while waiting for first frame";
}
core::constraints pixel_constraints_;
public:
- explicit layer_producer(const spl::shared_ptr<core::video_channel>& channel, int layer)
+ explicit layer_producer(const spl::shared_ptr<core::video_channel>& channel, int layer, int frames_delay)
: layer_(layer)
+ , consumer_(spl::make_shared<layer_consumer>(frames_delay))
, channel_(channel)
, last_frame_(core::draw_frame::empty())
, frame_number_(0)
}
};
-spl::shared_ptr<core::frame_producer> create_layer_producer(const spl::shared_ptr<core::video_channel>& channel, int layer)
+spl::shared_ptr<core::frame_producer> create_layer_producer(const spl::shared_ptr<core::video_channel>& channel, int layer, int frames_delay)
{
- return spl::make_shared<layer_producer>(channel, layer);
+ return spl::make_shared<layer_producer>(channel, layer, frames_delay);
}
-}}
\ No newline at end of file
+}}
namespace caspar { namespace reroute {
-spl::shared_ptr<core::frame_producer> create_layer_producer(const spl::shared_ptr<core::video_channel>& channel, int layer);
+spl::shared_ptr<core::frame_producer> create_layer_producer(const spl::shared_ptr<core::video_channel>& channel, int layer, int frames_delay);
}}
#include "layer_producer.h"
#include "channel_producer.h"
+#include <common/param.h>
+
#include <core/producer/frame_producer.h>
#include <core/video_channel.h>
#include <core/help/help_sink.h>
void describe_producer(core::help_sink& sink, const core::help_repository& repository)
{
sink.short_description(L"Reroutes a complete channel or a layer to another layer.");
- sink.syntax(L"route://[source_channel:int]{-[source_layer:int]}");
- sink.para()->text(L"Reroutes the composited video of a channel or the untransformed video of a layer .");
+ sink.syntax(L"route://[source_channel:int]{-[source_layer:int]} {FRAMES_DELAY [frames_delay:int]}");
+ sink.para()->text(L"Reroutes the composited video of a channel or the untransformed video of a layer.");
sink.para()
->text(L"If ")->code(L"source_layer")->text(L" is specified, only the video of the source layer is rerouted. ")
->text(L"If on the other hand only ")->code(L"source_channel")->text(L" is specified, the video of the complete channel is rerouted.");
+ sink.para()
+ ->text(L"An optional additional delay can be specified with the ")->code(L"frames_delay")
+ ->text(L" parameter.");
sink.para()->text(L"Examples:");
sink.example(L">> PLAY 1-10 route://1-11", L"Play the contents of layer 1-11 on layer 1-10 as well.");
sink.example(L">> PLAY 1-10 route://2", L"Play the composited contents of channel 2 on layer 1-10 as well.");
L">> MIXER 1-10 FILL 0.02 0.01 0.9 0.9\n"
L">> PLAY 1-10 route://1\n"
L">> PLAY 1-9 AMB LOOP", L"Play the composited contents of channel 1 on layer 1-10. Since the source and destination channel is the same, an \"infinity\" effect is created.");
- sink.para()->text(L"Always expect a few frames delay on the routed-to layer.");
+ sink.example(L">> PLAY 1-10 route://1-11 FRAMES_DELAY 10", L"Play the contents of layer 1-11 on layer 1-10 as well with an added 10 frames delay.");
+ sink.para()
+ ->text(L"Always expect a few frames delay on the routed-to layer in addition to the optionally specified ")
+ ->code(L"frames_delay")->text(L" parameter.");
}
spl::shared_ptr<core::frame_producer> create_producer(
if (found_channel == dependencies.channels.end())
CASPAR_THROW_EXCEPTION(user_error() << msg_info(L"No channel with id " + boost::lexical_cast<std::wstring>(channel_id)));
+ auto params2 = params;
+ params2.erase(params2.begin());
+
+ auto frames_delay = get_param(L"FRAMES_DELAY", params2, 0);
+
if (has_layer_spec)
{
auto layer = boost::lexical_cast<int>(channel_layer_spec.substr(dash + 1));
- return create_layer_producer(*found_channel, layer);
+ return create_layer_producer(*found_channel, layer, frames_delay);
}
else
{
- return create_channel_producer(dependencies, *found_channel);
+ return create_channel_producer(dependencies, *found_channel, frames_delay);
}
}
-}}
\ No newline at end of file
+}}