]> git.sesse.net Git - casparcg/commitdiff
Created a consumer that provides sync to a channel based on the pace of another chann...
authorHelge Norberg <helge.norberg@svt.se>
Tue, 25 Oct 2016 14:44:21 +0000 (16:44 +0200)
committerHelge Norberg <helge.norberg@svt.se>
Tue, 25 Oct 2016 14:44:21 +0000 (16:44 +0200)
31 files changed:
common/semaphore.h
core/CMakeLists.txt
core/consumer/frame_consumer.cpp
core/consumer/frame_consumer.h
core/consumer/output.cpp
core/consumer/output.h
core/consumer/port.cpp
core/consumer/port.h
core/consumer/syncto/syncto_consumer.cpp [new file with mode: 0644]
core/consumer/syncto/syncto_consumer.h [new file with mode: 0644]
core/video_channel.cpp
core/video_channel.h
modules/bluefish/consumer/bluefish_consumer.cpp
modules/bluefish/consumer/bluefish_consumer.h
modules/decklink/consumer/decklink_consumer.cpp
modules/decklink/consumer/decklink_consumer.h
modules/ffmpeg/consumer/ffmpeg_consumer.cpp
modules/ffmpeg/consumer/ffmpeg_consumer.h
modules/ffmpeg/consumer/streaming_consumer.cpp
modules/ffmpeg/consumer/streaming_consumer.h
modules/image/consumer/image_consumer.cpp
modules/image/consumer/image_consumer.h
modules/newtek/consumer/newtek_ivga_consumer.cpp
modules/newtek/consumer/newtek_ivga_consumer.h
modules/oal/consumer/oal_consumer.cpp
modules/oal/consumer/oal_consumer.h
modules/screen/consumer/screen_consumer.cpp
modules/screen/consumer/screen_consumer.h
protocol/amcp/AMCPCommandsImpl.cpp
shell/casparcg.config
shell/server.cpp

index 04e36426045c48c9603807a9c7f0a90d1806c9f8..7baccda0e52df188f8e782e135406b8166a8488d 100644 (file)
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
 
