From 9e4b08cde6c6de9e83a3fff42d90affc3cd8e5bc Mon Sep 17 00:00:00 2001 From: Helge Norberg Date: Tue, 25 Oct 2016 16:44:21 +0200 Subject: [PATCH] Created a consumer that provides sync to a channel based on the pace of another channel. I called it syncto_consumer. It solves problems where the rendering pace of a routed from channel (without a decklink consumer) differs from the routed to channel. --- common/semaphore.h | 84 +++++++- core/CMakeLists.txt | 5 + core/consumer/frame_consumer.cpp | 67 ++++--- core/consumer/frame_consumer.h | 19 +- core/consumer/output.cpp | 44 ++-- core/consumer/output.h | 11 +- core/consumer/port.cpp | 16 +- core/consumer/port.h | 5 +- core/consumer/syncto/syncto_consumer.cpp | 188 ++++++++++++++++++ core/consumer/syncto/syncto_consumer.h | 30 +++ core/video_channel.cpp | 43 +++- core/video_channel.h | 10 +- .../bluefish/consumer/bluefish_consumer.cpp | 6 +- modules/bluefish/consumer/bluefish_consumer.h | 8 +- .../decklink/consumer/decklink_consumer.cpp | 4 +- modules/decklink/consumer/decklink_consumer.h | 8 +- modules/ffmpeg/consumer/ffmpeg_consumer.cpp | 4 +- modules/ffmpeg/consumer/ffmpeg_consumer.h | 8 +- .../ffmpeg/consumer/streaming_consumer.cpp | 4 +- modules/ffmpeg/consumer/streaming_consumer.h | 6 +- modules/image/consumer/image_consumer.cpp | 7 +- modules/image/consumer/image_consumer.h | 7 +- .../newtek/consumer/newtek_ivga_consumer.cpp | 18 +- .../newtek/consumer/newtek_ivga_consumer.h | 6 +- modules/oal/consumer/oal_consumer.cpp | 48 ++--- modules/oal/consumer/oal_consumer.h | 8 +- modules/screen/consumer/screen_consumer.cpp | 6 +- modules/screen/consumer/screen_consumer.h | 8 +- protocol/amcp/AMCPCommandsImpl.cpp | 38 ++-- shell/casparcg.config | 3 + shell/server.cpp | 21 +- 31 files changed, 567 insertions(+), 173 deletions(-) create mode 100644 core/consumer/syncto/syncto_consumer.cpp create mode 100644 core/consumer/syncto/syncto_consumer.h diff --git a/common/semaphore.h b/common/semaphore.h index 04e364260..7baccda0e 100644 --- a/common/semaphore.h +++ b/common/semaphore.h @@ -27,6 +27,10 @@ #include #include +#include +#include +#include + namespace caspar { template @@ -43,9 +47,10 @@ void repeat_n(N times_to_repeat_block, const Func& func) */ class semaphore : boost::noncopyable { - mutable boost::mutex mutex_; - unsigned int permits_; - boost::condition_variable_any permits_available_; + mutable boost::mutex mutex_; + unsigned int permits_; + boost::condition_variable_any permits_available_; + std::map>> callbacks_per_requested_permits_; public: /** * Constructor. @@ -66,6 +71,7 @@ public: ++permits_; + perform_callback_based_acquire(); permits_available_.notify_one(); } @@ -80,6 +86,7 @@ public: permits_ += permits; + perform_callback_based_acquire(); repeat_n(permits, [this] { permits_available_.notify_one(); }); } @@ -112,11 +119,11 @@ public: while (true) { - auto num_wanted = permits - num_acquired; - auto to_drain = std::min(num_wanted, permits_); + auto num_wanted = permits - num_acquired; + auto to_drain = std::min(num_wanted, permits_); - permits_ -= to_drain; - num_acquired += to_drain; + permits_ -= to_drain; + num_acquired += to_drain; if (num_acquired == permits) break; @@ -125,6 +132,20 @@ public: } } + /** + * Acquire a number of permits. Will not block, but instead invoke a callback + * when the specified number of permits are available and has been acquired. + * + * @param permits The number of permits to acquire. + * @param acquired_callback The callback to invoke when acquired. + */ + void acquire(unsigned int permits, std::function acquired_callback) + { + boost::unique_lock lock(mutex_); + + callbacks_per_requested_permits_[permits].push(std::move(acquired_callback)); + } + /** * Acquire a number of permits. Will block until the given number of * permits has been acquired if not enough permits are currently available @@ -143,11 +164,11 @@ public: while (true) { - auto num_wanted = permits - num_acquired; - auto to_drain = std::min(num_wanted, permits_); + auto num_wanted = permits - num_acquired; + auto to_drain = std::min(num_wanted, permits_); - permits_ -= to_drain; - num_acquired += to_drain; + permits_ -= to_drain; + num_acquired += to_drain; if (num_acquired == permits) break; @@ -194,6 +215,47 @@ public: return permits_; } + +private: + void perform_callback_based_acquire() + { + if (callbacks_per_requested_permits_.empty()) + return; + + while ( + !callbacks_per_requested_permits_.empty() && + callbacks_per_requested_permits_.begin()->first <= permits_) + { + auto requested_permits_and_callbacks = callbacks_per_requested_permits_.begin(); + auto requested_permits = requested_permits_and_callbacks->first; + auto& callbacks = requested_permits_and_callbacks->second; + + if (callbacks.empty()) + { + callbacks_per_requested_permits_.erase(requested_permits_and_callbacks); + continue; + } + + auto& callback = callbacks.front(); + + permits_ -= requested_permits; + mutex_.unlock(); + + try + { + callback(); + } + catch (...) + { + } + + mutex_.lock(); + callbacks.pop(); + + if (callbacks.empty()) + callbacks_per_requested_permits_.erase(requested_permits_and_callbacks); + } + } }; /** diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index aaa67b0e1..21ef8a9bd 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -2,6 +2,8 @@ cmake_minimum_required (VERSION 2.6) project (core) set(SOURCES + consumer/syncto/syncto_consumer.cpp + consumer/frame_consumer.cpp consumer/output.cpp consumer/port.cpp @@ -58,6 +60,8 @@ set(SOURCES video_format.cpp ) set(HEADERS + consumer/syncto/syncto_consumer.h + consumer/frame_consumer.h consumer/output.h consumer/port.h @@ -149,6 +153,7 @@ include_directories(${GLEW_INCLUDE_PATH}) source_group(sources ./*) source_group(sources\\consumer consumer/*) +source_group(sources\\consumer\\syncto consumer/syncto/*) source_group(sources\\diagnostics diagnostics/*) source_group(sources\\producer producer/*) source_group(sources\\producer\\framerate producer/framerate/*) diff --git a/core/consumer/frame_consumer.cpp b/core/consumer/frame_consumer.cpp index 10fccad3e..477998ace 100644 --- a/core/consumer/frame_consumer.cpp +++ b/core/consumer/frame_consumer.cpp @@ -82,27 +82,27 @@ void destroy_consumers_synchronously() } class destroy_consumer_proxy : public frame_consumer -{ +{ std::shared_ptr consumer_; public: - destroy_consumer_proxy(spl::shared_ptr&& consumer) + destroy_consumer_proxy(spl::shared_ptr&& consumer) : consumer_(std::move(consumer)) { destroy_consumers_in_separate_thread() = true; } ~destroy_consumer_proxy() - { + { static tbb::atomic counter; static std::once_flag counter_init_once; std::call_once(counter_init_once, []{ counter = 0; }); if (!destroy_consumers_in_separate_thread()) return; - + ++counter; CASPAR_VERIFY(counter < 8); - + auto consumer = new std::shared_ptr(std::move(consumer_)); boost::thread([=] { @@ -122,38 +122,39 @@ public: pointer_guard.reset(); - }).detach(); + }).detach(); } - + std::future send(const_frame frame) override {return consumer_->send(std::move(frame));} void initialize(const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) override {return consumer_->initialize(format_desc, channel_layout, channel_index);} - std::wstring print() const override {return consumer_->print();} + std::wstring print() const override {return consumer_->print();} std::wstring name() const override {return consumer_->name();} boost::property_tree::wptree info() const override {return consumer_->info();} bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();} int buffer_depth() const override {return consumer_->buffer_depth();} int index() const override {return consumer_->index();} int64_t presentation_frame_age_millis() const override {return consumer_->presentation_frame_age_millis();} - monitor::subject& monitor_output() override {return consumer_->monitor_output();} + monitor::subject& monitor_output() override {return consumer_->monitor_output();} + const frame_consumer* unwrapped() const override {return consumer_->unwrapped();} }; class print_consumer_proxy : public frame_consumer -{ +{ std::shared_ptr consumer_; public: - print_consumer_proxy(spl::shared_ptr&& consumer) + print_consumer_proxy(spl::shared_ptr&& consumer) : consumer_(std::move(consumer)) { } ~print_consumer_proxy() - { + { auto str = consumer_->print(); CASPAR_LOG(debug) << str << L" Uninitializing."; consumer_.reset(); CASPAR_LOG(info) << str << L" Uninitialized."; } - + std::future send(const_frame frame) override {return consumer_->send(std::move(frame));} void initialize(const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) override { @@ -167,22 +168,23 @@ public: int buffer_depth() const override {return consumer_->buffer_depth();} int index() const override {return consumer_->index();} int64_t presentation_frame_age_millis() const override {return consumer_->presentation_frame_age_millis();} - monitor::subject& monitor_output() override {return consumer_->monitor_output();} + monitor::subject& monitor_output() override {return consumer_->monitor_output();} + const frame_consumer* unwrapped() const override {return consumer_->unwrapped();} }; class recover_consumer_proxy : public frame_consumer -{ +{ std::shared_ptr consumer_; int channel_index_ = -1; video_format_desc format_desc_; audio_channel_layout channel_layout_ = audio_channel_layout::invalid(); public: - recover_consumer_proxy(spl::shared_ptr&& consumer) + recover_consumer_proxy(spl::shared_ptr&& consumer) : consumer_(std::move(consumer)) { } - - std::future send(const_frame frame) override + + std::future send(const_frame frame) override { try { @@ -220,7 +222,8 @@ public: int buffer_depth() const override {return consumer_->buffer_depth();} int index() const override {return consumer_->index();} int64_t presentation_frame_age_millis() const override {return consumer_->presentation_frame_age_millis();} - monitor::subject& monitor_output() override {return consumer_->monitor_output();} + monitor::subject& monitor_output() override {return consumer_->monitor_output();} + const frame_consumer* unwrapped() const override {return consumer_->unwrapped();} }; // This class is used to guarantee that audio cadence is correct. This is important for NTSC audio. @@ -236,7 +239,7 @@ public: : consumer_(consumer) { } - + void initialize(const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) override { audio_cadence_ = format_desc.audio_cadence; @@ -247,14 +250,14 @@ public: } std::future send(const_frame frame) override - { + { if(audio_cadence_.size() == 1) return consumer_->send(frame); std::future result = make_ready_future(true); - + if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() * channel_layout_.num_channels == static_cast(frame.audio_data().size())) - { + { // Audio sent so far is in sync, now we can send the next chunk. result = consumer_->send(frame); boost::range::rotate(audio_cadence_, std::begin(audio_cadence_)+1); @@ -263,10 +266,10 @@ public: CASPAR_LOG(trace) << print() << L" Syncing audio."; sync_buffer_.push_back(static_cast(frame.audio_data().size() / channel_layout_.num_channels)); - + return std::move(result); } - + std::wstring print() const override {return consumer_->print();} std::wstring name() const override {return consumer_->name();} boost::property_tree::wptree info() const override {return consumer_->info();} @@ -274,22 +277,23 @@ public: int buffer_depth() const override {return consumer_->buffer_depth();} int index() const override {return consumer_->index();} int64_t presentation_frame_age_millis() const override {return consumer_->presentation_frame_age_millis();} - monitor::subject& monitor_output() override {return consumer_->monitor_output();} + monitor::subject& monitor_output() override {return consumer_->monitor_output();} + const frame_consumer* unwrapped() const override {return consumer_->unwrapped();} }; spl::shared_ptr frame_consumer_registry::create_consumer( - const std::vector& params, interaction_sink* sink) const + const std::vector& params, interaction_sink* sink, std::vector> channels) const { if(params.empty()) CASPAR_THROW_EXCEPTION(invalid_argument() << msg_info("params cannot be empty")); - + auto consumer = frame_consumer::empty(); auto& consumer_factories = impl_->consumer_factories; std::any_of(consumer_factories.begin(), consumer_factories.end(), [&](const consumer_factory_t& factory) -> bool { try { - consumer = factory(params, sink); + consumer = factory(params, sink, channels); } catch(...) { @@ -311,7 +315,8 @@ spl::shared_ptr frame_consumer_registry::create_consumer( spl::shared_ptr frame_consumer_registry::create_consumer( const std::wstring& element_name, const boost::property_tree::wptree& element, - interaction_sink* sink) const + interaction_sink* sink, + std::vector> channels) const { auto& preconfigured_consumer_factories = impl_->preconfigured_consumer_factories; auto found = preconfigured_consumer_factories.find(element_name); @@ -324,7 +329,7 @@ spl::shared_ptr frame_consumer_registry::create_consumer( spl::make_shared( spl::make_shared( spl::make_shared( - found->second(element, sink))))); + found->second(element, sink, channels))))); } const spl::shared_ptr& frame_consumer::empty() diff --git a/core/consumer/frame_consumer.h b/core/consumer/frame_consumer.h index e09e4edcf..58ffeb61b 100644 --- a/core/consumer/frame_consumer.h +++ b/core/consumer/frame_consumer.h @@ -44,19 +44,19 @@ class frame_consumer public: // Static Members - + static const spl::shared_ptr& empty(); // Constructors frame_consumer(){} virtual ~frame_consumer() {} - + // Methods virtual std::future send(const_frame frame) = 0; virtual void initialize(const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) = 0; - + // monitor::observable virtual monitor::subject& monitor_output() = 0; @@ -70,14 +70,17 @@ public: virtual int buffer_depth() const = 0; // -1 to not participate in frame presentation synchronization virtual int index() const = 0; virtual int64_t presentation_frame_age_millis() const = 0; + virtual const frame_consumer* unwrapped() const { return this; } }; typedef std::function( const std::vector&, - interaction_sink* sink)> consumer_factory_t; + interaction_sink* sink, + std::vector> channels)> consumer_factory_t; typedef std::function( const boost::property_tree::wptree& element, - interaction_sink* sink)> preconfigured_consumer_factory_t; + interaction_sink* sink, + std::vector> channels)> preconfigured_consumer_factory_t; class frame_consumer_registry : boost::noncopyable { @@ -89,11 +92,13 @@ public: const preconfigured_consumer_factory_t& factory); spl::shared_ptr create_consumer( const std::vector& params, - interaction_sink* sink) const; + interaction_sink* sink, + std::vector> channels) const; spl::shared_ptr create_consumer( const std::wstring& element_name, const boost::property_tree::wptree& element, - interaction_sink* sink) const; + interaction_sink* sink, + std::vector> channels) const; private: struct impl; spl::shared_ptr impl_; diff --git a/core/consumer/output.cpp b/core/consumer/output.cpp index 33cf01d7e..d3c0580eb 100644 --- a/core/consumer/output.cpp +++ b/core/consumer/output.cpp @@ -53,7 +53,7 @@ namespace caspar { namespace core { struct output::impl -{ +{ spl::shared_ptr graph_; spl::shared_ptr monitor_subject_ = spl::make_shared("/output"); const int channel_index_; @@ -65,23 +65,23 @@ struct output::impl std::map send_to_consumers_delays_; executor executor_ { L"output " + boost::lexical_cast(channel_index_) }; public: - impl(spl::shared_ptr graph, const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) + impl(spl::shared_ptr graph, const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) : graph_(std::move(graph)) , channel_index_(channel_index) , format_desc_(format_desc) , channel_layout_(channel_layout) { graph_->set_color("consume-time", diagnostics::color(1.0f, 0.4f, 0.0f, 0.8f)); - } - + } + void add(int index, spl::shared_ptr consumer) - { + { remove(index); consumer->initialize(format_desc_, channel_layout_, channel_index_); - + executor_.begin_invoke([this, index, consumer] - { + { port p(index, channel_index_, std::move(consumer)); p.monitor_output().attach_parent(monitor_subject_); ports_.insert(std::make_pair(index, std::move(p))); @@ -94,7 +94,7 @@ public: } void remove(int index) - { + { executor_.begin_invoke([=] { auto it = ports_.find(index); @@ -110,7 +110,7 @@ public: { remove(consumer->index()); } - + void change_channel_format(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout) { executor_.invoke([&] @@ -120,7 +120,7 @@ public: auto it = ports_.begin(); while(it != ports_.end()) - { + { try { it->second.change_channel_format(format_desc, channel_layout); @@ -133,7 +133,7 @@ public: ports_.erase(it++); } } - + format_desc_ = format_desc; channel_layout_ = channel_layout; frames_.clear(); @@ -141,7 +141,7 @@ public: } std::pair minmax_buffer_depth() const - { + { if(ports_.empty()) return std::make_pair(0, 0); @@ -159,7 +159,7 @@ public: .where(std::mem_fn(&port::has_synchronization_clock)) .any(); } - + std::future operator()(const_frame input_frame, const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout) { spl::shared_ptr frame_timer; @@ -262,12 +262,12 @@ public: std::future info() { return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree - { + { boost::property_tree::wptree info; for (auto& port : ports_) { info.add_child(L"consumers.consumer", port.second.info()) - .add(L"index", port.first); + .add(L"index", port.first); } return info; }, task_priority::high_priority)); @@ -297,6 +297,19 @@ public: return info; }, task_priority::high_priority)); } + + std::vector> get_consumers() + { + return executor_.invoke([=] + { + std::vector> consumers; + + for (auto& port : ports_) + consumers.push_back(port.second.consumer()); + + return consumers; + }); + } }; output::output(spl::shared_ptr graph, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int channel_index) : impl_(new impl(std::move(graph), format_desc, channel_layout, channel_index)){} @@ -306,6 +319,7 @@ void output::remove(int index){impl_->remove(index);} void output::remove(const spl::shared_ptr& consumer){impl_->remove(consumer);} std::future output::info() const{return impl_->info();} std::future output::delay_info() const{ return impl_->delay_info(); } +std::vector> output::get_consumers() const { return impl_->get_consumers(); } std::future output::operator()(const_frame frame, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout){ return (*impl_)(std::move(frame), format_desc, channel_layout); } monitor::subject& output::monitor_output() {return *impl_->monitor_subject_;} }} diff --git a/core/consumer/output.h b/core/consumer/output.h index 85a925632..bd75385c7 100644 --- a/core/consumer/output.h +++ b/core/consumer/output.h @@ -35,7 +35,7 @@ FORWARD2(caspar, diagnostics, class graph); namespace caspar { namespace core { - + class output final { output(const output&); @@ -47,27 +47,28 @@ public: // Constructors explicit output(spl::shared_ptr graph, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int channel_index); - + // Methods // Returns when submitted to consumers, but the future indicates when the consumers are ready for a new frame. std::future operator()(const_frame frame, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout); - + void add(const spl::shared_ptr& consumer); void add(int index, const spl::shared_ptr& consumer); void remove(const spl::shared_ptr& consumer); void remove(int index); - + monitor::subject& monitor_output(); // Properties std::future info() const; std::future delay_info() const; + std::vector> get_consumers() const; private: struct impl; spl::shared_ptr impl_; }; -}} \ No newline at end of file +}} diff --git a/core/consumer/port.cpp b/core/consumer/port.cpp index db3c68fa8..ecc19836d 100644 --- a/core/consumer/port.cpp +++ b/core/consumer/port.cpp @@ -14,7 +14,7 @@ struct port::impl { int index_; spl::shared_ptr monitor_subject_ = spl::make_shared("/port/" + boost::lexical_cast(index_)); - std::shared_ptr consumer_; + spl::shared_ptr consumer_; int channel_index_; public: impl(int index, int channel_index, spl::shared_ptr consumer) @@ -24,12 +24,12 @@ public: { consumer_->monitor_output().attach_parent(monitor_subject_); } - + void change_channel_format(const core::video_format_desc& format_desc, const audio_channel_layout& channel_layout) { consumer_->initialize(format_desc, channel_layout, channel_index_); } - + std::future send(const_frame frame) { *monitor_subject_ << monitor::message("/type") % consumer_->name(); @@ -64,13 +64,18 @@ public: { return consumer_->presentation_frame_age_millis(); } + + spl::shared_ptr consumer() const + { + return consumer_; + } }; port::port(int index, int channel_index, spl::shared_ptr consumer) : impl_(new impl(index, channel_index, std::move(consumer))){} port::port(port&& other) : impl_(std::move(other.impl_)){} port::~port(){} port& port::operator=(port&& other){impl_ = std::move(other.impl_); return *this;} -std::future port::send(const_frame frame){return impl_->send(std::move(frame));} +std::future port::send(const_frame frame){return impl_->send(std::move(frame));} monitor::subject& port::monitor_output() {return *impl_->monitor_subject_;} void port::change_channel_format(const core::video_format_desc& format_desc, const audio_channel_layout& channel_layout){impl_->change_channel_format(format_desc, channel_layout);} int port::buffer_depth() const{return impl_->buffer_depth();} @@ -78,4 +83,5 @@ std::wstring port::print() const{ return impl_->print();} bool port::has_synchronization_clock() const{return impl_->has_synchronization_clock();} boost::property_tree::wptree port::info() const{return impl_->info();} int64_t port::presentation_frame_age_millis() const{ return impl_->presentation_frame_age_millis(); } -}} \ No newline at end of file +spl::shared_ptr port::consumer() const { return impl_->consumer(); } +}} diff --git a/core/consumer/port.h b/core/consumer/port.h index 46db42c2d..48e639c92 100644 --- a/core/consumer/port.h +++ b/core/consumer/port.h @@ -28,7 +28,7 @@ public: port& operator=(port&& other); - std::future send(const_frame frame); + std::future send(const_frame frame); monitor::subject& monitor_output(); @@ -40,9 +40,10 @@ public: bool has_synchronization_clock() const; boost::property_tree::wptree info() const; int64_t presentation_frame_age_millis() const; + spl::shared_ptr consumer() const; private: struct impl; std::unique_ptr impl_; }; -}} \ No newline at end of file +}} diff --git a/core/consumer/syncto/syncto_consumer.cpp b/core/consumer/syncto/syncto_consumer.cpp new file mode 100644 index 000000000..632bdb084 --- /dev/null +++ b/core/consumer/syncto/syncto_consumer.cpp @@ -0,0 +1,188 @@ +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Helge Norberg, helge.norberg@svt.se +*/ + +#include "../../StdAfx.h" + +#include "syncto_consumer.h" + +#include "../frame_consumer.h" +#include "../../frame/frame.h" +#include "../../help/help_sink.h" +#include "../../module_dependencies.h" +#include "../../monitor/monitor.h" +#include "../../video_channel.h" +#include "../output.h" + +#include + +#include +#include + +#include + +namespace caspar { namespace core { namespace syncto { + +void verify_cyclic_reference(int self_channel_index, const spl::shared_ptr& other_channel); + +class syncto_consumer : public frame_consumer +{ + monitor::subject monitor_subject_; + spl::shared_ptr other_channel_; + semaphore frames_to_render_ { 0 }; + std::shared_ptr tick_subscription_; + int self_channel_index_ = -1; +public: + syncto_consumer(spl::shared_ptr other_channel) + : other_channel_(std::move(other_channel)) + { + } + + void initialize(const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) override + { + verify_cyclic_reference(channel_index, other_channel_); + + self_channel_index_ = channel_index; + tick_subscription_ = other_channel_->add_tick_listener([=] + { + frames_to_render_.release(); + }); + } + + std::future send(const_frame frame) override + { + auto task = spl::make_shared>([=] { return true; }); + + frames_to_render_.acquire(1, [task] + { + (*task)(); + }); + + return task->get_future(); + } + + monitor::subject& monitor_output() override + { + return monitor_subject_; + } + + std::wstring print() const override + { + if (self_channel_index_ != -1) + return L"sync[" + boost::lexical_cast(self_channel_index_) + L"]to[" + boost::lexical_cast(other_channel_->index()) + L"]"; + else + return L"syncto[" + boost::lexical_cast(other_channel_->index()) + L"]"; + } + + std::wstring name() const override + { + return L"syncto"; + } + + boost::property_tree::wptree info() const override + { + boost::property_tree::wptree info; + info.add(L"type", L"syncto-consumer"); + info.add(L"channel-to-sync-to", other_channel_->index()); + return info; + } + + bool has_synchronization_clock() const override + { + return true; + } + + int buffer_depth() const override + { + return -1; + } + + int index() const override + { + return 70000; + } + + int64_t presentation_frame_age_millis() const override + { + return 0; + } + + spl::shared_ptr other_channel() const + { + return other_channel_; + } +}; + +void verify_cyclic_reference(int self_channel_index, const spl::shared_ptr& other_channel) +{ + if (self_channel_index == other_channel->index()) + CASPAR_THROW_EXCEPTION(user_error() << msg_info( + L"Cannot create syncto consumer where source channel and destination channel is the same or indirectly related")); + + for (auto& consumer : other_channel->output().get_consumers()) + { + auto raw_consumer = consumer->unwrapped(); + auto syncto = dynamic_cast(raw_consumer); + + if (syncto) + verify_cyclic_reference(self_channel_index, syncto->other_channel()); + } +} + +void describe_consumer(core::help_sink& sink, const core::help_repository& repo) +{ + sink.short_description(L"Lets a channel provide sync to another."); + sink.syntax(L"SYNCTO [other_channel:int]"); + sink.para()->text(L"Provides sync to its own channel based on the rendering pace of the specified channel."); + sink.para()->text(L"Examples:"); + sink.example(L">> ADD 1 SYNCTO 2"); +} + +spl::shared_ptr create_consumer( + const std::vector& params, + core::interaction_sink*, + std::vector> channels) +{ + if (params.size() < 1 || !boost::iequals(params.at(0), L"SYNCTO")) + return core::frame_consumer::empty(); + + auto channel_id = boost::lexical_cast(params.at(1)); + auto channel = channels.at(channel_id - 1); + + return spl::make_shared(channel); +} + +spl::shared_ptr create_preconfigured_consumer( + const boost::property_tree::wptree& ptree, + core::interaction_sink*, + std::vector> channels) +{ + auto channel_id = ptree.get(L"channel-id"); + + return spl::make_shared(channels.at(channel_id - 1)); +} + +void init(module_dependencies dependencies) +{ + dependencies.consumer_registry->register_consumer_factory(L"syncto", &create_consumer, &describe_consumer); + dependencies.consumer_registry->register_preconfigured_consumer_factory(L"syncto", &create_preconfigured_consumer); +} + +}}} diff --git a/core/consumer/syncto/syncto_consumer.h b/core/consumer/syncto/syncto_consumer.h new file mode 100644 index 000000000..9eac2eae7 --- /dev/null +++ b/core/consumer/syncto/syncto_consumer.h @@ -0,0 +1,30 @@ +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Helge Norberg, helge.norberg@svt.se +*/ + +#pragma once + +#include "../../fwd.h" + +namespace caspar { namespace core { namespace syncto { + +void init(caspar::core::module_dependencies dependencies); + +}}} diff --git a/core/video_channel.cpp b/core/video_channel.cpp index 8ee1992da..a0ac2c201 100644 --- a/core/video_channel.cpp +++ b/core/video_channel.cpp @@ -49,6 +49,7 @@ #include #include +#include namespace caspar { namespace core { @@ -76,6 +77,10 @@ struct video_channel::impl final caspar::core::mixer mixer_; caspar::core::stage stage_; + mutable tbb::spin_mutex tick_listeners_mutex_; + int64_t last_tick_listener_id = 0; + std::unordered_map> tick_listeners_; + executor executor_ { L"video_channel " + boost::lexical_cast(index_) }; public: impl( @@ -145,10 +150,28 @@ public: }); } + void invoke_tick_listeners() + { + auto listeners = lock(tick_listeners_mutex_, [=] { return tick_listeners_; }); + + for (auto listener : listeners) + { + try + { + listener.second(); + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } + } + } + void tick() { try { + invoke_tick_listeners(); auto format_desc = video_format_desc(); auto channel_layout = audio_channel_layout(); @@ -171,7 +194,7 @@ public: auto frame_time = frame_timer.elapsed()*format_desc.fps*0.5; graph_->set_value("tick-time", frame_time); - *monitor_subject_ << monitor::message("/profiler/time") % frame_timer.elapsed() % (1.0/format_desc_.fps) + *monitor_subject_ << monitor::message("/profiler/time") % frame_timer.elapsed() % (1.0/ video_format_desc().fps) << monitor::message("/format") % format_desc.name; } catch(...) @@ -225,6 +248,23 @@ public: return info; } + + std::shared_ptr add_tick_listener(std::function listener) + { + return lock(tick_listeners_mutex_, [&] + { + auto tick_listener_id = last_tick_listener_id++; + tick_listeners_.insert(std::make_pair(tick_listener_id, listener)); + + return std::shared_ptr(nullptr, [=](void*) + { + lock(tick_listeners_mutex_, [&] + { + tick_listeners_.erase(tick_listener_id); + }); + }); + }); + } }; video_channel::video_channel( @@ -248,5 +288,6 @@ boost::property_tree::wptree video_channel::info() const{return impl_->info();} boost::property_tree::wptree video_channel::delay_info() const { return impl_->delay_info(); } int video_channel::index() const { return impl_->index(); } monitor::subject& video_channel::monitor_output(){ return *impl_->monitor_subject_; } +std::shared_ptr video_channel::add_tick_listener(std::function listener) { return impl_->add_tick_listener(std::move(listener)); } }} diff --git a/core/video_channel.h b/core/video_channel.h index b3ac548a7..4c256b0dc 100644 --- a/core/video_channel.h +++ b/core/video_channel.h @@ -30,8 +30,10 @@ #include +#include + namespace caspar { namespace core { - + class video_channel final { video_channel(const video_channel&); @@ -50,7 +52,7 @@ public: ~video_channel(); // Methods - + monitor::subject& monitor_output(); // Properties @@ -67,6 +69,8 @@ public: core::audio_channel_layout audio_channel_layout() const; void audio_channel_layout(const core::audio_channel_layout& channel_layout); + std::shared_ptr add_tick_listener(std::function listener); + spl::shared_ptr frame_factory(); boost::property_tree::wptree info() const; @@ -77,4 +81,4 @@ private: spl::unique_ptr impl_; }; -}} \ No newline at end of file +}} diff --git a/modules/bluefish/consumer/bluefish_consumer.cpp b/modules/bluefish/consumer/bluefish_consumer.cpp index 50529d9f1..a51b9dc80 100644 --- a/modules/bluefish/consumer/bluefish_consumer.cpp +++ b/modules/bluefish/consumer/bluefish_consumer.cpp @@ -460,7 +460,7 @@ void describe_consumer(core::help_sink& sink, const core::help_repository& repo) } spl::shared_ptr create_consumer( - const std::vector& params, core::interaction_sink*) + const std::vector& params, core::interaction_sink*, std::vector> channels) { if(params.size() < 1 || !boost::iequals(params.at(0), L"BLUEFISH")) return core::frame_consumer::empty(); @@ -487,8 +487,8 @@ spl::shared_ptr create_consumer( } spl::shared_ptr create_preconfigured_consumer( - const boost::property_tree::wptree& ptree, core::interaction_sink*) -{ + const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector> channels) +{ const auto device_index = ptree.get( L"device", 1); const auto embedded_audio = ptree.get( L"embedded-audio", false); const auto key_only = ptree.get( L"key-only", false); diff --git a/modules/bluefish/consumer/bluefish_consumer.h b/modules/bluefish/consumer/bluefish_consumer.h index 3c4ec6dbb..bcdc48bdd 100644 --- a/modules/bluefish/consumer/bluefish_consumer.h +++ b/modules/bluefish/consumer/bluefish_consumer.h @@ -33,8 +33,10 @@ namespace caspar { namespace bluefish { void describe_consumer(core::help_sink& sink, const core::help_repository& repo); spl::shared_ptr create_consumer( - const std::vector& params, core::interaction_sink*); + const std::vector& params, core::interaction_sink*, + std::vector> channels); spl::shared_ptr create_preconfigured_consumer( - const boost::property_tree::wptree& ptree, core::interaction_sink*); + const boost::property_tree::wptree& ptree, core::interaction_sink*, + std::vector> channels); -}} \ No newline at end of file +}} diff --git a/modules/decklink/consumer/decklink_consumer.cpp b/modules/decklink/consumer/decklink_consumer.cpp index 337806876..d4da7d9da 100644 --- a/modules/decklink/consumer/decklink_consumer.cpp +++ b/modules/decklink/consumer/decklink_consumer.cpp @@ -817,7 +817,7 @@ void describe_consumer(core::help_sink& sink, const core::help_repository& repo) } spl::shared_ptr create_consumer( - const std::vector& params, core::interaction_sink*) + const std::vector& params, core::interaction_sink*, std::vector> channels) { if (params.size() < 1 || !boost::iequals(params.at(0), L"DECKLINK")) return core::frame_consumer::empty(); @@ -863,7 +863,7 @@ spl::shared_ptr create_consumer( } spl::shared_ptr create_preconfigured_consumer( - const boost::property_tree::wptree& ptree, core::interaction_sink*) + const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector> channels) { configuration config; diff --git a/modules/decklink/consumer/decklink_consumer.h b/modules/decklink/consumer/decklink_consumer.h index 9c0de882f..b06fe7f58 100644 --- a/modules/decklink/consumer/decklink_consumer.h +++ b/modules/decklink/consumer/decklink_consumer.h @@ -34,8 +34,10 @@ namespace caspar { namespace decklink { void describe_consumer(core::help_sink& sink, const core::help_repository& repo); spl::shared_ptr create_consumer( - const std::vector& params, core::interaction_sink*); + const std::vector& params, core::interaction_sink*, + std::vector> channels); spl::shared_ptr create_preconfigured_consumer( - const boost::property_tree::wptree& ptree, core::interaction_sink*); + const boost::property_tree::wptree& ptree, core::interaction_sink*, + std::vector> channels); -}} \ No newline at end of file +}} diff --git a/modules/ffmpeg/consumer/ffmpeg_consumer.cpp b/modules/ffmpeg/consumer/ffmpeg_consumer.cpp index b3006ab0a..20a30cd83 100644 --- a/modules/ffmpeg/consumer/ffmpeg_consumer.cpp +++ b/modules/ffmpeg/consumer/ffmpeg_consumer.cpp @@ -885,7 +885,7 @@ void describe_consumer(core::help_sink& sink, const core::help_repository& repo) } spl::shared_ptr create_consumer( - const std::vector& params, core::interaction_sink*) + const std::vector& params, core::interaction_sink*, std::vector> channels) { auto params2 = params; auto separate_key_it = std::find_if(params2.begin(), params2.end(), param_comparer(L"SEPARATE_KEY")); @@ -925,7 +925,7 @@ spl::shared_ptr create_consumer( } spl::shared_ptr create_preconfigured_consumer( - const boost::property_tree::wptree& ptree, core::interaction_sink*) + const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector> channels) { auto filename = ptree_get(ptree, L"path"); auto codec = ptree.get(L"vcodec", L"libx264"); diff --git a/modules/ffmpeg/consumer/ffmpeg_consumer.h b/modules/ffmpeg/consumer/ffmpeg_consumer.h index ac27b94e9..7e62a4f09 100644 --- a/modules/ffmpeg/consumer/ffmpeg_consumer.h +++ b/modules/ffmpeg/consumer/ffmpeg_consumer.h @@ -33,7 +33,9 @@ namespace caspar { namespace ffmpeg { void describe_consumer(core::help_sink& sink, const core::help_repository& repo); -spl::shared_ptr create_consumer(const std::vector& params, core::interaction_sink*); -spl::shared_ptr create_preconfigured_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink*); +spl::shared_ptr create_consumer( + const std::vector& params, core::interaction_sink*, std::vector> channels); +spl::shared_ptr create_preconfigured_consumer( + const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector> channels); -}} \ No newline at end of file +}} diff --git a/modules/ffmpeg/consumer/streaming_consumer.cpp b/modules/ffmpeg/consumer/streaming_consumer.cpp index 41cc3e798..4196f1e85 100644 --- a/modules/ffmpeg/consumer/streaming_consumer.cpp +++ b/modules/ffmpeg/consumer/streaming_consumer.cpp @@ -1259,7 +1259,7 @@ void describe_streaming_consumer(core::help_sink& sink, const core::help_reposit } spl::shared_ptr create_streaming_consumer( - const std::vector& params, core::interaction_sink*) + const std::vector& params, core::interaction_sink*, std::vector> channels) { if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE"))) return core::frame_consumer::empty(); @@ -1272,7 +1272,7 @@ spl::shared_ptr create_streaming_consumer( } spl::shared_ptr create_preconfigured_streaming_consumer( - const boost::property_tree::wptree& ptree, core::interaction_sink*) + const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector> channels) { return spl::make_shared( u8(ptree_get(ptree, L"path")), diff --git a/modules/ffmpeg/consumer/streaming_consumer.h b/modules/ffmpeg/consumer/streaming_consumer.h index 2663cf4f1..7d7a40164 100644 --- a/modules/ffmpeg/consumer/streaming_consumer.h +++ b/modules/ffmpeg/consumer/streaming_consumer.h @@ -13,8 +13,8 @@ namespace caspar { namespace ffmpeg { void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo); spl::shared_ptr create_streaming_consumer( - const std::vector& params, core::interaction_sink*); + const std::vector& params, core::interaction_sink*, std::vector> channels); spl::shared_ptr create_preconfigured_streaming_consumer( - const boost::property_tree::wptree& ptree, core::interaction_sink*); + const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector> channels); -}} \ No newline at end of file +}} diff --git a/modules/image/consumer/image_consumer.cpp b/modules/image/consumer/image_consumer.cpp index ab466f046..4730402d7 100644 --- a/modules/image/consumer/image_consumer.cpp +++ b/modules/image/consumer/image_consumer.cpp @@ -106,7 +106,7 @@ public: try { auto filename2 = filename; - + if (filename2.empty()) filename2 = env::media_folder() + boost::posix_time::to_iso_wstring(boost::posix_time::second_clock::local_time()) + L".png"; else @@ -135,7 +135,7 @@ public: { return L"image[]"; } - + std::wstring name() const override { return L"image"; @@ -176,7 +176,8 @@ void describe_consumer(core::help_sink& sink, const core::help_repository& repo) sink.example(L">> ADD 1 IMAGE", L"creating media/20130228T210946.png if the current time is 2013-02-28 21:09:46."); } -spl::shared_ptr create_consumer(const std::vector& params, core::interaction_sink*) +spl::shared_ptr create_consumer( + const std::vector& params, core::interaction_sink*, std::vector> channels) { if (params.size() < 1 || !boost::iequals(params.at(0), L"IMAGE")) return core::frame_consumer::empty(); diff --git a/modules/image/consumer/image_consumer.h b/modules/image/consumer/image_consumer.h index fc47c6254..111e319c1 100644 --- a/modules/image/consumer/image_consumer.h +++ b/modules/image/consumer/image_consumer.h @@ -31,10 +31,10 @@ #include #include -namespace caspar { +namespace caspar { namespace image { - + void write_cropped_png( const class core::const_frame& frame, const core::video_format_desc& format_desc, @@ -43,6 +43,7 @@ void write_cropped_png( int height); void describe_consumer(core::help_sink& sink, const core::help_repository& repo); -spl::shared_ptr create_consumer(const std::vector& params, struct core::interaction_sink*); +spl::shared_ptr create_consumer( + const std::vector& params, struct core::interaction_sink*, std::vector> channels); }} diff --git a/modules/newtek/consumer/newtek_ivga_consumer.cpp b/modules/newtek/consumer/newtek_ivga_consumer.cpp index feade373c..f3d391795 100644 --- a/modules/newtek/consumer/newtek_ivga_consumer.cpp +++ b/modules/newtek/consumer/newtek_ivga_consumer.cpp @@ -18,7 +18,7 @@ * * Author: Robert Nagy, ronag@live.com */ - + #include "../StdAfx.h" #include "newtek_ivga_consumer.h" @@ -76,13 +76,13 @@ public: graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f)); diagnostics::register_graph(graph_); } - + ~newtek_ivga_consumer() { } // frame_consumer - + virtual void initialize( const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, @@ -176,7 +176,7 @@ public: { return -1; } - + virtual int index() const override { return 900; @@ -191,7 +191,7 @@ public: { return false; } -}; +}; void describe_ivga_consumer(core::help_sink& sink, const core::help_repository& repo) { @@ -202,7 +202,7 @@ void describe_ivga_consumer(core::help_sink& sink, const core::help_repository& sink.example(L">> ADD 1 NEWTEK_IVGA"); } -spl::shared_ptr create_ivga_consumer(const std::vector& params, core::interaction_sink*) +spl::shared_ptr create_ivga_consumer(const std::vector& params, core::interaction_sink*, std::vector> channels) { if (params.size() < 1 || !boost::iequals(params.at(0), L"NEWTEK_IVGA")) return core::frame_consumer::empty(); @@ -210,9 +210,9 @@ spl::shared_ptr create_ivga_consumer(const std::vector(); } -spl::shared_ptr create_preconfigured_ivga_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink*) -{ +spl::shared_ptr create_preconfigured_ivga_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector> channels) +{ return spl::make_shared(); } -}} \ No newline at end of file +}} diff --git a/modules/newtek/consumer/newtek_ivga_consumer.h b/modules/newtek/consumer/newtek_ivga_consumer.h index fc41ca726..1fa43b7ee 100644 --- a/modules/newtek/consumer/newtek_ivga_consumer.h +++ b/modules/newtek/consumer/newtek_ivga_consumer.h @@ -32,7 +32,7 @@ namespace caspar { namespace newtek { void describe_ivga_consumer(core::help_sink& sink, const core::help_repository& repo); -spl::shared_ptr create_ivga_consumer(const std::vector& params, core::interaction_sink*); -spl::shared_ptr create_preconfigured_ivga_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink*); +spl::shared_ptr create_ivga_consumer(const std::vector& params, core::interaction_sink*, std::vector> channels); +spl::shared_ptr create_preconfigured_ivga_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector> channels); -}} \ No newline at end of file +}} diff --git a/modules/oal/consumer/oal_consumer.cpp b/modules/oal/consumer/oal_consumer.cpp index da6108c69..286622f89 100644 --- a/modules/oal/consumer/oal_consumer.cpp +++ b/modules/oal/consumer/oal_consumer.cpp @@ -72,7 +72,7 @@ public: if(!context_) CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Failed to create audio context.")); - + if(alcMakeContextCurrent(context_) == ALC_FALSE) CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Failed to activate audio context.")); } @@ -98,7 +98,7 @@ void init_device() { static std::unique_ptr instance; static boost::once_flag f = BOOST_ONCE_INIT; - + boost::call_once(f, []{instance.reset(new device());}); } @@ -110,7 +110,7 @@ struct oal_consumer : public core::frame_consumer boost::timer perf_timer_; tbb::atomic presentation_age_; int channel_index_ = -1; - + core::video_format_desc format_desc_; core::audio_channel_layout out_channel_layout_; std::unique_ptr channel_remapper_; @@ -130,7 +130,7 @@ public: init_device(); - graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f)); + graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f)); graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f)); graph_->set_color("late-frame", diagnostics::color(0.6f, 0.3f, 0.3f)); diagnostics::register_graph(graph_); @@ -139,7 +139,7 @@ public: ~oal_consumer() { executor_.invoke([=] - { + { if(source_) { alSourceStop(source_); @@ -158,7 +158,7 @@ public: void initialize(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int channel_index) override { - format_desc_ = format_desc; + format_desc_ = format_desc; channel_index_ = channel_index; if (out_channel_layout_ == core::audio_channel_layout::invalid()) out_channel_layout_ = channel_layout.num_channels == 2 ? channel_layout : *core::audio_channel_layout_repository::get_default()->get_layout(L"stereo"); @@ -169,7 +169,7 @@ public: graph_->set_text(print()); executor_.begin_invoke([=] - { + { buffers_.resize(format_desc_.fps > 30 ? 8 : 4); alGenBuffers(static_cast(buffers_.size()), buffers_.data()); alGenSources(1, &source_); @@ -180,10 +180,10 @@ public: alBufferData(buffers_[n], AL_FORMAT_STEREO16, audio.data(), static_cast(audio.size()*sizeof(int16_t)), format_desc_.audio_sample_rate); alSourceQueueBuffers(source_, 1, &buffers_[n]); } - + alSourcei(source_, AL_LOOPING, AL_FALSE); - alSourcePlay(source_); + alSourcePlay(source_); }); } @@ -198,13 +198,13 @@ public: // exhausted, which should not happen executor_.begin_invoke([=] { - ALenum state; + ALenum state; alGetSourcei(source_, AL_SOURCE_STATE,&state); if(state != AL_PLAYING) { for(int n = 0; n < buffers_.size()-1; ++n) - { - ALuint buffer = 0; + { + ALuint buffer = 0; alSourceUnqueueBuffers(source_, 1, &buffer); if(buffer) { @@ -213,13 +213,13 @@ public: alSourceQueueBuffers(source_, 1, &buffer); } } - alSourcePlay(source_); + alSourcePlay(source_); graph_->set_tag(diagnostics::tag_severity::WARNING, "late-frame"); } auto audio = core::audio_32_to_16(channel_remapper_->mix_and_rearrange(frame.audio_data())); - - ALuint buffer = 0; + + ALuint buffer = 0; alSourceUnqueueBuffers(source_, 1, &buffer); if(buffer) { @@ -229,14 +229,14 @@ public: else graph_->set_tag(diagnostics::tag_severity::WARNING, "dropped-frame"); - graph_->set_value("tick-time", perf_timer_.elapsed()*format_desc_.fps*0.5); + graph_->set_value("tick-time", perf_timer_.elapsed()*format_desc_.fps*0.5); perf_timer_.restart(); presentation_age_ = frame.get_age_millis() + latency_millis(); }); return make_ready_future(true); } - + std::wstring print() const override { return L"oal[" + boost::lexical_cast(channel_index_) + L"|" + format_desc_.name + L"]"; @@ -253,7 +253,7 @@ public: info.add(L"type", L"system-audio"); return info; } - + bool has_synchronization_clock() const override { return false; @@ -263,14 +263,14 @@ public: { return latency_millis_; } - + int buffer_depth() const override { int delay_in_frames = static_cast(latency_millis() / (1000.0 / format_desc_.fps)); - + return delay_in_frames; } - + int index() const override { return 500; @@ -293,7 +293,8 @@ void describe_consumer(core::help_sink& sink, const core::help_repository& repo) sink.example(L">> ADD 1 AUDIO LATENCY 500", L"Specifies that the system-audio chain: openal => driver => sound card => speaker output is 500ms"); } -spl::shared_ptr create_consumer(const std::vector& params, core::interaction_sink*) +spl::shared_ptr create_consumer( + const std::vector& params, core::interaction_sink*, std::vector> channels) { if(params.size() < 1 || !boost::iequals(params.at(0), L"AUDIO")) return core::frame_consumer::empty(); @@ -316,7 +317,8 @@ spl::shared_ptr create_consumer(const std::vector(channel_layout, latency_millis); } -spl::shared_ptr create_preconfigured_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink*) +spl::shared_ptr create_preconfigured_consumer( + const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector> channels) { auto channel_layout = core::audio_channel_layout::invalid(); auto channel_layout_spec = ptree.get_optional(L"channel-layout"); diff --git a/modules/oal/consumer/oal_consumer.h b/modules/oal/consumer/oal_consumer.h index abb7b7c32..a688d1094 100644 --- a/modules/oal/consumer/oal_consumer.h +++ b/modules/oal/consumer/oal_consumer.h @@ -30,11 +30,11 @@ #include namespace caspar { namespace oal { - + void describe_consumer(core::help_sink& sink, const core::help_repository& repo); spl::shared_ptr create_consumer( - const std::vector& params, core::interaction_sink*); + const std::vector& params, core::interaction_sink*, std::vector> channels); spl::shared_ptr create_preconfigured_consumer( - const boost::property_tree::wptree&, core::interaction_sink*); + const boost::property_tree::wptree&, core::interaction_sink*, std::vector> channels); -}} \ No newline at end of file +}} diff --git a/modules/screen/consumer/screen_consumer.cpp b/modules/screen/consumer/screen_consumer.cpp index d40429d24..b992cfb69 100644 --- a/modules/screen/consumer/screen_consumer.cpp +++ b/modules/screen/consumer/screen_consumer.cpp @@ -696,7 +696,8 @@ void describe_consumer(core::help_sink& sink, const core::help_repository& repo) sink.example(L">> ADD 1 SCREEN 1 BORDERLESS", L"opens a screen consumer without borders/window decorations on screen 1."); } -spl::shared_ptr create_consumer(const std::vector& params, core::interaction_sink* sink) +spl::shared_ptr create_consumer( + const std::vector& params, core::interaction_sink* sink, std::vector> channels) { if (params.size() < 1 || !boost::iequals(params.at(0), L"SCREEN")) return core::frame_consumer::empty(); @@ -718,7 +719,8 @@ spl::shared_ptr create_consumer(const std::vector(config, sink); } -spl::shared_ptr create_preconfigured_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink* sink) +spl::shared_ptr create_preconfigured_consumer( + const boost::property_tree::wptree& ptree, core::interaction_sink* sink, std::vector> channels) { configuration config; config.name = ptree.get(L"name", config.name); diff --git a/modules/screen/consumer/screen_consumer.h b/modules/screen/consumer/screen_consumer.h index f4293c1f3..fb754db6e 100644 --- a/modules/screen/consumer/screen_consumer.h +++ b/modules/screen/consumer/screen_consumer.h @@ -33,9 +33,11 @@ namespace caspar { namespace screen { void describe_consumer(core::help_sink& sink, const core::help_repository& repo); spl::shared_ptr create_consumer( const std::vector& params, - core::interaction_sink* sink); + core::interaction_sink* sink, + std::vector> channels); spl::shared_ptr create_preconfigured_consumer( const boost::property_tree::wptree& ptree, - core::interaction_sink* sink); + core::interaction_sink* sink, + std::vector> channels); -}} \ No newline at end of file +}} diff --git a/protocol/amcp/AMCPCommandsImpl.cpp b/protocol/amcp/AMCPCommandsImpl.cpp index 710977ec6..2788325c7 100644 --- a/protocol/amcp/AMCPCommandsImpl.cpp +++ b/protocol/amcp/AMCPCommandsImpl.cpp @@ -87,16 +87,16 @@ /* Return codes 102 [action] Information that [action] has happened -101 [action] Information that [action] has happened plus one row of data +101 [action] Information that [action] has happened plus one row of data 202 [command] OK [command] has been executed -201 [command] OK [command] has been executed, plus one row of data +201 [command] OK [command] has been executed, plus one row of data 200 [command] OK [command] has been executed, plus multiple lines of data. ends with an empty line 400 ERROR the command could not be understood 401 [command] ERROR invalid/missing channel 402 [command] ERROR parameter missing -403 [command] ERROR invalid parameter +403 [command] ERROR invalid parameter 404 [command] ERROR file not found 500 FAILED internal error @@ -134,7 +134,7 @@ std::wstring read_utf8_file(const boost::filesystem::path& file) std::wstringstream result; boost::filesystem::wifstream filestream(file); - if (filestream) + if (filestream) { // Consume BOM first filestream.get(); @@ -234,11 +234,11 @@ std::wstring MediaInfo(const boost::filesystem::path& path, const spl::shared_pt } std::wstring ListMedia(const spl::shared_ptr& media_info_repo) -{ +{ std::wstringstream replyString; for (boost::filesystem::recursive_directory_iterator itr(env::media_folder()), end; itr != end; ++itr) replyString << MediaInfo(itr->path(), media_info_repo); - + return boost::to_upper_copy(replyString.str()); } @@ -247,7 +247,7 @@ std::wstring ListTemplates(const spl::shared_ptr& cg std::wstringstream replyString; for (boost::filesystem::recursive_directory_iterator itr(env::template_folder()), end; itr != end; ++itr) - { + { if(boost::filesystem::is_regular_file(itr->path()) && cg_registry->is_cg_extension(itr->path().extension().wstring())) { auto relativePath = get_relative_without_extension(itr->path(), env::template_folder()); @@ -264,7 +264,7 @@ std::wstring ListTemplates(const spl::shared_ptr& cg auto dir = relativePath.parent_path(); auto file = boost::to_upper_copy(relativePath.filename().wstring()); relativePath = dir / file; - + auto str = relativePath.generic_wstring(); boost::trim_if(str, boost::is_any_of("\\/")); @@ -280,13 +280,18 @@ std::wstring ListTemplates(const spl::shared_ptr& cg return replyString.str(); } +std::vector> get_channels(const command_context& ctx) +{ + return cpplinq::from(ctx.channels) + .select([](channel_context c) { return spl::make_shared_ptr(c.channel); }) + .to_vector(); +} + core::frame_producer_dependencies get_producer_dependencies(const std::shared_ptr& channel, const command_context& ctx) { return core::frame_producer_dependencies( channel->frame_factory(), - cpplinq::from(ctx.channels) - .select([](channel_context c) { return spl::make_shared_ptr(c.channel); }) - .to_vector(), + get_channels(ctx), channel->video_format_desc(), ctx.producer_registry); } @@ -616,6 +621,7 @@ void add_describer(core::help_sink& sink, const core::help_repository& repo) sink.example(L">> ADD 1 SCREEN"); sink.example(L">> ADD 1 AUDIO"); sink.example(L">> ADD 1 IMAGE filename"); + sink.example(L">> ADD 2 SYNCTO 1"); sink.example(L">> ADD 1 FILE filename.mov"); sink.example(L">> ADD 1 FILE filename.mov SEPARATE_KEY"); sink.example( @@ -635,7 +641,7 @@ std::wstring add_command(command_context& ctx) core::diagnostics::scoped_call_context save; core::diagnostics::call_context::for_thread().video_channel = ctx.channel_index + 1; - auto consumer = ctx.consumer_registry->create_consumer(ctx.parameters, &ctx.channel.channel->stage()); + auto consumer = ctx.consumer_registry->create_consumer(ctx.parameters, &ctx.channel.channel->stage(), get_channels(ctx)); ctx.channel.channel->output().add(ctx.layer_index(consumer->index()), consumer); return L"202 ADD OK\r\n"; @@ -660,7 +666,7 @@ void remove_describer(core::help_sink& sink, const core::help_repository& repo) std::wstring remove_command(command_context& ctx) { auto index = ctx.layer_index(std::numeric_limits::min()); - + if (index == std::numeric_limits::min()) { replace_placeholders( @@ -668,7 +674,7 @@ std::wstring remove_command(command_context& ctx) ctx.client->address(), ctx.parameters); - index = ctx.consumer_registry->create_consumer(ctx.parameters, &ctx.channel.channel->stage())->index(); + index = ctx.consumer_registry->create_consumer(ctx.parameters, &ctx.channel.channel->stage(), get_channels(ctx))->index(); } ctx.channel.channel->output().remove(index); @@ -689,7 +695,7 @@ void print_describer(core::help_sink& sink, const core::help_repository& repo) std::wstring print_command(command_context& ctx) { - ctx.channel.channel->output().add(ctx.consumer_registry->create_consumer({ L"IMAGE" }, &ctx.channel.channel->stage())); + ctx.channel.channel->output().add(ctx.consumer_registry->create_consumer({ L"IMAGE" }, &ctx.channel.channel->stage(), get_channels(ctx))); return L"202 PRINT OK\r\n"; } @@ -2045,7 +2051,7 @@ std::wstring channel_grid_command(command_context& ctx) params.push_back(L"0"); params.push_back(L"NAME"); params.push_back(L"Channel Grid Window"); - auto screen = ctx.consumer_registry->create_consumer(params, &self.channel->stage()); + auto screen = ctx.consumer_registry->create_consumer(params, &self.channel->stage(), get_channels(ctx)); self.channel->output().add(screen); diff --git a/shell/casparcg.config b/shell/casparcg.config index d1358aab2..9357721fe 100644 --- a/shell/casparcg.config +++ b/shell/casparcg.config @@ -116,6 +116,9 @@ udp://localhost:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast + + 1 + diff --git a/shell/server.cpp b/shell/server.cpp index cee6709c7..3d78ed36b 100644 --- a/shell/server.cpp +++ b/shell/server.cpp @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -168,6 +169,7 @@ struct server::impl : boost::noncopyable initialize_modules(dependencies); core::text::init(dependencies); core::scene::init(dependencies); + core::syncto::init(dependencies); help_repo_->register_item({ L"producer" }, L"Color Producer", &core::describe_color_producer); } @@ -250,8 +252,12 @@ struct server::impl : boost::noncopyable void setup_channels(const boost::property_tree::wptree& pt) { using boost::property_tree::wptree; + + std::vector xml_channels; + for (auto& xml_channel : pt | witerate_children(L"configuration.channels") | welement_context_iteration) { + xml_channels.push_back(xml_channel.second); ptree_verify_element_name(xml_channel, L"channel"); auto format_desc_str = xml_channel.second.get(L"video-mode", L"PAL"); @@ -267,17 +273,24 @@ struct server::impl : boost::noncopyable auto channel_id = static_cast(channels_.size() + 1); auto channel = spl::make_shared(channel_id, format_desc, *channel_layout, accelerator_.create_image_mixer(channel_id)); + channel->monitor_output().attach_parent(monitor_subject_); + channel->mixer().set_straight_alpha_output(xml_channel.second.get(L"straight-alpha-output", false)); + channels_.push_back(channel); + } + + for (auto& channel : channels_) + { core::diagnostics::scoped_call_context save; core::diagnostics::call_context::for_thread().video_channel = channel->index(); - for (auto& xml_consumer : xml_channel.second | witerate_children(L"consumers") | welement_context_iteration) + for (auto& xml_consumer : xml_channels.at(channel->index() - 1) | witerate_children(L"consumers") | welement_context_iteration) { auto name = xml_consumer.first; try { if (name != L"") - channel->output().add(consumer_registry_->create_consumer(name, xml_consumer.second, &channel->stage())); + channel->output().add(consumer_registry_->create_consumer(name, xml_consumer.second, &channel->stage(), channels_)); } catch (const user_error& e) { @@ -289,10 +302,6 @@ struct server::impl : boost::noncopyable CASPAR_LOG_CURRENT_EXCEPTION(); } } - - channel->monitor_output().attach_parent(monitor_subject_); - channel->mixer().set_straight_alpha_output(xml_channel.second.get(L"straight-alpha-output", false)); - channels_.push_back(channel); } // Dummy diagnostics channel -- 2.39.2