+#include <map>
+#include <queue>
+#include <functional>
+
 namespace caspar {
 
 template <class N, class Func>
@@ -43,9 +47,10 @@ void repeat_n(N times_to_repeat_block, const Func& func)
  */
 class semaphore : boost::noncopyable
 {
-       mutable boost::mutex mutex_;
-       unsigned int permits_;
-       boost::condition_variable_any permits_available_;
+       mutable boost::mutex                                                                            mutex_;
+       unsigned int                                                                                            permits_;
+       boost::condition_variable_any                                                           permits_available_;
+       std::map<unsigned int, std::queue<std::function<void()>>>       callbacks_per_requested_permits_;
 public:
        /**
         * Constructor.
@@ -66,6 +71,7 @@ public:
 
                ++permits_;
 
+               perform_callback_based_acquire();
                permits_available_.notify_one();
        }
 
@@ -80,6 +86,7 @@ public:
 
                permits_ += permits;
 
+               perform_callback_based_acquire();
                repeat_n(permits, [this] { permits_available_.notify_one(); });
        }
 
@@ -112,11 +119,11 @@ public:
 
                while (true)
                {
-                       auto num_wanted = permits - num_acquired;
-                       auto to_drain = std::min(num_wanted, permits_);
+                       auto num_wanted = permits - num_acquired;
+                       auto to_drain   = std::min(num_wanted, permits_);
 
-                       permits_ -= to_drain;
-                       num_acquired += to_drain;
+                       permits_                -= to_drain;
+                       num_acquired    += to_drain;
 
                        if (num_acquired == permits)
                                break;
@@ -125,6 +132,20 @@ public:
                }
        }
 
+       /**
+       * Acquire a number of permits. Will not block, but instead invoke a callback
+       * when the specified number of permits are available and has been acquired.
+       *
+       * @param permits           The number of permits to acquire.
+       * @param acquired_callback The callback to invoke when acquired.
+       */
+       void acquire(unsigned int permits, std::function<void()> acquired_callback)
+       {
+               boost::unique_lock<boost::mutex> lock(mutex_);
+
+               callbacks_per_requested_permits_[permits].push(std::move(acquired_callback));
+       }
+
        /**
         * Acquire a number of permits. Will block until the given number of
         * permits has been acquired if not enough permits are currently available
@@ -143,11 +164,11 @@ public:
 
                while (true)
                {
-                       auto num_wanted = permits - num_acquired;
-                       auto to_drain = std::min(num_wanted, permits_);
+                       auto num_wanted = permits - num_acquired;
+                       auto to_drain   = std::min(num_wanted, permits_);
 
-                       permits_ -= to_drain;
-                       num_acquired += to_drain;
+                       permits_                -= to_drain;
+                       num_acquired    += to_drain;
 
                        if (num_acquired == permits)
                                break;
@@ -194,6 +215,47 @@ public:
 
                return permits_;
        }
+
+private:
+       void perform_callback_based_acquire()
+       {
+               if (callbacks_per_requested_permits_.empty())
+                       return;
+
+               while (
+                       !callbacks_per_requested_permits_.empty() &&
+                       callbacks_per_requested_permits_.begin()->first <= permits_)
+               {
+                       auto requested_permits_and_callbacks    = callbacks_per_requested_permits_.begin();
+                       auto requested_permits                                  = requested_permits_and_callbacks->first;
+                       auto& callbacks                                                 = requested_permits_and_callbacks->second;
+
+                       if (callbacks.empty())
+                       {
+                               callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
+                               continue;
+                       }
+
+                       auto& callback                                                  = callbacks.front();
+
+                       permits_ -= requested_permits;
+                       mutex_.unlock();
+
+                       try
+                       {
+                               callback();
+                       }
+                       catch (...)
+                       {
+                       }
+
+                       mutex_.lock();
+                       callbacks.pop();
+
+                       if (callbacks.empty())
+                               callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
+               }
+       }
 };
 
 /**
index aaa67b0e12840c54bfe2d058643630402722564c..21ef8a9bdef15a137c68de1968d2efff4c524801 100644 (file)
@@ -2,6 +2,8 @@ cmake_minimum_required (VERSION 2.6)
 project (core)
 
 set(SOURCES
+               consumer/syncto/syncto_consumer.cpp
+
                consumer/frame_consumer.cpp
                consumer/output.cpp
                consumer/port.cpp
@@ -58,6 +60,8 @@ set(SOURCES
                video_format.cpp
 )
 set(HEADERS
+               consumer/syncto/syncto_consumer.h
+
                consumer/frame_consumer.h
                consumer/output.h
                consumer/port.h
@@ -149,6 +153,7 @@ include_directories(${GLEW_INCLUDE_PATH})
 
 source_group(sources ./*)
 source_group(sources\\consumer consumer/*)
+source_group(sources\\consumer\\syncto consumer/syncto/*)
 source_group(sources\\diagnostics diagnostics/*)
 source_group(sources\\producer producer/*)
 source_group(sources\\producer\\framerate producer/framerate/*)
index 10fccad3e157f4ee1eae8766eae0359dd31b3366..477998ace7cc2c9a2cb20d80e835e12c41642925 100644 (file)
@@ -82,27 +82,27 @@ void destroy_consumers_synchronously()
 }
 
 class destroy_consumer_proxy : public frame_consumer
-{      
+{
        std::shared_ptr<frame_consumer> consumer_;
 public:
-       destroy_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) 
+       destroy_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer)
                : consumer_(std::move(consumer))
        {
                destroy_consumers_in_separate_thread() = true;
        }
 
        ~destroy_consumer_proxy()
-       {               
+       {
                static tbb::atomic<int> counter;
                static std::once_flag counter_init_once;
                std::call_once(counter_init_once, []{ counter = 0; });
 
                if (!destroy_consumers_in_separate_thread())
                        return;
-                       
+
                ++counter;
                CASPAR_VERIFY(counter < 8);
-               
+
                auto consumer = new std::shared_ptr<frame_consumer>(std::move(consumer_));
                boost::thread([=]
                {
@@ -122,38 +122,39 @@ public:
 
                        pointer_guard.reset();
 
-               }).detach(); 
+               }).detach();
        }
-       
+
        std::future<bool> send(const_frame frame) override                                                                                                                                                              {return consumer_->send(std::move(frame));}
        void initialize(const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) override   {return consumer_->initialize(format_desc, channel_layout, channel_index);}
-       std::wstring print() const override                                                                                                                                                                                             {return consumer_->print();}    
+       std::wstring print() const override                                                                                                                                                                                             {return consumer_->print();}
        std::wstring name() const override                                                                                                                                                                                              {return consumer_->name();}
        boost::property_tree::wptree info() const override                                                                                                                                                              {return consumer_->info();}
        bool has_synchronization_clock() const override                                                                                                                                                                 {return consumer_->has_synchronization_clock();}
        int buffer_depth() const override                                                                                                                                                                                               {return consumer_->buffer_depth();}
        int index() const override                                                                                                                                                                                                              {return consumer_->index();}
        int64_t presentation_frame_age_millis() const override                                                                                                                                                  {return consumer_->presentation_frame_age_millis();}
-       monitor::subject& monitor_output() override                                                                                                                                                                             {return consumer_->monitor_output();}                                                                           
+       monitor::subject& monitor_output() override                                                                                                                                                                             {return consumer_->monitor_output();}
+       const frame_consumer* unwrapped() const override                                                                                                                                                                {return consumer_->unwrapped();}
 };
 
 class print_consumer_proxy : public frame_consumer
-{      
+{
        std::shared_ptr<frame_consumer> consumer_;
 public:
-       print_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) 
+       print_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer)
                : consumer_(std::move(consumer))
        {
        }
 
        ~print_consumer_proxy()
-       {               
+       {
                auto str = consumer_->print();
                CASPAR_LOG(debug) << str << L" Uninitializing.";
                consumer_.reset();
                CASPAR_LOG(info) << str << L" Uninitialized.";
        }
-       
+
        std::future<bool> send(const_frame frame) override                                                                                                                                                              {return consumer_->send(std::move(frame));}
        void initialize(const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) override
        {
@@ -167,22 +168,23 @@ public:
        int buffer_depth() const override                                                                                                                                                                                               {return consumer_->buffer_depth();}
        int index() const override                                                                                                                                                                                                              {return consumer_->index();}
        int64_t presentation_frame_age_millis() const override                                                                                                                                                  {return consumer_->presentation_frame_age_millis();}
-       monitor::subject& monitor_output() override                                                                                                                                                                             {return consumer_->monitor_output();}                                                                           
+       monitor::subject& monitor_output() override                                                                                                                                                                             {return consumer_->monitor_output();}
+       const frame_consumer* unwrapped() const override                                                                                                                                                                {return consumer_->unwrapped();}
 };
 
 class recover_consumer_proxy : public frame_consumer
-{      
+{
        std::shared_ptr<frame_consumer> consumer_;
        int                                                             channel_index_  = -1;
        video_format_desc                               format_desc_;
        audio_channel_layout                    channel_layout_ = audio_channel_layout::invalid();
 public:
-       recover_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) 
+       recover_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer)
                : consumer_(std::move(consumer))
        {
        }
-       
-       std::future<bool> send(const_frame frame) override                              
+
+       std::future<bool> send(const_frame frame) override
        {
                try
                {
@@ -220,7 +222,8 @@ public:
        int buffer_depth() const override                                                                               {return consumer_->buffer_depth();}
        int index() const override                                                                                              {return consumer_->index();}
        int64_t presentation_frame_age_millis() const override                                  {return consumer_->presentation_frame_age_millis();}
-       monitor::subject& monitor_output() override                                                             {return consumer_->monitor_output();}                                                                           
+       monitor::subject& monitor_output() override                                                             {return consumer_->monitor_output();}
+       const frame_consumer* unwrapped() const override                                                {return consumer_->unwrapped();}
 };
 
 // This class is used to guarantee that audio cadence is correct. This is important for NTSC audio.
@@ -236,7 +239,7 @@ public:
                : consumer_(consumer)
        {
        }
-       
+
        void initialize(const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) override
        {
                audio_cadence_  = format_desc.audio_cadence;
@@ -247,14 +250,14 @@ public:
        }
 
        std::future<bool> send(const_frame frame) override
-       {               
+       {
                if(audio_cadence_.size() == 1)
                        return consumer_->send(frame);
 
                std::future<bool> result = make_ready_future(true);
-               
+
                if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() * channel_layout_.num_channels == static_cast<int>(frame.audio_data().size()))
-               {       
+               {
                        // Audio sent so far is in sync, now we can send the next chunk.
                        result = consumer_->send(frame);
                        boost::range::rotate(audio_cadence_, std::begin(audio_cadence_)+1);
@@ -263,10 +266,10 @@ public:
                        CASPAR_LOG(trace) << print() << L" Syncing audio.";
 
                sync_buffer_.push_back(static_cast<int>(frame.audio_data().size() / channel_layout_.num_channels));
-               
+
                return std::move(result);
        }
-       
+
        std::wstring print() const override                                                                             {return consumer_->print();}
        std::wstring name() const override                                                                              {return consumer_->name();}
        boost::property_tree::wptree info() const override                                              {return consumer_->info();}
@@ -274,22 +277,23 @@ public:
        int buffer_depth() const override                                                                               {return consumer_->buffer_depth();}
        int index() const override                                                                                              {return consumer_->index();}
        int64_t presentation_frame_age_millis() const override                                  {return consumer_->presentation_frame_age_millis();}
-       monitor::subject& monitor_output() override                                                             {return consumer_->monitor_output();}                                                                           
+       monitor::subject& monitor_output() override                                                             {return consumer_->monitor_output();}
+       const frame_consumer* unwrapped() const override                                                {return consumer_->unwrapped();}
 };
 
 spl::shared_ptr<core::frame_consumer> frame_consumer_registry::create_consumer(
-               const std::vector<std::wstring>& params, interaction_sink* sink) const
+               const std::vector<std::wstring>& params, interaction_sink* sink, std::vector<spl::shared_ptr<video_channel>> channels) const
 {
        if(params.empty())
                CASPAR_THROW_EXCEPTION(invalid_argument() << msg_info("params cannot be empty"));
-       
+
        auto consumer = frame_consumer::empty();
        auto& consumer_factories = impl_->consumer_factories;
        std::any_of(consumer_factories.begin(), consumer_factories.end(), [&](const consumer_factory_t& factory) -> bool
                {
                        try
                        {
-                               consumer = factory(params, sink);
+                               consumer = factory(params, sink, channels);
                        }
                        catch(...)
                        {
@@ -311,7 +315,8 @@ spl::shared_ptr<core::frame_consumer> frame_consumer_registry::create_consumer(
 spl::shared_ptr<frame_consumer> frame_consumer_registry::create_consumer(
                const std::wstring& element_name,
                const boost::property_tree::wptree& element,
-               interaction_sink* sink) const
+               interaction_sink* sink,
+               std::vector<spl::shared_ptr<video_channel>> channels) const
 {
        auto& preconfigured_consumer_factories = impl_->preconfigured_consumer_factories;
        auto found = preconfigured_consumer_factories.find(element_name);
@@ -324,7 +329,7 @@ spl::shared_ptr<frame_consumer> frame_consumer_registry::create_consumer(
                        spl::make_shared<print_consumer_proxy>(
                                        spl::make_shared<recover_consumer_proxy>(
                                                        spl::make_shared<cadence_guard>(
-                                                                       found->second(element, sink)))));
+                                                                       found->second(element, sink, channels)))));
 }
 
 const spl::shared_ptr<frame_consumer>& frame_consumer::empty()
index e09e4edcfe263f8a36c5a22359b831fe3fab445b..58ffeb61b62168581d4f2d91c4f2ea5d068c9897 100644 (file)
@@ -44,19 +44,19 @@ class frame_consumer
 public:
 
        // Static Members
-       
+
        static const spl::shared_ptr<frame_consumer>& empty();
 
        // Constructors
 
        frame_consumer(){}
        virtual ~frame_consumer() {}
-       
+
        // Methods
 
        virtual std::future<bool>                               send(const_frame frame) = 0;
        virtual void                                                    initialize(const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) = 0;
-       
+
        // monitor::observable
 
        virtual monitor::subject& monitor_output() = 0;
@@ -70,14 +70,17 @@ public:
        virtual int                                                             buffer_depth() const = 0; // -1 to not participate in frame presentation synchronization
        virtual int                                                             index() const = 0;
        virtual int64_t                                                 presentation_frame_age_millis() const = 0;
+       virtual const frame_consumer*                   unwrapped() const { return this; }
 };
 
 typedef std::function<spl::shared_ptr<frame_consumer>(
                const std::vector<std::wstring>&,
-               interaction_sink* sink)> consumer_factory_t;
+               interaction_sink* sink,
+               std::vector<spl::shared_ptr<video_channel>> channels)> consumer_factory_t;
 typedef std::function<spl::shared_ptr<frame_consumer>(
                const boost::property_tree::wptree& element,
-               interaction_sink* sink)> preconfigured_consumer_factory_t;
+               interaction_sink* sink,
+               std::vector<spl::shared_ptr<video_channel>> channels)> preconfigured_consumer_factory_t;
 
 class frame_consumer_registry : boost::noncopyable
 {
@@ -89,11 +92,13 @@ public:
                        const preconfigured_consumer_factory_t& factory);
        spl::shared_ptr<frame_consumer> create_consumer(
                        const std::vector<std::wstring>& params,
-                       interaction_sink* sink) const;
+                       interaction_sink* sink,
+                       std::vector<spl::shared_ptr<video_channel>> channels) const;
        spl::shared_ptr<frame_consumer> create_consumer(
                        const std::wstring& element_name,
                        const boost::property_tree::wptree& element,
-                       interaction_sink* sink) const;
+                       interaction_sink* sink,
+                       std::vector<spl::shared_ptr<video_channel>> channels) const;
 private:
        struct impl;
        spl::shared_ptr<impl> impl_;
index 33cf01d7e5e98cb2800cc3a37c3c1e38b7e97d5c..d3c0580eb01bc624b18b396db5a8d9ec42a39206 100644 (file)
@@ -53,7 +53,7 @@
 namespace caspar { namespace core {
 
 struct output::impl
-{              
+{
        spl::shared_ptr<diagnostics::graph>     graph_;
        spl::shared_ptr<monitor::subject>       monitor_subject_                        = spl::make_shared<monitor::subject>("/output");
        const int                                                       channel_index_;
@@ -65,23 +65,23 @@ struct output::impl
        std::map<int, int64_t>                          send_to_consumers_delays_;
        executor                                                        executor_                                       { L"output " + boost::lexical_cast<std::wstring>(channel_index_) };
 public:
-       impl(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) 
+       impl(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index)
                : graph_(std::move(graph))
                , channel_index_(channel_index)
                , format_desc_(format_desc)
                , channel_layout_(channel_layout)
        {
                graph_->set_color("consume-time", diagnostics::color(1.0f, 0.4f, 0.0f, 0.8f));
-       }       
-       
+       }
+
        void add(int index, spl::shared_ptr<frame_consumer> consumer)
-       {               
+       {
                remove(index);
 
                consumer->initialize(format_desc_, channel_layout_, channel_index_);
-               
+
                executor_.begin_invoke([this, index, consumer]
-               {                       
+               {
                        port p(index, channel_index_, std::move(consumer));
                        p.monitor_output().attach_parent(monitor_subject_);
                        ports_.insert(std::make_pair(index, std::move(p)));
@@ -94,7 +94,7 @@ public:
        }
 
        void remove(int index)
-       {               
+       {
                executor_.begin_invoke([=]
                {
                        auto it = ports_.find(index);
@@ -110,7 +110,7 @@ public:
        {
                remove(consumer->index());
        }
-       
+
        void change_channel_format(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout)
        {
                executor_.invoke([&]
@@ -120,7 +120,7 @@ public:
 
                        auto it = ports_.begin();
                        while(it != ports_.end())
-                       {                                               
+                       {
                                try
                                {
                                        it->second.change_channel_format(format_desc, channel_layout);
@@ -133,7 +133,7 @@ public:
                                        ports_.erase(it++);
                                }
                        }
-                       
+
                        format_desc_ = format_desc;
                        channel_layout_ = channel_layout;
                        frames_.clear();
@@ -141,7 +141,7 @@ public:
        }
 
        std::pair<int, int> minmax_buffer_depth() const
-       {               
+       {
                if(ports_.empty())
                        return std::make_pair(0, 0);
 
@@ -159,7 +159,7 @@ public:
                        .where(std::mem_fn(&port::has_synchronization_clock))
                        .any();
        }
-               
+
        std::future<void> operator()(const_frame input_frame, const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout)
        {
                spl::shared_ptr<caspar::timer> frame_timer;
@@ -262,12 +262,12 @@ public:
        std::future<boost::property_tree::wptree> info()
        {
                return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree
-               {                       
+               {
                        boost::property_tree::wptree info;
                        for (auto& port : ports_)
                        {
                                info.add_child(L"consumers.consumer", port.second.info())
-                                       .add(L"index", port.first); 
+                                       .add(L"index", port.first);
                        }
                        return info;
                }, task_priority::high_priority));
@@ -297,6 +297,19 @@ public:
                        return info;
                }, task_priority::high_priority));
        }
+
+       std::vector<spl::shared_ptr<const frame_consumer>> get_consumers()
+       {
+               return executor_.invoke([=]
+               {
+                       std::vector<spl::shared_ptr<const frame_consumer>> consumers;
+
+                       for (auto& port : ports_)
+                               consumers.push_back(port.second.consumer());
+
+                       return consumers;
+               });
+       }
 };
 
 output::output(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int channel_index) : impl_(new impl(std::move(graph), format_desc, channel_layout, channel_index)){}
@@ -306,6 +319,7 @@ void output::remove(int index){impl_->remove(index);}
 void output::remove(const spl::shared_ptr<frame_consumer>& consumer){impl_->remove(consumer);}
 std::future<boost::property_tree::wptree> output::info() const{return impl_->info();}
 std::future<boost::property_tree::wptree> output::delay_info() const{ return impl_->delay_info(); }
+std::vector<spl::shared_ptr<const frame_consumer>> output::get_consumers() const { return impl_->get_consumers(); }
 std::future<void> output::operator()(const_frame frame, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout){ return (*impl_)(std::move(frame), format_desc, channel_layout); }
 monitor::subject& output::monitor_output() {return *impl_->monitor_subject_;}
 }}
index 85a925632ad778ca4d0c4becdb1ffd45a70ed103..bd75385c715a7119e2be6315e76f785b2f3c8351 100644 (file)
@@ -35,7 +35,7 @@
 FORWARD2(caspar, diagnostics, class graph);
 
 namespace caspar { namespace core {
-       
+
 class output final
 {
        output(const output&);
@@ -47,27 +47,28 @@ public:
        // Constructors
 
        explicit output(spl::shared_ptr<caspar::diagnostics::graph> graph, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int channel_index);
-       
+
        // Methods
 
        // Returns when submitted to consumers, but the future indicates when the consumers are ready for a new frame.
        std::future<void> operator()(const_frame frame, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout);
-       
+
        void add(const spl::shared_ptr<frame_consumer>& consumer);
        void add(int index, const spl::shared_ptr<frame_consumer>& consumer);
        void remove(const spl::shared_ptr<frame_consumer>& consumer);
        void remove(int index);
-       
+
        monitor::subject& monitor_output();
 
        // Properties
 
        std::future<boost::property_tree::wptree> info() const;
        std::future<boost::property_tree::wptree> delay_info() const;
+       std::vector<spl::shared_ptr<const frame_consumer>> get_consumers() const;
 
 private:
        struct impl;
        spl::shared_ptr<impl> impl_;
 };
 
-}}
\ No newline at end of file
+}}
index db3c68fa890b9ee28303eb68c4b6565ec73abf4b..ecc19836d07c7fb92e3e9052b2155eed2efc4794 100644 (file)
@@ -14,7 +14,7 @@ struct port::impl
 {
        int                                                                     index_;
        spl::shared_ptr<monitor::subject>       monitor_subject_ = spl::make_shared<monitor::subject>("/port/" + boost::lexical_cast<std::string>(index_));
-       std::shared_ptr<frame_consumer>         consumer_;
+       spl::shared_ptr<frame_consumer>         consumer_;
        int                                                                     channel_index_;
 public:
        impl(int index, int channel_index, spl::shared_ptr<frame_consumer> consumer)
@@ -24,12 +24,12 @@ public:
        {
                consumer_->monitor_output().attach_parent(monitor_subject_);
        }
-       
+
        void change_channel_format(const core::video_format_desc& format_desc, const audio_channel_layout& channel_layout)
        {
                consumer_->initialize(format_desc, channel_layout, channel_index_);
        }
-               
+
        std::future<bool> send(const_frame frame)
        {
                *monitor_subject_ << monitor::message("/type") % consumer_->name();
@@ -64,13 +64,18 @@ public:
        {
                return consumer_->presentation_frame_age_millis();
        }
+
+       spl::shared_ptr<const frame_consumer> consumer() const
+       {
+               return consumer_;
+       }
 };
 
 port::port(int index, int channel_index, spl::shared_ptr<frame_consumer> consumer) : impl_(new impl(index, channel_index, std::move(consumer))){}
 port::port(port&& other) : impl_(std::move(other.impl_)){}
 port::~port(){}
 port& port::operator=(port&& other){impl_ = std::move(other.impl_); return *this;}
-std::future<bool> port::send(const_frame frame){return impl_->send(std::move(frame));} 
+std::future<bool> port::send(const_frame frame){return impl_->send(std::move(frame));}
 monitor::subject& port::monitor_output() {return *impl_->monitor_subject_;}
 void port::change_channel_format(const core::video_format_desc& format_desc, const audio_channel_layout& channel_layout){impl_->change_channel_format(format_desc, channel_layout);}
 int port::buffer_depth() const{return impl_->buffer_depth();}
@@ -78,4 +83,5 @@ std::wstring port::print() const{ return impl_->print();}
 bool port::has_synchronization_clock() const{return impl_->has_synchronization_clock();}
 boost::property_tree::wptree port::info() const{return impl_->info();}
 int64_t port::presentation_frame_age_millis() const{ return impl_->presentation_frame_age_millis(); }
-}}
\ No newline at end of file
+spl::shared_ptr<const frame_consumer> port::consumer() const { return impl_->consumer(); }
+}}
index 46db42c2da504b0f22349c055a445728ac2b17f8..48e639c9225f266eef71d665ea50007a017f29d7 100644 (file)
@@ -28,7 +28,7 @@ public:
 
        port& operator=(port&& other);
 
-       std::future<bool> send(const_frame frame);      
+       std::future<bool> send(const_frame frame);
 
        monitor::subject& monitor_output();
 
@@ -40,9 +40,10 @@ public:
        bool has_synchronization_clock() const;
        boost::property_tree::wptree info() const;
        int64_t presentation_frame_age_millis() const;
+       spl::shared_ptr<const frame_consumer> consumer() const;
 private:
        struct impl;
        std::unique_ptr<impl> impl_;
 };
 
-}}
\ No newline at end of file
+}}
diff --git a/core/consumer/syncto/syncto_consumer.cpp b/core/consumer/syncto/syncto_consumer.cpp
new file mode 100644 (file)
index 0000000..632bdb0
--- /dev/null
@@ -0,0 +1,188 @@
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#include "../../StdAfx.h"
+
+#include "syncto_consumer.h"
+
+#include "../frame_consumer.h"
+#include "../../frame/frame.h"
+#include "../../help/help_sink.h"
+#include "../../module_dependencies.h"
+#include "../../monitor/monitor.h"
+#include "../../video_channel.h"
+#include "../output.h"
+
+#include <common/semaphore.h>
+
+#include <boost/lexical_cast.hpp>
+#include <boost/property_tree/ptree.hpp>
+
+#include <future>
+
+namespace caspar { namespace core { namespace syncto {
+
+void verify_cyclic_reference(int self_channel_index, const spl::shared_ptr<video_channel>& other_channel);
+
+class syncto_consumer : public frame_consumer
+{
+       monitor::subject                                monitor_subject_;
+       spl::shared_ptr<video_channel>  other_channel_;
+       semaphore                                               frames_to_render_       { 0 };
+       std::shared_ptr<void>                   tick_subscription_;
+       int                                                             self_channel_index_     = -1;
+public:
+       syncto_consumer(spl::shared_ptr<video_channel> other_channel)
+               : other_channel_(std::move(other_channel))
+       {
+       }
+
+       void initialize(const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) override
+       {
+               verify_cyclic_reference(channel_index, other_channel_);
+
+               self_channel_index_     = channel_index;
+               tick_subscription_      = other_channel_->add_tick_listener([=]
+               {
+                       frames_to_render_.release();
+               });
+       }
+
+       std::future<bool> send(const_frame frame) override
+       {
+               auto task = spl::make_shared<std::packaged_task<bool ()>>([=] { return true; });
+
+               frames_to_render_.acquire(1, [task]
+               {
+                       (*task)();
+               });
+
+               return task->get_future();
+       }
+
+       monitor::subject& monitor_output() override
+       {
+               return monitor_subject_;
+       }
+
+       std::wstring print() const override
+       {
+               if (self_channel_index_ != -1)
+                       return L"sync[" + boost::lexical_cast<std::wstring>(self_channel_index_) + L"]to[" + boost::lexical_cast<std::wstring>(other_channel_->index()) + L"]";
+               else
+                       return L"syncto[" + boost::lexical_cast<std::wstring>(other_channel_->index()) + L"]";
+       }
+
+       std::wstring name() const override
+       {
+               return L"syncto";
+       }
+
+       boost::property_tree::wptree info() const override
+       {
+               boost::property_tree::wptree info;
+               info.add(L"type", L"syncto-consumer");
+               info.add(L"channel-to-sync-to", other_channel_->index());
+               return info;
+       }
+
+       bool has_synchronization_clock() const override
+       {
+               return true;
+       }
+
+       int buffer_depth() const override
+       {
+               return -1;
+       }
+
+       int index() const override
+       {
+               return 70000;
+       }
+
+       int64_t presentation_frame_age_millis() const override
+       {
+               return 0;
+       }
+
+       spl::shared_ptr<video_channel> other_channel() const
+       {
+               return other_channel_;
+       }
+};
+
+void verify_cyclic_reference(int self_channel_index, const spl::shared_ptr<video_channel>& other_channel)
+{
+       if (self_channel_index == other_channel->index())
+               CASPAR_THROW_EXCEPTION(user_error() << msg_info(
+                               L"Cannot create syncto consumer where source channel and destination channel is the same or indirectly related"));
+
+       for (auto& consumer : other_channel->output().get_consumers())
+       {
+               auto raw_consumer       = consumer->unwrapped();
+               auto syncto                     = dynamic_cast<const syncto_consumer*>(raw_consumer);
+
+               if (syncto)
+                       verify_cyclic_reference(self_channel_index, syncto->other_channel());
+       }
+}
+
+void describe_consumer(core::help_sink& sink, const core::help_repository& repo)
+{
+       sink.short_description(L"Lets a channel provide sync to another.");
+       sink.syntax(L"SYNCTO [other_channel:int]");
+       sink.para()->text(L"Provides sync to its own channel based on the rendering pace of the specified channel.");
+       sink.para()->text(L"Examples:");
+       sink.example(L">> ADD 1 SYNCTO 2");
+}
+
+spl::shared_ptr<core::frame_consumer> create_consumer(
+               const std::vector<std::wstring>& params,
+               core::interaction_sink*,
+               std::vector<spl::shared_ptr<video_channel>> channels)
+{
+       if (params.size() < 1 || !boost::iequals(params.at(0), L"SYNCTO"))
+               return core::frame_consumer::empty();
+
+       auto channel_id = boost::lexical_cast<int>(params.at(1));
+       auto channel    = channels.at(channel_id - 1);
+
+       return spl::make_shared<syncto_consumer>(channel);
+}
+
+spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(
+               const boost::property_tree::wptree& ptree,
+               core::interaction_sink*,
+               std::vector<spl::shared_ptr<video_channel>> channels)
+{
+       auto channel_id = ptree.get<int>(L"channel-id");
+
+       return spl::make_shared<syncto_consumer>(channels.at(channel_id - 1));
+}
+
+void init(module_dependencies dependencies)
+{
+       dependencies.consumer_registry->register_consumer_factory(L"syncto", &create_consumer, &describe_consumer);
+       dependencies.consumer_registry->register_preconfigured_consumer_factory(L"syncto", &create_preconfigured_consumer);
+}
+
+}}}
diff --git a/core/consumer/syncto/syncto_consumer.h b/core/consumer/syncto/syncto_consumer.h
new file mode 100644 (file)
index 0000000..9eac2ea
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#pragma once
+
+#include "../../fwd.h"
+
+namespace caspar { namespace core { namespace syncto {
+
+void init(caspar::core::module_dependencies dependencies);
+
+}}}
index 8ee1992dae68186dcb434863fdb6f9dd36deb2a3..a0ac2c2012fd249ac76d0fcd85a8483826ee7e86 100644 (file)
@@ -49,6 +49,7 @@
 #include <boost/lexical_cast.hpp>
 
 #include <string>
+#include <unordered_map>
 
 namespace caspar { namespace core {
 
@@ -76,6 +77,10 @@ struct video_channel::impl final
        caspar::core::mixer                                                                     mixer_;
        caspar::core::stage                                                                     stage_;
 
+       mutable tbb::spin_mutex                                                         tick_listeners_mutex_;
+       int64_t                                                                                         last_tick_listener_id   = 0;
+       std::unordered_map<int64_t, std::function<void ()>>     tick_listeners_;
+
        executor                                                                                        executor_                               { L"video_channel " + boost::lexical_cast<std::wstring>(index_) };
 public:
        impl(
@@ -145,10 +150,28 @@ public:
                });
        }
 
+       void invoke_tick_listeners()
+       {
+               auto listeners = lock(tick_listeners_mutex_, [=] { return tick_listeners_; });
+
+               for (auto listener : listeners)
+               {
+                       try
+                       {
+                               listener.second();
+                       }
+                       catch (...)
+                       {
+                               CASPAR_LOG_CURRENT_EXCEPTION();
+                       }
+               }
+       }
+
        void tick()
        {
                try
                {
+                       invoke_tick_listeners();
 
                        auto format_desc        = video_format_desc();
                        auto channel_layout = audio_channel_layout();
@@ -171,7 +194,7 @@ public:
                        auto frame_time = frame_timer.elapsed()*format_desc.fps*0.5;
                        graph_->set_value("tick-time", frame_time);
 
-                       *monitor_subject_       << monitor::message("/profiler/time")   % frame_timer.elapsed() % (1.0/format_desc_.fps)
+                       *monitor_subject_       << monitor::message("/profiler/time")   % frame_timer.elapsed() % (1.0/ video_format_desc().fps)
                                                                << monitor::message("/format")                  % format_desc.name;
                }
                catch(...)
@@ -225,6 +248,23 @@ public:
 
                return info;
        }
+
+       std::shared_ptr<void> add_tick_listener(std::function<void()> listener)
+       {
+               return lock(tick_listeners_mutex_, [&]
+               {
+                       auto tick_listener_id = last_tick_listener_id++;
+                       tick_listeners_.insert(std::make_pair(tick_listener_id, listener));
+
+                       return std::shared_ptr<void>(nullptr, [=](void*)
+                       {
+                               lock(tick_listeners_mutex_, [&]
+                               {
+                                       tick_listeners_.erase(tick_listener_id);
+                               });
+                       });
+               });
+       }
 };
 
 video_channel::video_channel(
@@ -248,5 +288,6 @@ boost::property_tree::wptree video_channel::info() const{return impl_->info();}
 boost::property_tree::wptree video_channel::delay_info() const { return impl_->delay_info(); }
 int video_channel::index() const { return impl_->index(); }
 monitor::subject& video_channel::monitor_output(){ return *impl_->monitor_subject_; }
+std::shared_ptr<void> video_channel::add_tick_listener(std::function<void()> listener) { return impl_->add_tick_listener(std::move(listener)); }
 
 }}
index b3ac548a7ff56b3107962e816ec4c4c6917d1b87..4c256b0dca24861fd28e9c6b147be78d0d3bc4df 100644 (file)
 
 #include <boost/property_tree/ptree_fwd.hpp>
 
+#include <functional>
+
 namespace caspar { namespace core {
-       
+
 class video_channel final
 {
        video_channel(const video_channel&);
@@ -50,7 +52,7 @@ public:
        ~video_channel();
 
        // Methods
-                       
+
        monitor::subject&                                               monitor_output();
 
        // Properties
@@ -67,6 +69,8 @@ public:
        core::audio_channel_layout                              audio_channel_layout() const;
        void                                                                    audio_channel_layout(const core::audio_channel_layout& channel_layout);
 
+       std::shared_ptr<void>                                   add_tick_listener(std::function<void()> listener);
+
        spl::shared_ptr<core::frame_factory>    frame_factory();
 
        boost::property_tree::wptree                    info() const;
@@ -77,4 +81,4 @@ private:
        spl::unique_ptr<impl> impl_;
 };
 
-}}
\ No newline at end of file
+}}
index 50529d9f10360d15ad3e52d0e352795424d59134..a51b9dc80bcd5f39b49fb472fa523bdda3633069 100644 (file)
@@ -460,7 +460,7 @@ void describe_consumer(core::help_sink& sink, const core::help_repository& repo)
 }
 
 spl::shared_ptr<core::frame_consumer> create_consumer(
-               const std::vector<std::wstring>& params, core::interaction_sink*)
+               const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        if(params.size() < 1 || !boost::iequals(params.at(0), L"BLUEFISH"))
                return core::frame_consumer::empty();
@@ -487,8 +487,8 @@ spl::shared_ptr<core::frame_consumer> create_consumer(
 }
 
 spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(
-               const boost::property_tree::wptree& ptree, core::interaction_sink*)
-{      
+               const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
+{
        const auto device_index         = ptree.get(                                            L"device",                      1);
        const auto embedded_audio       = ptree.get(                                            L"embedded-audio",      false);
        const auto key_only                     = ptree.get(                                            L"key-only",            false);
index 3c4ec6dbb9b61353f00e8da73190ed109bdb20fd..bcdc48bdd93f7809a872a1e351c27313a6c710b1 100644 (file)
@@ -33,8 +33,10 @@ namespace caspar { namespace bluefish {
 
 void describe_consumer(core::help_sink& sink, const core::help_repository& repo);
 spl::shared_ptr<core::frame_consumer> create_consumer(
-               const std::vector<std::wstring>& params, core::interaction_sink*);
+               const std::vector<std::wstring>& params, core::interaction_sink*,
+               std::vector<spl::shared_ptr<core::video_channel>> channels);
 spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(
-               const boost::property_tree::wptree& ptree, core::interaction_sink*);
+               const boost::property_tree::wptree& ptree, core::interaction_sink*,
+               std::vector<spl::shared_ptr<core::video_channel>> channels);
 
-}}
\ No newline at end of file
+}}
index 3378068762d8e1fb74c73bb717816fb921bf9c80..d4da7d9da3736430b9efc73d3c1dbbf1f19fc09e 100644 (file)
@@ -817,7 +817,7 @@ void describe_consumer(core::help_sink& sink, const core::help_repository& repo)
 }
 
 spl::shared_ptr<core::frame_consumer> create_consumer(
-               const std::vector<std::wstring>& params, core::interaction_sink*)
+               const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        if (params.size() < 1 || !boost::iequals(params.at(0), L"DECKLINK"))
                return core::frame_consumer::empty();
@@ -863,7 +863,7 @@ spl::shared_ptr<core::frame_consumer> create_consumer(
 }
 
 spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(
-               const boost::property_tree::wptree& ptree, core::interaction_sink*)
+               const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        configuration config;
 
index 9c0de882f05e50bc7f911f0cfa6e20d81833163a..b06fe7f58577608b47cfa2a4142d02cd58ed249a 100644 (file)
@@ -34,8 +34,10 @@ namespace caspar { namespace decklink {
 
 void describe_consumer(core::help_sink& sink, const core::help_repository& repo);
 spl::shared_ptr<core::frame_consumer> create_consumer(
-               const std::vector<std::wstring>& params, core::interaction_sink*);
+               const std::vector<std::wstring>& params, core::interaction_sink*,
+               std::vector<spl::shared_ptr<core::video_channel>> channels);
 spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(
-               const boost::property_tree::wptree& ptree, core::interaction_sink*);
+               const boost::property_tree::wptree& ptree, core::interaction_sink*,
+               std::vector<spl::shared_ptr<core::video_channel>> channels);
 
-}}
\ No newline at end of file
+}}
index b3006ab0a2ebcd6a1bb9450178c6800d90a49efa..20a30cd839cf7c76aac537cce9d4b041b9604b04 100644 (file)
@@ -885,7 +885,7 @@ void describe_consumer(core::help_sink& sink, const core::help_repository& repo)
 }
 
 spl::shared_ptr<core::frame_consumer> create_consumer(
-               const std::vector<std::wstring>& params, core::interaction_sink*)
+               const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        auto params2 = params;
        auto separate_key_it = std::find_if(params2.begin(), params2.end(), param_comparer(L"SEPARATE_KEY"));
@@ -925,7 +925,7 @@ spl::shared_ptr<core::frame_consumer> create_consumer(
 }
 
 spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(
-               const boost::property_tree::wptree& ptree, core::interaction_sink*)
+               const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        auto filename           = ptree_get<std::wstring>(ptree, L"path");
        auto codec                      = ptree.get(L"vcodec", L"libx264");
index ac27b94e915b1d846d431693e043e71098844aaa..7e62a4f095b1715d8e5bc001bfabbbbb0fbce6eb 100644 (file)
@@ -33,7 +33,9 @@
 namespace caspar { namespace ffmpeg {
 
 void describe_consumer(core::help_sink& sink, const core::help_repository& repo);
-spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params, core::interaction_sink*);
-spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink*);
+spl::shared_ptr<core::frame_consumer> create_consumer(
+               const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels);
+spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(
+               const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels);
 
-}}
\ No newline at end of file
+}}
index 41cc3e79899fccc38679cffb8085f3caf384661a..4196f1e857cced2a7dd1c435f113697ea4b516d0 100644 (file)
@@ -1259,7 +1259,7 @@ void describe_streaming_consumer(core::help_sink& sink, const core::help_reposit
 }
 
 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
-               const std::vector<std::wstring>& params, core::interaction_sink*)
+               const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE")))
                return core::frame_consumer::empty();
@@ -1272,7 +1272,7 @@ spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
 }
 
 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
-               const boost::property_tree::wptree& ptree, core::interaction_sink*)
+               const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        return spl::make_shared<streaming_consumer>(
                        u8(ptree_get<std::wstring>(ptree, L"path")),
index 2663cf4f152c9b6a310261e78d55b11d657544b8..7d7a40164e08bd79e10d25a913327ea53799e62e 100644 (file)
@@ -13,8 +13,8 @@ namespace caspar { namespace ffmpeg {
 
 void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo);
 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
-               const std::vector<std::wstring>& params, core::interaction_sink*);
+               const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels);
 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
-               const boost::property_tree::wptree& ptree, core::interaction_sink*);
+               const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels);
 
-}}
\ No newline at end of file
+}}
index ab466f046ce6e804d9ba718db0f795da577a0a2e..4730402d7566fae1dba51a455bcf176811e4ff5a 100644 (file)
@@ -106,7 +106,7 @@ public:
                        try
                        {
                                auto filename2 = filename;
-                               
+
                                if (filename2.empty())
                                        filename2 = env::media_folder() +  boost::posix_time::to_iso_wstring(boost::posix_time::second_clock::local_time()) + L".png";
                                else
@@ -135,7 +135,7 @@ public:
        {
                return L"image[]";
        }
-       
+
        std::wstring name() const override
        {
                return L"image";
@@ -176,7 +176,8 @@ void describe_consumer(core::help_sink& sink, const core::help_repository& repo)
        sink.example(L">> ADD 1 IMAGE", L"creating media/20130228T210946.png if the current time is 2013-02-28 21:09:46.");
 }
 
-spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params, core::interaction_sink*)
+spl::shared_ptr<core::frame_consumer> create_consumer(
+               const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        if (params.size() < 1 || !boost::iequals(params.at(0), L"IMAGE"))
                return core::frame_consumer::empty();
index fc47c62543d371783c5bd8293a4b08efb88ae4a5..111e319c1425006bee55bfba2322bb6bef0280b2 100644 (file)
 #include <string>
 #include <vector>
 
-namespace caspar { 
+namespace caspar {
 
 namespace image {
-       
+
 void write_cropped_png(
                const class core::const_frame& frame,
                const core::video_format_desc& format_desc,
@@ -43,6 +43,7 @@ void write_cropped_png(
                int height);
 
 void describe_consumer(core::help_sink& sink, const core::help_repository& repo);
-spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params, struct core::interaction_sink*);
+spl::shared_ptr<core::frame_consumer> create_consumer(
+               const std::vector<std::wstring>& params, struct core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels);
 
 }}
index feade373c45b12e5f5f7e752cb8e012cecc9e9ff..f3d391795b6f733fae79d06ab79380f69abe2390 100644 (file)
@@ -18,7 +18,7 @@
 *
 * Author: Robert Nagy, ronag@live.com
 */
+
 #include "../StdAfx.h"
 
 #include "newtek_ivga_consumer.h"
@@ -76,13 +76,13 @@ public:
                graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f));
                diagnostics::register_graph(graph_);
        }
-       
+
        ~newtek_ivga_consumer()
        {
        }
 
        // frame_consumer
-       
+
        virtual void initialize(
                        const core::video_format_desc& format_desc,
                        const core::audio_channel_layout& channel_layout,
@@ -176,7 +176,7 @@ public:
        {
                return -1;
        }
-       
+
        virtual int index() const override
        {
                return 900;
@@ -191,7 +191,7 @@ public:
        {
                return false;
        }
-};     
+};
 
 void describe_ivga_consumer(core::help_sink& sink, const core::help_repository& repo)
 {
@@ -202,7 +202,7 @@ void describe_ivga_consumer(core::help_sink& sink, const core::help_repository&
        sink.example(L">> ADD 1 NEWTEK_IVGA");
 }
 
-spl::shared_ptr<core::frame_consumer> create_ivga_consumer(const std::vector<std::wstring>& params, core::interaction_sink*)
+spl::shared_ptr<core::frame_consumer> create_ivga_consumer(const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        if (params.size() < 1 || !boost::iequals(params.at(0), L"NEWTEK_IVGA"))
                return core::frame_consumer::empty();
@@ -210,9 +210,9 @@ spl::shared_ptr<core::frame_consumer> create_ivga_consumer(const std::vector<std
        return spl::make_shared<newtek_ivga_consumer>();
 }
 
-spl::shared_ptr<core::frame_consumer> create_preconfigured_ivga_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink*)
-{      
+spl::shared_ptr<core::frame_consumer> create_preconfigured_ivga_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
+{
        return spl::make_shared<newtek_ivga_consumer>();
 }
 
-}}
\ No newline at end of file
+}}
index fc41ca726c84e5bd3a7b913c314027f3e813b539..1fa43b7eec1333360d6a32cc409b656bc8059617 100644 (file)
@@ -32,7 +32,7 @@
 namespace caspar { namespace newtek {
 
 void describe_ivga_consumer(core::help_sink& sink, const core::help_repository& repo);
-spl::shared_ptr<core::frame_consumer> create_ivga_consumer(const std::vector<std::wstring>& params, core::interaction_sink*);
-spl::shared_ptr<core::frame_consumer> create_preconfigured_ivga_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink*);
+spl::shared_ptr<core::frame_consumer> create_ivga_consumer(const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels);
+spl::shared_ptr<core::frame_consumer> create_preconfigured_ivga_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels);
 
-}}
\ No newline at end of file
+}}
index da6108c6917f497c499ca2c8bdb23301e1ba1023..286622f89076e5e031b0333a5fad82a3f3a2b022 100644 (file)
@@ -72,7 +72,7 @@ public:
 
                if(!context_)
                        CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Failed to create audio context."));
-                       
+
                if(alcMakeContextCurrent(context_) == ALC_FALSE)
                        CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Failed to activate audio context."));
        }
@@ -98,7 +98,7 @@ void init_device()
 {
        static std::unique_ptr<device> instance;
        static boost::once_flag f = BOOST_ONCE_INIT;
-       
+
        boost::call_once(f, []{instance.reset(new device());});
 }
 
@@ -110,7 +110,7 @@ struct oal_consumer : public core::frame_consumer
        boost::timer                                                                    perf_timer_;
        tbb::atomic<int64_t>                                                    presentation_age_;
        int                                                                                             channel_index_          = -1;
-       
+
        core::video_format_desc                                                 format_desc_;
        core::audio_channel_layout                                              out_channel_layout_;
        std::unique_ptr<core::audio_channel_remapper>   channel_remapper_;
@@ -130,7 +130,7 @@ public:
 
                init_device();
 
-               graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f));   
+               graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f));
                graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f));
                graph_->set_color("late-frame", diagnostics::color(0.6f, 0.3f, 0.3f));
                diagnostics::register_graph(graph_);
@@ -139,7 +139,7 @@ public:
        ~oal_consumer()
        {
                executor_.invoke([=]
-               {               
+               {
                        if(source_)
                        {
                                alSourceStop(source_);
@@ -158,7 +158,7 @@ public:
 
        void initialize(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int channel_index) override
        {
-               format_desc_    = format_desc;          
+               format_desc_    = format_desc;
                channel_index_  = channel_index;
                if (out_channel_layout_ == core::audio_channel_layout::invalid())
                        out_channel_layout_ = channel_layout.num_channels == 2 ? channel_layout : *core::audio_channel_layout_repository::get_default()->get_layout(L"stereo");
@@ -169,7 +169,7 @@ public:
                graph_->set_text(print());
 
                executor_.begin_invoke([=]
-               {               
+               {
                        buffers_.resize(format_desc_.fps > 30 ? 8 : 4);
                        alGenBuffers(static_cast<ALsizei>(buffers_.size()), buffers_.data());
                        alGenSources(1, &source_);
@@ -180,10 +180,10 @@ public:
                                alBufferData(buffers_[n], AL_FORMAT_STEREO16, audio.data(), static_cast<ALsizei>(audio.size()*sizeof(int16_t)), format_desc_.audio_sample_rate);
                                alSourceQueueBuffers(source_, 1, &buffers_[n]);
                        }
-                       
+
                        alSourcei(source_, AL_LOOPING, AL_FALSE);
 
-                       alSourcePlay(source_);  
+                       alSourcePlay(source_);
                });
        }
 
@@ -198,13 +198,13 @@ public:
                // exhausted, which should not happen
                executor_.begin_invoke([=]
                {
-                       ALenum state; 
+                       ALenum state;
                        alGetSourcei(source_, AL_SOURCE_STATE,&state);
                        if(state != AL_PLAYING)
                        {
                                for(int n = 0; n < buffers_.size()-1; ++n)
-                               {                                       
-                                       ALuint buffer = 0;  
+                               {
+                                       ALuint buffer = 0;
                                        alSourceUnqueueBuffers(source_, 1, &buffer);
                                        if(buffer)
                                        {
@@ -213,13 +213,13 @@ public:
                                                alSourceQueueBuffers(source_, 1, &buffer);
                                        }
                                }
-                               alSourcePlay(source_);          
+                               alSourcePlay(source_);
                                graph_->set_tag(diagnostics::tag_severity::WARNING, "late-frame");
                        }
 
                        auto audio = core::audio_32_to_16(channel_remapper_->mix_and_rearrange(frame.audio_data()));
-                       
-                       ALuint buffer = 0;  
+
+                       ALuint buffer = 0;
                        alSourceUnqueueBuffers(source_, 1, &buffer);
                        if(buffer)
                        {
@@ -229,14 +229,14 @@ public:
                        else
                                graph_->set_tag(diagnostics::tag_severity::WARNING, "dropped-frame");
 
-                       graph_->set_value("tick-time", perf_timer_.elapsed()*format_desc_.fps*0.5);             
+                       graph_->set_value("tick-time", perf_timer_.elapsed()*format_desc_.fps*0.5);
                        perf_timer_.restart();
                        presentation_age_ = frame.get_age_millis() + latency_millis();
                });
 
                return make_ready_future(true);
        }
-       
+
        std::wstring print() const override
        {
                return L"oal[" + boost::lexical_cast<std::wstring>(channel_index_) + L"|" + format_desc_.name + L"]";
@@ -253,7 +253,7 @@ public:
                info.add(L"type", L"system-audio");
                return info;
        }
-       
+
        bool has_synchronization_clock() const override
        {
                return false;
@@ -263,14 +263,14 @@ public:
        {
                return latency_millis_;
        }
-       
+
        int buffer_depth() const override
        {
                int delay_in_frames = static_cast<int>(latency_millis() / (1000.0 / format_desc_.fps));
-               
+
                return delay_in_frames;
        }
-               
+
        int index() const override
        {
                return 500;
@@ -293,7 +293,8 @@ void describe_consumer(core::help_sink& sink, const core::help_repository& repo)
        sink.example(L">> ADD 1 AUDIO LATENCY 500", L"Specifies that the system-audio chain: openal => driver => sound card => speaker output is 500ms");
 }
 
-spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params, core::interaction_sink*)
+spl::shared_ptr<core::frame_consumer> create_consumer(
+               const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        if(params.size() < 1 || !boost::iequals(params.at(0), L"AUDIO"))
                return core::frame_consumer::empty();
@@ -316,7 +317,8 @@ spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wst
        return spl::make_shared<oal_consumer>(channel_layout, latency_millis);
 }
 
-spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink*)
+spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(
+               const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        auto channel_layout                     = core::audio_channel_layout::invalid();
        auto channel_layout_spec        = ptree.get_optional<std::wstring>(L"channel-layout");
index abb7b7c32c00b5e79046ad4459a0c0381aaef980..a688d10948589e3efbdeca48118a223a084320f7 100644 (file)
 #include <boost/property_tree/ptree_fwd.hpp>
 
 namespace caspar { namespace oal {
-       
+
 void describe_consumer(core::help_sink& sink, const core::help_repository& repo);
 spl::shared_ptr<core::frame_consumer> create_consumer(
-               const std::vector<std::wstring>& params, core::interaction_sink*);
+               const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels);
 spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(
-               const boost::property_tree::wptree&, core::interaction_sink*);
+               const boost::property_tree::wptree&, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels);
 
-}}
\ No newline at end of file
+}}
index d40429d2492d0e6ab0ab3dcdfc23c87d31778ce2..b992cfb69f12773d79f1be9f959fcd14d1110201 100644 (file)
@@ -696,7 +696,8 @@ void describe_consumer(core::help_sink& sink, const core::help_repository& repo)
        sink.example(L">> ADD 1 SCREEN 1 BORDERLESS", L"opens a screen consumer without borders/window decorations on screen 1.");
 }
 
-spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params, core::interaction_sink* sink)
+spl::shared_ptr<core::frame_consumer> create_consumer(
+               const std::vector<std::wstring>& params, core::interaction_sink* sink, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        if (params.size() < 1 || !boost::iequals(params.at(0), L"SCREEN"))
                return core::frame_consumer::empty();
@@ -718,7 +719,8 @@ spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wst
        return spl::make_shared<screen_consumer_proxy>(config, sink);
 }
 
-spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(const boost::property_tree::wptree& ptree, core::interaction_sink* sink)
+spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(
+               const boost::property_tree::wptree& ptree, core::interaction_sink* sink, std::vector<spl::shared_ptr<core::video_channel>> channels)
 {
        configuration config;
        config.name                             = ptree.get(L"name",                            config.name);
index f4293c1f3ef6c4fe224b57d4eaff7e85808b6f20..fb754db6e8287954e8839578a62e725bbf1e8112 100644 (file)
@@ -33,9 +33,11 @@ namespace caspar { namespace screen {
 void describe_consumer(core::help_sink& sink, const core::help_repository& repo);
 spl::shared_ptr<core::frame_consumer> create_consumer(
                const std::vector<std::wstring>& params,
-               core::interaction_sink* sink);
+               core::interaction_sink* sink,
+               std::vector<spl::shared_ptr<core::video_channel>> channels);
 spl::shared_ptr<core::frame_consumer> create_preconfigured_consumer(
                const boost::property_tree::wptree& ptree,
-               core::interaction_sink* sink);
+               core::interaction_sink* sink,
+               std::vector<spl::shared_ptr<core::video_channel>> channels);
 
-}}
\ No newline at end of file
+}}
index 710977ec666156b916071f7e65ccd74a8f44b4ad..2788325c7dda4b7ed16e0d8f500bff366d8e5556 100644 (file)
 /* Return codes
 
 102 [action]                   Information that [action] has happened
-101 [action]                   Information that [action] has happened plus one row of data  
+101 [action]                   Information that [action] has happened plus one row of data
 
 202 [command] OK               [command] has been executed
-201 [command] OK               [command] has been executed, plus one row of data  
+201 [command] OK               [command] has been executed, plus one row of data
 200 [command] OK               [command] has been executed, plus multiple lines of data. ends with an empty line
 
 400 ERROR                              the command could not be understood
 401 [command] ERROR            invalid/missing channel
 402 [command] ERROR            parameter missing
-403 [command] ERROR            invalid parameter  
+403 [command] ERROR            invalid parameter
 404 [command] ERROR            file not found
 
 500 FAILED                             internal error
@@ -134,7 +134,7 @@ std::wstring read_utf8_file(const boost::filesystem::path& file)
        std::wstringstream result;
        boost::filesystem::wifstream filestream(file);
 
-       if (filestream) 
+       if (filestream)
        {
                // Consume BOM first
                filestream.get();
@@ -234,11 +234,11 @@ std::wstring MediaInfo(const boost::filesystem::path& path, const spl::shared_pt
 }
 
 std::wstring ListMedia(const spl::shared_ptr<media_info_repository>& media_info_repo)
-{      
+{
        std::wstringstream replyString;
        for (boost::filesystem::recursive_directory_iterator itr(env::media_folder()), end; itr != end; ++itr)
                replyString << MediaInfo(itr->path(), media_info_repo);
-       
+
        return boost::to_upper_copy(replyString.str());
 }
 
@@ -247,7 +247,7 @@ std::wstring ListTemplates(const spl::shared_ptr<core::cg_producer_registry>& cg
        std::wstringstream replyString;
 
        for (boost::filesystem::recursive_directory_iterator itr(env::template_folder()), end; itr != end; ++itr)
-       {               
+       {
                if(boost::filesystem::is_regular_file(itr->path()) && cg_registry->is_cg_extension(itr->path().extension().wstring()))
                {
                        auto relativePath = get_relative_without_extension(itr->path(), env::template_folder());
@@ -264,7 +264,7 @@ std::wstring ListTemplates(const spl::shared_ptr<core::cg_producer_registry>& cg
                        auto dir = relativePath.parent_path();
                        auto file = boost::to_upper_copy(relativePath.filename().wstring());
                        relativePath = dir / file;
-                                               
+
                        auto str = relativePath.generic_wstring();
                        boost::trim_if(str, boost::is_any_of("\\/"));
 
@@ -280,13 +280,18 @@ std::wstring ListTemplates(const spl::shared_ptr<core::cg_producer_registry>& cg
        return replyString.str();
 }
 
+std::vector<spl::shared_ptr<core::video_channel>> get_channels(const command_context& ctx)
+{
+       return cpplinq::from(ctx.channels)
+               .select([](channel_context c) { return spl::make_shared_ptr(c.channel); })
+               .to_vector();
+}
+
 core::frame_producer_dependencies get_producer_dependencies(const std::shared_ptr<core::video_channel>& channel, const command_context& ctx)
 {
        return core::frame_producer_dependencies(
                        channel->frame_factory(),
-                       cpplinq::from(ctx.channels)
-                                       .select([](channel_context c) { return spl::make_shared_ptr(c.channel); })
-                                       .to_vector(),
+                       get_channels(ctx),
                        channel->video_format_desc(),
                        ctx.producer_registry);
 }
@@ -616,6 +621,7 @@ void add_describer(core::help_sink& sink, const core::help_repository& repo)
        sink.example(L">> ADD 1 SCREEN");
        sink.example(L">> ADD 1 AUDIO");
        sink.example(L">> ADD 1 IMAGE filename");
+       sink.example(L">> ADD 2 SYNCTO 1");
        sink.example(L">> ADD 1 FILE filename.mov");
        sink.example(L">> ADD 1 FILE filename.mov SEPARATE_KEY");
        sink.example(
@@ -635,7 +641,7 @@ std::wstring add_command(command_context& ctx)
        core::diagnostics::scoped_call_context save;
        core::diagnostics::call_context::for_thread().video_channel = ctx.channel_index + 1;
 
-       auto consumer = ctx.consumer_registry->create_consumer(ctx.parameters, &ctx.channel.channel->stage());
+       auto consumer = ctx.consumer_registry->create_consumer(ctx.parameters, &ctx.channel.channel->stage(), get_channels(ctx));
        ctx.channel.channel->output().add(ctx.layer_index(consumer->index()), consumer);
 
        return L"202 ADD OK\r\n";
@@ -660,7 +666,7 @@ void remove_describer(core::help_sink& sink, const core::help_repository& repo)
 std::wstring remove_command(command_context& ctx)
 {
        auto index = ctx.layer_index(std::numeric_limits<int>::min());
-       
+
        if (index == std::numeric_limits<int>::min())
        {
                replace_placeholders(
@@ -668,7 +674,7 @@ std::wstring remove_command(command_context& ctx)
                                ctx.client->address(),
                                ctx.parameters);
 
-               index = ctx.consumer_registry->create_consumer(ctx.parameters, &ctx.channel.channel->stage())->index();
+               index = ctx.consumer_registry->create_consumer(ctx.parameters, &ctx.channel.channel->stage(), get_channels(ctx))->index();
        }
 
        ctx.channel.channel->output().remove(index);
@@ -689,7 +695,7 @@ void print_describer(core::help_sink& sink, const core::help_repository& repo)
 
 std::wstring print_command(command_context& ctx)
 {
-       ctx.channel.channel->output().add(ctx.consumer_registry->create_consumer({ L"IMAGE" }, &ctx.channel.channel->stage()));
+       ctx.channel.channel->output().add(ctx.consumer_registry->create_consumer({ L"IMAGE" }, &ctx.channel.channel->stage(), get_channels(ctx)));
 
        return L"202 PRINT OK\r\n";
 }
@@ -2045,7 +2051,7 @@ std::wstring channel_grid_command(command_context& ctx)
        params.push_back(L"0");
        params.push_back(L"NAME");
        params.push_back(L"Channel Grid Window");
-       auto screen = ctx.consumer_registry->create_consumer(params, &self.channel->stage());
+       auto screen = ctx.consumer_registry->create_consumer(params, &self.channel->stage(), get_channels(ctx));
 
        self.channel->output().add(screen);
 
index d1358aab27819d36cfc4a2aab62b3982c9bc6847..9357721fe93315f5c19ee0a5f7483890b1399226 100644 (file)
                 <path>udp://localhost:9250</path>\r
                 <args>-format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast</args>\r
             </stream>\r
+            <syncto>\r
+                <channel-id>1</channel-id>\r
+            </syncto>\r
         </consumers>\r
     </channel>\r
 </channels>\r
index cee6709c766bb0b28f56df2cd818016ef2487137..3d78ed36b55bd61fe7576f6cf06413cf8302eeb5 100644 (file)
@@ -44,6 +44,7 @@
 #include <core/producer/text/text_producer.h>
 #include <core/producer/color/color_producer.h>
 #include <core/consumer/output.h>
+#include <core/consumer/syncto/syncto_consumer.h>
 #include <core/mixer/mixer.h>
 #include <core/mixer/image/image_mixer.h>
 #include <core/thumbnail_generator.h>
@@ -168,6 +169,7 @@ struct server::impl : boost::noncopyable
                initialize_modules(dependencies);
                core::text::init(dependencies);
                core::scene::init(dependencies);
+               core::syncto::init(dependencies);
                help_repo_->register_item({ L"producer" }, L"Color Producer", &core::describe_color_producer);
        }
 
@@ -250,8 +252,12 @@ struct server::impl : boost::noncopyable
        void setup_channels(const boost::property_tree::wptree& pt)
        {
                using boost::property_tree::wptree;
+
+               std::vector<wptree> xml_channels;
+
                for (auto& xml_channel : pt | witerate_children(L"configuration.channels") | welement_context_iteration)
                {
+                       xml_channels.push_back(xml_channel.second);
                        ptree_verify_element_name(xml_channel, L"channel");
 
                        auto format_desc_str = xml_channel.second.get(L"video-mode", L"PAL");
@@ -267,17 +273,24 @@ struct server::impl : boost::noncopyable
                        auto channel_id = static_cast<int>(channels_.size() + 1);
                        auto channel = spl::make_shared<video_channel>(channel_id, format_desc, *channel_layout, accelerator_.create_image_mixer(channel_id));
 
+                       channel->monitor_output().attach_parent(monitor_subject_);
+                       channel->mixer().set_straight_alpha_output(xml_channel.second.get(L"straight-alpha-output", false));
+                       channels_.push_back(channel);
+               }
+
+               for (auto& channel : channels_)
+               {
                        core::diagnostics::scoped_call_context save;
                        core::diagnostics::call_context::for_thread().video_channel = channel->index();
 
-                       for (auto& xml_consumer : xml_channel.second | witerate_children(L"consumers") | welement_context_iteration)
+                       for (auto& xml_consumer : xml_channels.at(channel->index() - 1) | witerate_children(L"consumers") | welement_context_iteration)
                        {
                                auto name = xml_consumer.first;
 
                                try
                                {
                                        if (name != L"<xmlcomment>")
-                                               channel->output().add(consumer_registry_->create_consumer(name, xml_consumer.second, &channel->stage()));
+                                               channel->output().add(consumer_registry_->create_consumer(name, xml_consumer.second, &channel->stage(), channels_));
                                }
                                catch (const user_error& e)
                                {
@@ -289,10 +302,6 @@ struct server::impl : boost::noncopyable
                                        CASPAR_LOG_CURRENT_EXCEPTION();
                                }
                        }
-
-                   channel->monitor_output().attach_parent(monitor_subject_);
-                       channel->mixer().set_straight_alpha_output(xml_channel.second.get(L"straight-alpha-output", false));
-                       channels_.push_back(channel);
                }
 
                // Dummy diagnostics channel