]> git.sesse.net Git - casparcg/commitdiff
Merged most recent OSC changes
authorHelge Norberg <helge.norberg@svt.se>
Fri, 25 Oct 2013 15:32:58 +0000 (17:32 +0200)
committerHelge Norberg <helge.norberg@svt.se>
Fri, 25 Oct 2013 15:32:58 +0000 (17:32 +0200)
58 files changed:
common/except.cpp
common/except.h
core/consumer/frame_consumer.cpp
core/consumer/frame_consumer.h
core/consumer/output.cpp
core/consumer/output.h
core/consumer/port.cpp
core/consumer/port.h
core/monitor/monitor.cpp
core/monitor/monitor.h
core/producer/color/color_producer.cpp
core/producer/draw/freehand_producer.cpp
core/producer/frame_producer.cpp
core/producer/frame_producer.h
core/producer/layer.cpp
core/producer/layer.h
core/producer/scene/const_producer.cpp
core/producer/scene/hotswap_producer.cpp
core/producer/scene/hotswap_producer.h
core/producer/scene/scene_producer.cpp
core/producer/scene/scene_producer.h
core/producer/separated/separated_producer.cpp
core/producer/stage.cpp
core/producer/stage.h
core/producer/text/text_producer.cpp
core/producer/text/text_producer.h
core/producer/transition/transition_producer.cpp
core/video_channel.cpp
core/video_channel.h
modules/bluefish/consumer/bluefish_consumer.cpp
modules/decklink/consumer/decklink_consumer.cpp
modules/decklink/producer/decklink_producer.cpp
modules/ffmpeg/consumer/ffmpeg_consumer.cpp
modules/ffmpeg/producer/audio/audio_decoder.cpp
modules/ffmpeg/producer/audio/audio_decoder.h
modules/ffmpeg/producer/ffmpeg_producer.cpp
modules/ffmpeg/producer/video/video_decoder.cpp
modules/ffmpeg/producer/video/video_decoder.h
modules/flash/producer/cg_proxy.cpp
modules/flash/producer/cg_proxy.h
modules/flash/producer/flash_producer.cpp
modules/image/consumer/image_consumer.cpp
modules/image/producer/image_producer.cpp
modules/image/producer/image_scroll_producer.cpp
modules/oal/consumer/oal_consumer.cpp
modules/reroute/producer/reroute_producer.cpp
modules/screen/consumer/screen_consumer.cpp
protocol/asio/io_service_manager.cpp [new file with mode: 0644]
protocol/asio/io_service_manager.h [new file with mode: 0644]
protocol/osc/client.cpp [new file with mode: 0644]
protocol/osc/client.h [new file with mode: 0644]
protocol/osc/server.cpp [deleted file]
protocol/osc/server.h [deleted file]
protocol/protocol.vcxproj
protocol/protocol.vcxproj.filters
shell/casparcg.config
shell/server.cpp
shell/server.h

index 86cb8cb6e3877d3dd6095321ee8f23f8500abdf3..63ccf7a61de2556fd9f16ca8da18919e98e20bef 100644 (file)
@@ -2,17 +2,73 @@
 
 #include "except.h"
 
+#include <boost/thread.hpp>
+
 #include "os/windows/windows.h"
 
-namespace caspar {
+namespace caspar { namespace detail {
+
+typedef struct tagTHREADNAME_INFO
+{
+       DWORD dwType; // must be 0x1000
+       LPCSTR szName; // pointer to name (in user addr space)
+       DWORD dwThreadID; // thread ID (-1=caller thread)
+       DWORD dwFlags; // reserved for future use, must be zero
+} THREADNAME_INFO;
+
+inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)
+{
+       THREADNAME_INFO info;
+       {
+               info.dwType = 0x1000;
+               info.szName = szThreadName;
+               info.dwThreadID = dwThreadID;
+               info.dwFlags = 0;
+       }
+       __try
+       {
+               RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(ULONG_PTR), (ULONG_PTR*)&info );
+       }
+       __except (EXCEPTION_CONTINUE_EXECUTION){}       
+}
+
+} // namespace detail
+
+bool& installed_for_thread()
+{
+       static boost::thread_specific_ptr<bool> installed;
+
+       auto for_thread = installed.get();
        
+       if (!for_thread)
+       {
+               for_thread = new bool(false);
+               installed.reset(for_thread);
+       }
+
+       return *for_thread;
+}
+
 void win32_exception::install_handler() 
 {
 //#ifndef _DEBUG
        _set_se_translator(win32_exception::Handler);
+       installed_for_thread() = true;
 //#endif
 }
 
+void win32_exception::ensure_handler_installed_for_thread(
+               const char* thread_description)
+{
+       if (!installed_for_thread())
+       {
+               install_handler();
+
+               if (thread_description)
+                       detail::SetThreadName(GetCurrentThreadId(), thread_description);
+       }
+}
+
 void win32_exception::Handler(unsigned int errorCode, EXCEPTION_POINTERS* pInfo) {
        switch(errorCode)
        {
index b0ce1c8e125c4196db62970129d0eef339d2fc5a..d964360e2457798c3f17bde9c08adbfd1acdaa9d 100644 (file)
@@ -93,6 +93,8 @@ class win32_exception : public std::exception
 public:
        typedef const void* address;
        static void install_handler();
+       static void ensure_handler_installed_for_thread(
+                       const char* thread_description = nullptr);
 
        address location() const { return location_; }
        unsigned int error_code() const { return errorCode_; }
index 634ae9ea29922be906b1a5d92b08d3287a0001dc..f0c474b472141147484a1b96423c9f61ff2dc846 100644 (file)
@@ -85,7 +85,7 @@ public:
        bool has_synchronization_clock() const override                                                                                                         {return consumer_->has_synchronization_clock();}
        int buffer_depth() const override                                                                                                                                       {return consumer_->buffer_depth();}
        int index() const override                                                                                                                                                      {return consumer_->index();}
-       monitor::source& monitor_output() override                                                                                                                      {return consumer_->monitor_output();}                                                                           
+       monitor::subject& monitor_output() override                                                                                                                     {return consumer_->monitor_output();}                                                                           
 };
 
 class print_consumer_proxy : public frame_consumer
@@ -114,7 +114,7 @@ public:
        bool has_synchronization_clock() const override                                                                                         {return consumer_->has_synchronization_clock();}
        int buffer_depth() const override                                                                                                                       {return consumer_->buffer_depth();}
        int index() const override                                                                                                                                      {return consumer_->index();}
-       monitor::source& monitor_output() override                                                                                                      {return consumer_->monitor_output();}                                                                           
+       monitor::subject& monitor_output() override                                                                                                     {return consumer_->monitor_output();}                                                                           
 };
 
 class recover_consumer_proxy : public frame_consumer
@@ -164,7 +164,7 @@ public:
        bool has_synchronization_clock() const override                                                 {return consumer_->has_synchronization_clock();}
        int buffer_depth() const override                                                                               {return consumer_->buffer_depth();}
        int index() const override                                                                                              {return consumer_->index();}
-       monitor::source& monitor_output() override                                                              {return consumer_->monitor_output();}                                                                           
+       monitor::subject& monitor_output() override                                                             {return consumer_->monitor_output();}                                                                           
 };
 
 // This class is used to guarantee that audio cadence is correct. This is important for NTSC audio.
@@ -215,7 +215,7 @@ public:
        bool has_synchronization_clock() const override                                                 {return consumer_->has_synchronization_clock();}
        int buffer_depth() const override                                                                               {return consumer_->buffer_depth();}
        int index() const override                                                                                              {return consumer_->index();}
-       monitor::source& monitor_output() override                                                              {return consumer_->monitor_output();}                                                                           
+       monitor::subject& monitor_output() override                                                             {return consumer_->monitor_output();}                                                                           
 };
 
 spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)
@@ -259,7 +259,7 @@ const spl::shared_ptr<frame_consumer>& frame_consumer::empty()
                bool has_synchronization_clock() const override {return false;}
                int buffer_depth() const override {return 0;};
                virtual int index() const{return -1;}
-               monitor::source& monitor_output() override {static monitor::subject monitor_subject(""); return monitor_subject;}                                                                               
+               monitor::subject& monitor_output() override {static monitor::subject monitor_subject(""); return monitor_subject;}                                                                              
                boost::property_tree::wptree info() const override
                {
                        boost::property_tree::wptree info;
index 8a078f89cd6555eab8c99048335a37796d2a6d14..6d4fae17a3eaa01e7e2176aaad09329edeaa7cfa 100644 (file)
@@ -57,7 +57,7 @@ public:
        
        // monitor::observable
 
-       virtual monitor::source& monitor_output() = 0;
+       virtual monitor::subject& monitor_output() = 0;
 
        // Properties
 
index e23908ca24f33df7f967dbf05afdbf0aaf0e89c0..a97e9918fb279f17523e69d51c14736a33150759 100644 (file)
@@ -53,7 +53,7 @@ namespace caspar { namespace core {
 struct output::impl
 {              
        spl::shared_ptr<diagnostics::graph>     graph_;
-       monitor::subject                                        monitor_subject_;
+       spl::shared_ptr<monitor::subject>       monitor_subject_;
        const int                                                       channel_index_;
        video_format_desc                                       format_desc_;
        std::map<int, port>                                     ports_; 
@@ -63,7 +63,7 @@ struct output::impl
 public:
        impl(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, int channel_index) 
                : graph_(std::move(graph))
-               , monitor_subject_("/output")
+               , monitor_subject_(spl::make_shared<monitor::subject>("/output"))
                , channel_index_(channel_index)
                , format_desc_(format_desc)
                , executor_(L"output")
@@ -80,7 +80,7 @@ public:
                executor_.begin_invoke([this, index, consumer]
                {                       
                        port p(index, channel_index_, std::move(consumer));
-                       p.monitor_output().link_target(&monitor_subject_);
+                       p.monitor_output().attach_parent(monitor_subject_);
                        ports_.insert(std::make_pair(index, std::move(p)));
                }, task_priority::high_priority);
        }
@@ -253,5 +253,5 @@ void output::remove(int index){impl_->remove(index);}
 void output::remove(const spl::shared_ptr<frame_consumer>& consumer){impl_->remove(consumer);}
 boost::unique_future<boost::property_tree::wptree> output::info() const{return impl_->info();}
 void output::operator()(const_frame frame, const video_format_desc& format_desc){(*impl_)(std::move(frame), format_desc);}
-monitor::source& output::monitor_output() {return impl_->monitor_subject_;}
+monitor::subject& output::monitor_output() {return *impl_->monitor_subject_;}
 }}
\ No newline at end of file
index 5621bd6f2679f9d0325611a8140444aced81ddae..00e88c1c0f3f1e1cc69654ce2eefb211c415e9d7 100644 (file)
@@ -55,7 +55,7 @@ public:
        void remove(const spl::shared_ptr<class frame_consumer>& consumer);
        void remove(int index);
        
-       monitor::source& monitor_output();
+       monitor::subject& monitor_output();
 
        // Properties
 
index 9ed7aae68a8415f355fa43b0a4be846574d32002..c168c846573c2f8e0ad51f96ea73ad13e61b82ff 100644 (file)
@@ -10,18 +10,19 @@ namespace caspar { namespace core {
 
 struct port::impl
 {
-       monitor::subject                                        monitor_subject_;
+       spl::shared_ptr<monitor::subject>       monitor_subject_;
        std::shared_ptr<frame_consumer>         consumer_;
        int                                                                     index_;
        int                                                                     channel_index_;
 public:
        impl(int index, int channel_index, spl::shared_ptr<frame_consumer> consumer)
-               : monitor_subject_("/port" + boost::lexical_cast<std::string>(index))
+               : monitor_subject_(spl::make_shared<monitor::subject>(
+                               "/port" + boost::lexical_cast<std::string>(index)))
                , consumer_(std::move(consumer))
                , index_(index)
                , channel_index_(channel_index)
        {
-               consumer_->monitor_output().link_target(&monitor_subject_);
+               consumer_->monitor_output().attach_parent(monitor_subject_);
        }
        
        void video_format_desc(const struct video_format_desc& format_desc)
@@ -31,7 +32,7 @@ public:
                
        boost::unique_future<bool> send(const_frame frame)
        {
-               monitor_subject_ << monitor::message("/type") % consumer_->name();
+               *monitor_subject_ << monitor::message("/type") % consumer_->name();
                return consumer_->send(std::move(frame));
        }
        std::wstring print() const
@@ -65,7 +66,7 @@ port::port(port&& other) : impl_(std::move(other.impl_)){}
 port::~port(){}
 port& port::operator=(port&& other){impl_ = std::move(other.impl_); return *this;}
 boost::unique_future<bool> port::send(const_frame frame){return impl_->send(std::move(frame));}        
-monitor::source& port::monitor_output() {return impl_->monitor_subject_;}
+monitor::subject& port::monitor_output() {return *impl_->monitor_subject_;}
 void port::video_format_desc(const struct video_format_desc& format_desc){impl_->video_format_desc(format_desc);}
 int port::buffer_depth() const{return impl_->buffer_depth();}
 std::wstring port::print() const{ return impl_->print();}
index a8bddc5fcdf84e0a1811c3cc3cd5785443d3d094..f47b5377833315e0f733d4b758084c053bb6c02b 100644 (file)
@@ -29,7 +29,7 @@ public:
 
        boost::unique_future<bool> send(class const_frame frame);       
 
-       monitor::source& monitor_output();
+       monitor::subject& monitor_output();
 
        // Properties
 
index 0378be0d5e023c60c74eaf2aea5659f41934d63c..f2ad450ed78afc723b2fedc47742ec5dd8bf0881 100644 (file)
@@ -1,9 +1,57 @@
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
 #include "../StdAfx.h"
 
 #include "monitor.h"
 
-#include <utility>
+namespace caspar { namespace core { namespace monitor {
 
-namespace caspar { namespace monitor {
-       
-}}
\ No newline at end of file
+/*class in_callers_thread_schedule_group : public Concurrency::ScheduleGroup
+{
+       virtual void ScheduleTask(Concurrency::TaskProc proc, void* data) override
+       {
+               proc(data);
+       }
+
+    virtual unsigned int Id() const override
+       {
+               return 1;
+       }
+
+    virtual unsigned int Reference() override
+       {
+               return 1;
+       }
+
+    virtual unsigned int Release() override
+       {
+               return 1;
+       }
+};
+
+Concurrency::ScheduleGroup& get_in_callers_thread_schedule_group()
+{
+       static in_callers_thread_schedule_group group;
+
+       return group;
+}*/
+
+}}}
index d6f445f269ac884babf1f52f2abb5d3d30591c05..85c6c76f01708bc95ec3b506d7e843366a870dac 100644 (file)
@@ -1,3 +1,23 @@
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
 #pragma once
 
 #include <common/memory.h>
 #include <string>
 #include <vector>
 
-#include <agents.h>
-
-namespace caspar { namespace monitor {
-
+namespace caspar { namespace core { namespace monitor {
+               
 typedef boost::variant<bool, 
                                           std::int32_t, 
                                           std::int64_t, 
@@ -33,7 +51,7 @@ public:
        {
                CASPAR_ASSERT(path.empty() || path[0] == '/');
        }
-
+       
        message(std::string path, spl::shared_ptr<std::vector<data_t>> data_ptr)
                : path_(std::move(path))
                , data_ptr_(std::move(data_ptr))
@@ -68,27 +86,49 @@ private:
        spl::shared_ptr<std::vector<data_t>>    data_ptr_;
 };
 
-class subject : public Concurrency::transformer<monitor::message, monitor::message>
+struct sink
 {
+       virtual ~sink() { }
+
+       virtual void propagate(const message& msg) = 0;
+};
+
+class subject : public sink
+{
+private:
+       std::weak_ptr<sink> parent_;
+       const std::string path_;
 public:
        subject(std::string path = "")
-               : Concurrency::transformer<monitor::message, monitor::message>([=](const message& msg)
-               {
-                       return msg.propagate(path);
-               })
+               : path_(std::move(path))
        {
                CASPAR_ASSERT(path.empty() || path[0] == '/');
        }
 
-       template<typename T>
-       subject& operator<<(T&& msg)
+       void attach_parent(spl::shared_ptr<sink> parent)
        {
-               Concurrency::send(*this, std::forward<T>(msg));
+               parent_ = std::move(parent);
+       }
+
+       void detach_parent()
+       {
+               parent_.reset();
+       }
+
+       subject& operator<<(const message& msg)
+       {
+               propagate(msg);
+
                return *this;
        }
-};
 
-typedef Concurrency::ISource<monitor::message> source;
+       virtual void propagate(const message& msg) override
+       {
+               auto parent = parent_.lock();
 
+               if (parent)
+                       parent->propagate(msg.propagate(path_));
+       }
+};
 
-}}
\ No newline at end of file
+}}}
index 4d58ed838e4054536f2ccbdf6c917325bf455263..67161ef3fa3f7bdc35f7a3c9dc9e8c439890ee90 100644 (file)
@@ -97,7 +97,7 @@ public:
                return info;
        }
 
-       monitor::source& monitor_output() override {return monitor_subject_;}                                                                           
+       monitor::subject& monitor_output() override {return monitor_subject_;}                                                                          
 };
 
 std::wstring get_hex_color(const std::wstring& str)
index fc31f653982b7b3664b0d750070516d4705fc979..0bb97a8904e388964a0a0ce0cb896436073cd9b9 100644 (file)
@@ -162,7 +162,7 @@ public:
                return info;
        }
 
-       monitor::source& monitor_output()
+       monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
index bfe9223fb866ca91272a58b2ac3e46c991efd6c2..472dfeed917a964bba6d29d4681e0c1fd71f1fda 100644 (file)
@@ -167,7 +167,7 @@ const spl::shared_ptr<frame_producer>& frame_producer::empty()
                void paused(bool value) override{}
                uint32_t nb_frames() const override {return 0;}
                std::wstring print() const override { return L"empty";}
-               monitor::source& monitor_output() override {static monitor::subject monitor_subject(""); return monitor_subject;}                                                                               
+               monitor::subject& monitor_output() override {static monitor::subject monitor_subject(""); return monitor_subject;}                                                                              
                std::wstring name() const override {return L"empty";}
                uint32_t frame_number() const override {return 0;}
                boost::unique_future<std::wstring> call(const std::vector<std::wstring>& params) override{CASPAR_THROW_EXCEPTION(not_supported());}
@@ -249,7 +249,7 @@ public:
        uint32_t                                                                                        nb_frames() const override                                                                                                              {return producer_->nb_frames();}
        class draw_frame                                                                        last_frame()                                                                                                                                    {return producer_->last_frame();}
        draw_frame                                                                                      create_thumbnail_frame()                                                                                                                {return producer_->create_thumbnail_frame();}
-       monitor::source&                                                                        monitor_output() override                                                                                                               {return producer_->monitor_output();}                                                                           
+       monitor::subject&                                                                       monitor_output() override                                                                                                               {return producer_->monitor_output();}                                                                           
        bool                                                                                            collides(double x, double y)                                                                                                    {return producer_->collides(x, y);}
        void                                                                                            on_interaction(const interaction_event::ptr& event)                                                             {return producer_->on_interaction(event);}
        constraints&                                                                            pixel_constraints() override                                                                                                    {return producer_->pixel_constraints();}
index 4ff54790e07620d5646b82bdca8198987513ff01..9af8b44e182d28e841a5a983dfbc918951d79fc0 100644 (file)
@@ -80,7 +80,7 @@ public:
        
        // monitor::observable
 
-       virtual monitor::source& monitor_output() = 0;
+       virtual monitor::subject& monitor_output() = 0;
 
        // interaction_sink
        virtual void on_interaction(const interaction_event::ptr& event) override { }
index 7bbdd056582264a2cd339ee7c42722cf44e16eb6..59ac66e9c62599f0be31986e307bb8ac74ed35a3 100644 (file)
@@ -36,18 +36,21 @@ namespace caspar { namespace core {
 
 struct layer::impl
 {                              
-       monitor::subject                                        monitor_subject_;
+       spl::shared_ptr<monitor::subject>       monitor_subject_;
        spl::shared_ptr<frame_producer>         foreground_;
        spl::shared_ptr<frame_producer>         background_;
        boost::optional<int32_t>                        auto_play_delta_;
+       bool                                                            is_paused_;
 
 public:
        impl(int index) 
-               : monitor_subject_("/layer/" + boost::lexical_cast<std::string>(index))
+               : monitor_subject_(spl::make_shared<monitor::subject>(
+                               "/layer/" + boost::lexical_cast<std::string>(index)))
 //             , foreground_event_subject_("")
 //             , background_event_subject_("background")
                , foreground_(frame_producer::empty())
                , background_(frame_producer::empty())
+               , is_paused_(false)
        {
 //             foreground_event_subject_.subscribe(event_subject_);
 //             background_event_subject_.subscribe(event_subject_);
@@ -55,14 +58,15 @@ public:
 
        void set_foreground(spl::shared_ptr<frame_producer> producer)
        {
-               foreground_->monitor_output().unlink_target(&monitor_subject_);
+               foreground_->monitor_output().detach_parent();
                foreground_ = std::move(producer);
-               foreground_->monitor_output().link_target(&monitor_subject_);
+               foreground_->monitor_output().attach_parent(monitor_subject_);
        }
 
        void pause()
        {
                foreground_->paused(true);
+               is_paused_ = true;
        }
        
        void load(spl::shared_ptr<frame_producer> producer, bool preview, const boost::optional<int32_t>& auto_play_delta)
@@ -77,6 +81,7 @@ public:
                {
                        play();
                        foreground_->paused(true);
+                       is_paused_ = true;
                }
 
                if(auto_play_delta_ && foreground_ == frame_producer::empty())
@@ -96,6 +101,7 @@ public:
                }
 
                foreground_->paused(false);
+               is_paused_ = false;
        }
        
        void stop()
@@ -109,6 +115,8 @@ public:
        {               
                try
                {               
+                       *monitor_subject_ << monitor::message("/paused") % is_paused_;
+
                        auto frame = foreground_->receive();
                        
                        if(frame == core::draw_frame::late())
@@ -187,7 +195,7 @@ draw_frame layer::receive(const video_format_desc& format_desc) {return impl_->r
 spl::shared_ptr<frame_producer> layer::foreground() const { return impl_->foreground_;}
 spl::shared_ptr<frame_producer> layer::background() const { return impl_->background_;}
 boost::property_tree::wptree layer::info() const{return impl_->info();}
-monitor::source& layer::monitor_output() {return impl_->monitor_subject_;}
+monitor::subject& layer::monitor_output() {return *impl_->monitor_subject_;}
 void layer::on_interaction(const interaction_event::ptr& event) { impl_->on_interaction(event); }
 bool layer::collides(double x, double y) const { return impl_->collides(x, y); }
 }}
\ No newline at end of file
index eb840782ca9206d322a4675c30ac9e984dde101b..c2d6a6997b6bc7a16f5046f98ae36ef9238b665a 100644 (file)
@@ -65,7 +65,7 @@ public:
        
        // monitor::observable
 
-       monitor::source& monitor_output();
+       monitor::subject& monitor_output();
 
        // interaction_sink
 
index b92bbe849f8092685dedad5cdb40ba52baaea677..2e46367c279a93233658e1bb9eabc7bdb98f71b5 100644 (file)
@@ -83,7 +83,7 @@ public:
                return info;
        }
 
-       monitor::source& monitor_output()
+       monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
index 8c0edf4795f8bfef7fa5a48aa4db5ec395168ff4..e5dcb42b67366fa926d8ae60abd3dc781e7f1afc 100644 (file)
@@ -88,7 +88,7 @@ boost::property_tree::wptree hotswap_producer::info() const
        return info;
 }
 
-monitor::source& hotswap_producer::monitor_output()
+monitor::subject& hotswap_producer::monitor_output()
 {
        return impl_->monitor_subject;
 }
index 756e2dd4d5d8a364ecf32f473e3e076abae1db97..d148b9258ced4c116e9a38f6dc5438195cac2787 100644 (file)
@@ -41,7 +41,7 @@ public:
        std::wstring print() const override;
        std::wstring name() const override;
        boost::property_tree::wptree info() const override;
-       monitor::source& monitor_output();
+       monitor::subject& monitor_output();
 
        binding<std::shared_ptr<frame_producer>>& producer();
 private:
index f8eef59e66ecef42d8a7e2a705ad867d93e574ae..68565df8d58dbc85c42ae3877b8757e9d52642a9 100644 (file)
@@ -274,7 +274,7 @@ struct scene_producer::impl
                return info;
        }
 
-       monitor::source& monitor_output()
+       monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
@@ -343,7 +343,7 @@ boost::unique_future<std::wstring> scene_producer::call(const std::vector<std::w
        return impl_->call(params);
 }
 
-monitor::source& scene_producer::monitor_output()
+monitor::subject& scene_producer::monitor_output()
 {
        return impl_->monitor_output();
 }
index 32e8e2c3102575704c8fa91aed78663adffded1f..a34836bbef2aa777390d6b7aec8cefcd68becd2a 100644 (file)
@@ -94,7 +94,7 @@ public:
        std::wstring name() const override;
        boost::unique_future<std::wstring>      call(const std::vector<std::wstring>& params) override;
        boost::property_tree::wptree info() const override;
-       monitor::source& monitor_output();
+       monitor::subject& monitor_output();
 
        layer& create_layer(
                        const spl::shared_ptr<frame_producer>& producer, int x, int y, const std::wstring& name);
index a1acef3fa3a2567c9a28f824ebc98f301c50bab0..6138901b034164d77aed763885074bc7fe8480f9 100644 (file)
@@ -33,8 +33,8 @@ namespace caspar { namespace core {
 
 class separated_producer : public frame_producer_base
 {              
-       monitor::subject                                monitor_subject_;
-       monitor::subject                                key_monitor_subject_;
+       spl::shared_ptr<monitor::subject>       monitor_subject_;
+       spl::shared_ptr<monitor::subject>       key_monitor_subject_;
 
        spl::shared_ptr<frame_producer> fill_producer_;
        spl::shared_ptr<frame_producer> key_producer_;
@@ -43,8 +43,7 @@ class separated_producer : public frame_producer_base
                        
 public:
        explicit separated_producer(const spl::shared_ptr<frame_producer>& fill, const spl::shared_ptr<frame_producer>& key) 
-               : monitor_subject_("")
-               , key_monitor_subject_("/keyer")                
+               : key_monitor_subject_(spl::make_shared<monitor::subject>("/keyer"))
                , fill_producer_(fill)
                , key_producer_(key)
                , fill_(core::draw_frame::late())
@@ -52,10 +51,10 @@ public:
        {
                CASPAR_LOG(info) << print() << L" Initialized";
 
-               key_monitor_subject_.link_target(&monitor_subject_);
+               key_monitor_subject_->attach_parent(monitor_subject_);
 
-               key_producer_->monitor_output().link_target(&key_monitor_subject_);
-               fill_producer_->monitor_output().link_target(&monitor_subject_);
+               key_producer_->monitor_output().attach_parent(key_monitor_subject_);
+               fill_producer_->monitor_output().attach_parent(monitor_subject_);
        }
 
        // frame_producer
@@ -135,7 +134,7 @@ public:
                return fill_producer_->info();;
        }
 
-       monitor::source& monitor_output() { return monitor_subject_; }
+       monitor::subject& monitor_output() { return *monitor_subject_; }
 };
 
 spl::shared_ptr<frame_producer> create_separated_producer(const spl::shared_ptr<frame_producer>& fill, const spl::shared_ptr<frame_producer>& key)
index 938c5023553ff12d36939442c10059f9c2db6b5e..876d2b5534c7938812a4a1b6a3656cae0686ea72 100644 (file)
@@ -51,7 +51,7 @@ namespace caspar { namespace core {
 struct stage::impl : public std::enable_shared_from_this<impl>
 {                              
        spl::shared_ptr<diagnostics::graph>                                                     graph_;
-       monitor::subject                                                                                        monitor_subject_;
+       spl::shared_ptr<monitor::subject>                                                       monitor_subject_;
        //reactive::basic_subject<std::map<int, class draw_frame>>      frames_subject_;
        std::map<int, layer>                                                                            layers_;        
        std::map<int, tweened_transform>                                                        tweens_;
@@ -60,7 +60,7 @@ struct stage::impl : public std::enable_shared_from_this<impl>
 public:
        impl(spl::shared_ptr<diagnostics::graph> graph) 
                : graph_(std::move(graph))
-               , monitor_subject_("/stage")
+               , monitor_subject_(spl::make_shared<monitor::subject>("/stage"))
                , aggregator_([=] (double x, double y) { return collission_detect(x, y); })
                , executor_(L"stage")
        {
@@ -104,7 +104,7 @@ public:
                //frames_subject_ << frames;
                
                graph_->set_value("produce-time", frame_timer.elapsed()*format_desc.fps*0.5);
-               monitor_subject_ << monitor::message("/profiler/time") % frame_timer.elapsed() % (1.0/format_desc.fps);
+               *monitor_subject_ << monitor::message("/profiler/time") % frame_timer.elapsed() % (1.0/format_desc.fps);
 
                return frames;
        }
@@ -134,7 +134,7 @@ public:
                if(it == std::end(layers_))
                {
                        it = layers_.insert(std::make_pair(index, layer(index))).first;
-                       it->second.monitor_output().link_target(&monitor_subject_);
+                       it->second.monitor_output().attach_parent(monitor_subject_);
                }
                return it->second;
        }
@@ -239,18 +239,18 @@ public:
                        auto other_layers       = other_impl->layers_ | boost::adaptors::map_values;
 
                        BOOST_FOREACH(auto& layer, layers)
-                               layer.monitor_output().unlink_target(&monitor_subject_);
+                               layer.monitor_output().detach_parent();
                        
                        BOOST_FOREACH(auto& layer, other_layers)
-                               layer.monitor_output().unlink_target(&monitor_subject_);
+                               layer.monitor_output().detach_parent();
                        
                        std::swap(layers_, other_impl->layers_);
                                                
                        BOOST_FOREACH(auto& layer, layers)
-                               layer.monitor_output().link_target(&monitor_subject_);
+                               layer.monitor_output().attach_parent(monitor_subject_);
                        
                        BOOST_FOREACH(auto& layer, other_layers)
-                               layer.monitor_output().link_target(&monitor_subject_);
+                               layer.monitor_output().attach_parent(monitor_subject_);
                };              
 
                return executor_.begin_invoke([=]
@@ -280,13 +280,13 @@ public:
                                auto& my_layer          = get_layer(index);
                                auto& other_layer       = other_impl->get_layer(other_index);
 
-                               my_layer.monitor_output().unlink_target(&monitor_subject_);
-                               other_layer.monitor_output().unlink_target(&other_impl->monitor_subject_);
+                               my_layer.monitor_output().detach_parent();
+                               other_layer.monitor_output().detach_parent();
 
                                std::swap(my_layer, other_layer);
 
-                               my_layer.monitor_output().link_target(&monitor_subject_);
-                               other_layer.monitor_output().link_target(&other_impl->monitor_subject_);
+                               my_layer.monitor_output().attach_parent(monitor_subject_);
+                               other_layer.monitor_output().attach_parent(other_impl->monitor_subject_);
                        };              
 
                        return executor_.begin_invoke([=]
@@ -389,7 +389,7 @@ boost::unique_future<spl::shared_ptr<frame_producer>> stage::background(int inde
 boost::unique_future<boost::property_tree::wptree> stage::info() const{return impl_->info();}
 boost::unique_future<boost::property_tree::wptree> stage::info(int index) const{return impl_->info(index);}
 std::map<int, class draw_frame> stage::operator()(const video_format_desc& format_desc){return (*impl_)(format_desc);}
-monitor::source& stage::monitor_output(){return impl_->monitor_subject_;}
+monitor::subject& stage::monitor_output(){return *impl_->monitor_subject_;}
 //void stage::subscribe(const frame_observable::observer_ptr& o) {impl_->frames_subject_.subscribe(o);}
 //void stage::unsubscribe(const frame_observable::observer_ptr& o) {impl_->frames_subject_.unsubscribe(o);}
 void stage::on_interaction(const interaction_event::ptr& event) { impl_->on_interaction(event); }
index 8d308894b4cebc9d2f341f5199c6991ad57bfb6d..25a68a61675517520178c0fdeccf4487403cfbd6 100644 (file)
@@ -79,7 +79,7 @@ public:
        boost::unique_future<void>                      swap_layer(int index, int other_index);
        boost::unique_future<void>                      swap_layer(int index, int other_index, stage& other);   
 
-       monitor::source& monitor_output();      
+       monitor::subject& monitor_output();     
 
        // frame_observable
        //void subscribe(const frame_observable::observer_ptr& o) override;
index 1a08dcaeba6a7100fca9f878c8891b26f9e52f39..bbb798d2e78154a03e666d97de339902ba8a02ed 100644 (file)
@@ -309,7 +309,7 @@ constraints& text_producer::pixel_constraints() { return impl_->pixel_constraint
 std::wstring text_producer::print() const { return impl_->print(); }
 std::wstring text_producer::name() const { return impl_->name(); }
 boost::property_tree::wptree text_producer::info() const { return impl_->info(); }
-monitor::source& text_producer::monitor_output() { return impl_->monitor_subject_; }
+monitor::subject& text_producer::monitor_output() { return impl_->monitor_subject_; }
 binding<std::wstring>& text_producer::text() { return impl_->text(); }
 binding<double>& text_producer::tracking() { return impl_->tracking(); }
 const binding<double>& text_producer::current_bearing_y() const { return impl_->current_bearing_y(); }
index 4921792e54302b1dccf044ded9c9059cd661d1c4..f666f4ba73f1ba0f0509c0ffffc2dc2d46170029 100644 (file)
@@ -56,7 +56,7 @@ public:
        std::wstring print() const override;
        std::wstring name() const override;
        boost::property_tree::wptree info() const override;
-       monitor::source& monitor_output();
+       monitor::subject& monitor_output();
 
        binding<std::wstring>& text();
        binding<double>& tracking();
index 50a8761d28e6734299934eb952b2c80258d9bd78..c2f73306d02a181c46726c29f66a19b5156541d2 100644 (file)
@@ -34,7 +34,7 @@ namespace caspar { namespace core {
 
 class transition_producer : public frame_producer_base
 {      
-       monitor::subject                                        monitor_subject_;
+       spl::shared_ptr<monitor::subject>       monitor_subject_;
        const field_mode                                        mode_;
        int                                                                     current_frame_;
        
@@ -54,7 +54,7 @@ public:
                , source_producer_(frame_producer::empty())
                , paused_(false)
        {
-               dest->monitor_output().link_target(&monitor_subject_);
+               dest->monitor_output().attach_parent(monitor_subject_);
 
                CASPAR_LOG(info) << print() << L" Initialized";
        }
@@ -93,8 +93,8 @@ public:
                                source = source_producer_->last_frame();
                });                     
                                                
-               monitor_subject_        << monitor::message("/transition/frame") % current_frame_ % info_.duration
-                                               << monitor::message("/transition/type") % [&]() -> std::string
+               *monitor_subject_       << monitor::message("/transition/frame") % current_frame_ % info_.duration
+                                                       << monitor::message("/transition/type") % [&]() -> std::string
                                                                                                                                {
                                                                                                                                        switch(info_.type.value())
                                                                                                                                        {
@@ -207,9 +207,9 @@ public:
                return draw_frame::over(s_frame, d_frame);
        }
 
-       monitor::source& monitor_output()
+       monitor::subject& monitor_output()
        {
-               return monitor_subject_;
+               return *monitor_subject_;
        }
 
        void on_interaction(const interaction_event::ptr& event) override
index 38a46aa2350615229155212c56a74dab362e4aab..bb2dfe6cfbd67aaa38a8d07b77da532fa6049b23 100644 (file)
@@ -49,7 +49,7 @@ namespace caspar { namespace core {
 
 struct video_channel::impl /* final */
 {
-       monitor::subject                                                                monitor_subject_;
+       spl::shared_ptr<monitor::subject>                               monitor_subject_;
 
        const int                                                                               index_;
 
@@ -66,7 +66,8 @@ struct video_channel::impl /* final */
        executor                                                                                executor_;
 public:
        impl(int index, const core::video_format_desc& format_desc, std::unique_ptr<image_mixer> image_mixer)  
-               : monitor_subject_("/channel" + boost::lexical_cast<std::string>(index))
+               : monitor_subject_(spl::make_shared<monitor::subject>(
+                               "/channel/" + boost::lexical_cast<std::string>(index)))
                , index_(index)
                , format_desc_(format_desc)
                , output_(graph_, format_desc, index)
@@ -79,8 +80,8 @@ public:
                graph_->set_text(print());
                diagnostics::register_graph(graph_);
                
-               output_.monitor_output().link_target(&monitor_subject_);
-               stage_.monitor_output().link_target(&monitor_subject_);
+               output_.monitor_output().attach_parent(monitor_subject_);
+               stage_.monitor_output().attach_parent(monitor_subject_);
 
                executor_.begin_invoke([=]{tick();});
 
@@ -127,7 +128,7 @@ public:
                
                        graph_->set_value("tick-time", frame_timer.elapsed()*format_desc.fps*0.5);
 
-                       monitor_subject_        << monitor::message("/profiler/time")   % frame_timer.elapsed() % (1.0/format_desc_.fps)
+                       *monitor_subject_       << monitor::message("/profiler/time")   % frame_timer.elapsed() % (1.0/format_desc_.fps)
                                                                << monitor::message("/format")                  % format_desc.name;
                }
                catch(...)
@@ -172,6 +173,6 @@ spl::shared_ptr<frame_factory> video_channel::frame_factory() { return impl_->im
 core::video_format_desc video_channel::video_format_desc() const{return impl_->video_format_desc();}
 void core::video_channel::video_format_desc(const core::video_format_desc& format_desc){impl_->video_format_desc(format_desc);}
 boost::property_tree::wptree video_channel::info() const{return impl_->info();}                
-monitor::source& video_channel::monitor_output(){return impl_->monitor_subject_;}
+monitor::subject& video_channel::monitor_output(){return *impl_->monitor_subject_;}
 
 }}
\ No newline at end of file
index e93bc1d073135780d41ad03755e86ee19f3ac39e..a0bd94eb17995871349ced82393b8a3d8c29cdf1 100644 (file)
@@ -54,7 +54,7 @@ public:
 
        // Methods
                        
-       monitor::source& monitor_output();
+       monitor::subject& monitor_output();
 
        // Properties
 
index e1e6d17da0a2b31781ee240da3f555950d77f3a7..3ad32d01c17780f61bec4df08acbbea787ee07af 100644 (file)
@@ -298,7 +298,7 @@ public:
 
 struct bluefish_consumer_proxy : public core::frame_consumer
 {
-       monitor::subject                                        monitor_subject_;
+       core::monitor::subject                          monitor_subject_;
 
        std::unique_ptr<bluefish_consumer>      consumer_;
        const int                                                       device_index_;
@@ -365,7 +365,7 @@ public:
                return 400 + device_index_;
        }
 
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
index b516366ca41617d524c5ab198d25cd79ec6f0543..b2b9620b1d129336e8150e1af30d0f11d626a673 100644 (file)
@@ -516,7 +516,7 @@ public:
 
 struct decklink_consumer_proxy : public core::frame_consumer
 {
-       monitor::subject                                        monitor_subject_;
+       core::monitor::subject                          monitor_subject_;
        const configuration                                     config_;
        std::unique_ptr<decklink_consumer>      consumer_;
        executor                                                        executor_;
@@ -590,7 +590,7 @@ public:
                return 300 + config_.device_index;
        }
 
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
index 21afe32c1a73fd339c178ed86c8a09fcf4883b6a..4915bc83e796e2e702e6bc3fcb29bd209906d2d2 100644 (file)
@@ -81,7 +81,7 @@ namespace caspar { namespace decklink {
                
 class decklink_producer : boost::noncopyable, public IDeckLinkInputCallback
 {      
-       monitor::subject                                                                monitor_subject_;
+       core::monitor::subject                                                  monitor_subject_;
        spl::shared_ptr<diagnostics::graph>                             graph_;
        boost::timer                                                                    tick_timer_;
 
@@ -213,15 +213,16 @@ public:
                        video_frame->interlaced_frame   = in_format_desc_.field_mode != core::field_mode::progressive;
                        video_frame->top_field_first    = in_format_desc_.field_mode == core::field_mode::upper ? 1 : 0;
                                
-                       monitor_subject_        << monitor::message("/file/name")                               % model_name_
-                                                               << monitor::message("/file/path")                               % device_index_
-                                                               << monitor::message("/file/video/width")                        % video->GetWidth()
-                                                               << monitor::message("/file/video/height")               % video->GetHeight()
-                                                               << monitor::message("/file/video/field")                        % u8(!video_frame->interlaced_frame ? "progressive" : (video_frame->top_field_first ? "upper" : "lower"))
-                                                               << monitor::message("/file/audio/sample-rate")  % 48000
-                                                               << monitor::message("/file/audio/channels")             % 2
-                                                               << monitor::message("/file/audio/format")               % u8(av_get_sample_fmt_name(AV_SAMPLE_FMT_S32))
-                                                               << monitor::message("/file/fps")                                        % in_format_desc_.fps;
+                       monitor_subject_
+                                       << core::monitor::message("/file/name")                                 % model_name_
+                                       << core::monitor::message("/file/path")                                 % device_index_
+                                       << core::monitor::message("/file/video/width")                  % video->GetWidth()
+                                       << core::monitor::message("/file/video/height")                 % video->GetHeight()
+                                       << core::monitor::message("/file/video/field")                  % u8(!video_frame->interlaced_frame ? "progressive" : (video_frame->top_field_first ? "upper" : "lower"))
+                                       << core::monitor::message("/file/audio/sample-rate")    % 48000
+                                       << core::monitor::message("/file/audio/channels")               % 2
+                                       << core::monitor::message("/file/audio/format")                 % u8(av_get_sample_fmt_name(AV_SAMPLE_FMT_S32))
+                                       << core::monitor::message("/file/fps")                                  % in_format_desc_.fps;
 
                        // Audio
 
@@ -271,10 +272,10 @@ public:
                        }
                        
                        graph_->set_value("frame-time", frame_timer.elapsed()*out_format_desc_.fps*0.5);        
-                       monitor_subject_ << monitor::message("/profiler/time") % frame_timer.elapsed() % out_format_desc_.fps;
+                       monitor_subject_ << core::monitor::message("/profiler/time") % frame_timer.elapsed() % out_format_desc_.fps;
 
                        graph_->set_value("output-buffer", static_cast<float>(frame_buffer_.size())/static_cast<float>(frame_buffer_.capacity()));      
-                       monitor_subject_ << monitor::message("/buffer") % frame_buffer_.size() % frame_buffer_.capacity();
+                       monitor_subject_ << core::monitor::message("/buffer") % frame_buffer_.size() % frame_buffer_.capacity();
                }
                catch(...)
                {
@@ -302,7 +303,7 @@ public:
                return model_name_ + L" [" + boost::lexical_cast<std::wstring>(device_index_) + L"|" + in_format_desc_.name + L"]";
        }
 
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
@@ -338,7 +339,7 @@ public:
                });
        }
 
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return producer_->monitor_output();
        }
index 3529907d8032b88385cada42bb64766fb6ae4ed6..23d16bd27c7278029965efdcc7ff7ef5b163f721 100644 (file)
@@ -262,7 +262,7 @@ struct ffmpeg_consumer : boost::noncopyable
        const std::shared_ptr<AVFormatContext>          oc_;
        const core::video_format_desc                           format_desc_;   
 
-       monitor::subject                                                        monitor_subject_;
+       core::monitor::subject                                          monitor_subject_;
        
        tbb::spin_mutex                                                         exception_mutex_;
        std::exception_ptr                                                      exception_;
@@ -388,7 +388,7 @@ public:
                return L"ffmpeg[" + u16(filename_) + L"]";
        }
        
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
@@ -543,8 +543,9 @@ private:
                av_frame->top_field_first       = format_desc_.field_mode == core::field_mode::upper;
                av_frame->pts                           = frame_number_++;
 
-               monitor_subject_ << monitor::message("/frame")  % static_cast<int64_t>(frame_number_)
-                                                                                                       % static_cast<int64_t>(std::numeric_limits<int64_t>::max());
+               monitor_subject_ << core::monitor::message("/frame")
+                               % static_cast<int64_t>(frame_number_)
+                               % static_cast<int64_t>(std::numeric_limits<int64_t>::max());
 
                AVPacket pkt;
                av_init_packet(&pkt);
@@ -843,7 +844,7 @@ public:
                return 200;
        }
 
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return consumer_->monitor_output();
        }
index 9d279694eac8f82ca3ec1405730b27e65a2bb651..fd3cdc2f510509ee0e4bf4f0215328eae34101d9 100644 (file)
@@ -59,7 +59,7 @@ uint64_t get_channel_layout(AVCodecContext* dec)
 
 struct audio_decoder::impl : boost::noncopyable
 {      
-       monitor::subject                                                                                        monitor_subject_;
+       core::monitor::subject                                                                          monitor_subject_;
        input*                                                                                                          input_;
        int                                                                                                                     index_;
        const spl::shared_ptr<AVCodecContext>                                           codec_context_;         
@@ -149,10 +149,10 @@ public:
                frame->nb_samples       = channel_samples;
                frame->format           = AV_SAMPLE_FMT_S32;
                                                        
-               monitor_subject_  << monitor::message("/file/audio/sample-rate")        % codec_context_->sample_rate
-                                               << monitor::message("/file/audio/channels")     % codec_context_->channels
-                                               << monitor::message("/file/audio/format")               % u8(av_get_sample_fmt_name(codec_context_->sample_fmt))
-                                               << monitor::message("/file/audio/codec")                % u8(codec_context_->codec->long_name);                 
+               monitor_subject_  << core::monitor::message("/file/audio/sample-rate")  % codec_context_->sample_rate
+                                               << core::monitor::message("/file/audio/channels")       % codec_context_->channels
+                                               << core::monitor::message("/file/audio/format")         % u8(av_get_sample_fmt_name(codec_context_->sample_fmt))
+                                               << core::monitor::message("/file/audio/codec")          % u8(codec_context_->codec->long_name);                 
 
                return frame;
        }
@@ -174,5 +174,5 @@ audio_decoder& audio_decoder::operator=(audio_decoder&& other){impl_ = std::move
 std::shared_ptr<AVFrame> audio_decoder::operator()(){return impl_->poll();}
 uint32_t audio_decoder::nb_frames() const{return impl_->nb_frames();}
 std::wstring audio_decoder::print() const{return impl_->print();}
-monitor::source& audio_decoder::monitor_output() { return impl_->monitor_subject_;}
+core::monitor::subject& audio_decoder::monitor_output() { return impl_->monitor_subject_;}
 }}
\ No newline at end of file
index 177e104859b64cfe9172f6878f29756b10a3bae8..7e07e1ec4cb44dd91a048c133e827c13b6af2e4d 100644 (file)
@@ -55,7 +55,7 @@ public:
        
        std::wstring print() const;
        
-       monitor::source& monitor_output();
+       core::monitor::subject& monitor_output();
 
 private:
        struct impl;
index 0dffaaea4c1127dc96207af5ee3de9fbe33250a4..2a623f82d188e9dda5ec2f8801b3b2dff42f7f89 100644 (file)
 #include <queue>
 
 namespace caspar { namespace ffmpeg {
-                               
+
+std::wstring get_relative_or_original(
+               const std::wstring& filename,
+               const boost::filesystem::wpath& relative_to)
+{
+       boost::filesystem::wpath file(filename);
+       auto result = file.filename().wstring();
+
+       boost::filesystem::wpath current_path = file;
+
+       while (true)
+       {
+               current_path = current_path.parent_path();
+
+               if (boost::filesystem::equivalent(current_path, relative_to))
+                       break;
+
+               if (current_path.empty())
+                       return filename;
+
+               result = current_path.filename().wstring() + L"/" + result;
+       }
+
+       return result;
+}
+
 struct ffmpeg_producer : public core::frame_producer_base
 {
-       monitor::subject                                                                monitor_subject_;
+       spl::shared_ptr<core::monitor::subject>                 monitor_subject_;
        const std::wstring                                                              filename_;
+       const std::wstring                                                              path_relative_to_media_;
        
        const spl::shared_ptr<diagnostics::graph>               graph_;
                                        
@@ -97,6 +123,7 @@ public:
                                                         uint32_t start, 
                                                         uint32_t length) 
                : filename_(filename)
+               , path_relative_to_media_(get_relative_or_original(filename, env::media_folder()))
                , frame_factory_(frame_factory)         
                , format_desc_(format_desc)
                , input_(graph_, filename_, loop, start, length)
@@ -113,7 +140,7 @@ public:
                try
                {
                        video_decoder_.reset(new video_decoder(input_));
-                       video_decoder_->monitor_output().link_target(&monitor_subject_);
+                       video_decoder_->monitor_output().attach_parent(monitor_subject_);
                        constraints_.width.set(video_decoder_->width());
                        constraints_.height.set(video_decoder_->height());
                        
@@ -132,7 +159,7 @@ public:
                try
                {
                        audio_decoder_ .reset(new audio_decoder(input_, format_desc_));
-                       audio_decoder_->monitor_output().link_target(&monitor_subject_);
+                       audio_decoder_->monitor_output().attach_parent(monitor_subject_);
                        
                        CASPAR_LOG(info) << print() << L" " << audio_decoder_->print();
                }
@@ -172,15 +199,16 @@ public:
                        graph_->set_tag("underflow");
                                                                        
                graph_->set_value("frame-time", frame_timer.elapsed()*format_desc_.fps*0.5);
-               monitor_subject_        << monitor::message("/profiler/time") % frame_timer.elapsed() % (1.0/format_desc_.fps);                 
-                                                               
-               monitor_subject_        << monitor::message("/file/time")                       % (file_frame_number()/fps_) 
-                                                                                                                               % (file_nb_frames()/fps_)
-                                                       << monitor::message("/file/frame")                      % static_cast<int32_t>(file_frame_number())
-                                                                                                                               % static_cast<int32_t>(file_nb_frames())
-                                                       << monitor::message("/file/fps")                        % fps_
-                                                       << monitor::message("/file/path")                       % filename_
-                                                       << monitor::message("/loop")                            % input_.loop();
+               *monitor_subject_
+                               << core::monitor::message("/profiler/time")     % frame_timer.elapsed() % (1.0/format_desc_.fps);                       
+               *monitor_subject_
+                               << core::monitor::message("/file/time")         % (file_frame_number()/fps_) 
+                                                                                                                       % (file_nb_frames()/fps_)
+                               << core::monitor::message("/file/frame")        % static_cast<int32_t>(file_frame_number())
+                                                                                                                       % static_cast<int32_t>(file_nb_frames())
+                               << core::monitor::message("/file/fps")          % fps_
+                               << core::monitor::message("/file/path")         % path_relative_to_media_
+                               << core::monitor::message("/loop")                      % input_.loop();
                                                
                return frame;
        }
@@ -296,9 +324,9 @@ public:
                return info;
        }
        
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
-               return monitor_subject_;
+               return *monitor_subject_;
        }
 
        // ffmpeg_producer
index 5d94f21690f80b39ecf30c208ec4ba0694bc8ab0..89fb0c24c18570cce24a8207d0dc8dfe3d5f5b99 100644 (file)
@@ -54,7 +54,7 @@ namespace caspar { namespace ffmpeg {
        
 struct video_decoder::impl : boost::noncopyable
 {
-       monitor::subject                                                monitor_subject_;
+       core::monitor::subject                                  monitor_subject_;
        input*                                                                  input_;
        int                                                                             index_;
        const spl::shared_ptr<AVCodecContext>   codec_context_;
@@ -144,10 +144,10 @@ public:
                if(frame->repeat_pict > 0)
                        CASPAR_LOG(warning) << "[video_decoder] repeat_pict not implemented.";
                                
-               monitor_subject_  << monitor::message("/file/video/width")      % width_
-                                               << monitor::message("/file/video/height")       % height_
-                                               << monitor::message("/file/video/field")        % u8(!frame->interlaced_frame ? "progressive" : (frame->top_field_first ? "upper" : "lower"))
-                                               << monitor::message("/file/video/codec")        % u8(codec_context_->codec->long_name);
+               monitor_subject_  << core::monitor::message("/file/video/width")        % width_
+                                               << core::monitor::message("/file/video/height") % height_
+                                               << core::monitor::message("/file/video/field")  % u8(!frame->interlaced_frame ? "progressive" : (frame->top_field_first ? "upper" : "lower"))
+                                               << core::monitor::message("/file/video/codec")  % u8(codec_context_->codec->long_name);
                
                return frame;
        }
@@ -173,5 +173,5 @@ uint32_t video_decoder::nb_frames() const{return impl_->nb_frames();}
 uint32_t video_decoder::file_frame_number() const{return impl_->file_frame_number_;}
 bool video_decoder::is_progressive() const{return impl_->is_progressive_;}
 std::wstring video_decoder::print() const{return impl_->print();}
-monitor::source& video_decoder::monitor_output() { return impl_->monitor_subject_; }
+core::monitor::subject& video_decoder::monitor_output() { return impl_->monitor_subject_; }
 }}
\ No newline at end of file
index f01c88140c34ae564378dd652fe13cfbfd4b2243..f911aa0246e5939beeea7b411bab7c1ec2997c6c 100644 (file)
@@ -53,7 +53,7 @@ public:
 
        std::wstring print() const;
                
-       monitor::source& monitor_output();
+       core::monitor::subject& monitor_output();
 
 private:
        struct impl;
index 7802ceee37ebede4dbb50ed9f4a52764d891e6c6..2acb9da6be6e43bfe1d43fe407d675532c274624 100644 (file)
@@ -154,7 +154,7 @@ public:
                return L"";
        }
 
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return flash_producer_->monitor_output();
        }
@@ -219,6 +219,6 @@ void cg_proxy::update(int layer, const std::wstring& data){impl_->update(layer,
 std::wstring cg_proxy::invoke(int layer, const std::wstring& label){return impl_->timed_invoke(layer, label);}
 std::wstring cg_proxy::description(int layer){return impl_->timed_description(layer);}
 std::wstring cg_proxy::template_host_info(){return impl_->timed_template_host_info();}
-monitor::source& cg_proxy::monitor_output(){return impl_->monitor_output();}
+core::monitor::subject& cg_proxy::monitor_output(){return impl_->monitor_output();}
 
 }}
\ No newline at end of file
index a5bd3a18b1d48fad84482217003148956862c93d..0161d5665b4e6bcefdeadf3839dbbbb0025d558e 100644 (file)
@@ -51,7 +51,7 @@ public:
        std::wstring invoke(int layer, const std::wstring& label);
        std::wstring description(int layer);
        std::wstring template_host_info();
-       monitor::source& monitor_output();
+       core::monitor::subject& monitor_output();
 
 private:
        struct impl;
index d29533ef9e105e04cc3801b8199b9d5074b7d92d..9cc18f47fe4e6f77fc9c1d2ad866919e311e7cb5 100644 (file)
@@ -167,7 +167,7 @@ class flash_renderer
                }
        } com_init_;
 
-       monitor::subject&                                                       monitor_subject_;
+       core::monitor::subject&                                         monitor_subject_;
 
        const std::wstring                                                      filename_;
 
@@ -185,7 +185,7 @@ class flash_renderer
        const int                                                                       height_;
        
 public:
-       flash_renderer(monitor::subject& monitor_subject, const spl::shared_ptr<diagnostics::graph>& graph, const std::shared_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, int width, int height) 
+       flash_renderer(core::monitor::subject& monitor_subject, const spl::shared_ptr<diagnostics::graph>& graph, const std::shared_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, int width, int height) 
                : monitor_subject_(monitor_subject)
                , graph_(graph)
                , filename_(filename)
@@ -264,7 +264,7 @@ public:
                        graph_->set_tag("sync");
 
                graph_->set_value("sync", sync);
-               monitor_subject_ << monitor::message("/sync") % sync;
+               monitor_subject_ << core::monitor::message("/sync") % sync;
                
                ax_->Tick();
                                        
@@ -299,7 +299,7 @@ public:
                }               
                                                                                
                graph_->set_value("frame-time", static_cast<float>(frame_timer.elapsed()/frame_time)*0.5f);
-               monitor_subject_ << monitor::message("/renderer/profiler/time") % frame_timer.elapsed() % frame_time;
+               monitor_subject_ << core::monitor::message("/renderer/profiler/time") % frame_timer.elapsed() % frame_time;
                return head_;
        }
        
@@ -324,7 +324,7 @@ public:
 
 struct flash_producer : public core::frame_producer_base
 {      
-       monitor::subject                                                                monitor_subject_;
+       core::monitor::subject                                                  monitor_subject_;
        const std::wstring                                                              filename_;      
        const spl::shared_ptr<core::frame_factory>              frame_factory_;
        const core::video_format_desc                                   format_desc_;
@@ -387,11 +387,11 @@ public:
                else            
                        graph_->set_tag("late-frame");          
                                
-               monitor_subject_ << monitor::message("/host/path")      % filename_
-                                               << monitor::message("/host/width")      % width_
-                                               << monitor::message("/host/height") % height_
-                                               << monitor::message("/host/fps")        % fps_
-                                               << monitor::message("/buffer")          % output_buffer_.size() % buffer_size_;
+               monitor_subject_ << core::monitor::message("/host/path")        % filename_
+                                               << core::monitor::message("/host/width")        % width_
+                                               << core::monitor::message("/host/height")       % height_
+                                               << core::monitor::message("/host/fps")          % fps_
+                                               << core::monitor::message("/buffer")            % output_buffer_.size() % buffer_size_;
 
                return last_frame_ = frame;
        }
@@ -446,7 +446,7 @@ public:
                return info;
        }
 
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
@@ -498,7 +498,7 @@ public:
                }
 
                graph_->set_value("tick-time", static_cast<float>(tick_timer_.elapsed()/fps_)*0.5f);
-               monitor_subject_ << monitor::message("/profiler/time") % tick_timer_.elapsed() % fps_;
+               monitor_subject_ << core::monitor::message("/profiler/time") % tick_timer_.elapsed() % fps_;
 
                output_buffer_.push(std::move(frame_buffer_.front()));
                frame_buffer_.pop();
index 24279af8244fa4e8dedffd10322d77d71810f15a..7a351c211544b6bef76753571593f07ebfa79634 100644 (file)
@@ -67,8 +67,8 @@ namespace caspar { namespace image {
 
 struct image_consumer : public core::frame_consumer
 {
-       monitor::subject        monitor_subject_;
-       std::wstring            filename_;
+       core::monitor::subject  monitor_subject_;
+       std::wstring                    filename_;
 public:
 
        // frame_consumer
@@ -139,7 +139,7 @@ public:
                return 100;
        }
 
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
index dcecc4bd4f91d742814aa838d7c57cc2172a5444..746a5cd7a7dde977ffab234bd62f71b7b4ce9655 100644 (file)
@@ -74,11 +74,11 @@ std::pair<core::draw_frame, core::constraints> load_image(
 
 struct image_producer : public core::frame_producer_base
 {      
-       monitor::subject                monitor_subject_;
-       const std::wstring              description_;
-       const spl::shared_ptr<core::frame_factory> frame_factory_;
-       core::draw_frame                frame_;
-       core::constraints               constraints_;
+       core::monitor::subject                                          monitor_subject_;
+       const std::wstring                                                      description_;
+       const spl::shared_ptr<core::frame_factory>      frame_factory_;
+       core::draw_frame                                                        frame_;
+       core::constraints                                                       constraints_;
        
        image_producer(const spl::shared_ptr<core::frame_factory>& frame_factory, const std::wstring& description) 
                : description_(description)
@@ -118,7 +118,7 @@ struct image_producer : public core::frame_producer_base
 
        core::draw_frame receive_impl() override
        {
-               monitor_subject_ << monitor::message("/file/path") % description_;
+               monitor_subject_ << core::monitor::message("/file/path") % description_;
 
                return frame_;
        }
@@ -151,7 +151,7 @@ struct image_producer : public core::frame_producer_base
                return info;
        }
 
-       monitor::source& monitor_output() 
+       core::monitor::subject& monitor_output() 
        {
                return monitor_subject_;
        }
index 73121823b6917067ed0aabb891efd6266a2968b9..f92a6a28f6d0d2a38990a8cee356f90a0b4efda5 100644 (file)
@@ -58,7 +58,7 @@ namespace caspar { namespace image {
                
 struct image_scroll_producer : public core::frame_producer_base
 {      
-       monitor::subject                                monitor_subject_;
+       core::monitor::subject                  monitor_subject_;
 
        const std::wstring                              filename_;
        std::vector<core::draw_frame>   frames_;
@@ -350,9 +350,9 @@ struct image_scroll_producer : public core::frame_producer_base
                        result = core::draw_frame::interlace(field1, field2, format_desc_.field_mode);
                }
                
-               monitor_subject_ << monitor::message("/file/path") % filename_
-                                                << monitor::message("/delta") % delta_ 
-                                                << monitor::message("/speed") % speed_;
+               monitor_subject_ << core::monitor::message("/file/path") % filename_
+                                                << core::monitor::message("/delta") % delta_ 
+                                                << core::monitor::message("/speed") % speed_;
 
                return result;
        }
@@ -394,7 +394,7 @@ struct image_scroll_producer : public core::frame_producer_base
                }
        }
 
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
index cdea8773f7a50cdc6ef8da06f1148b17c5800298..d4fc9e8ff5aba8c646c1f31f0f10ada9110e9a38 100644 (file)
@@ -104,18 +104,18 @@ void init_device()
 
 struct oal_consumer : public core::frame_consumer
 {
-       monitor::subject                                                                        monitor_subject_;
+       core::monitor::subject                          monitor_subject_;
 
-       spl::shared_ptr<diagnostics::graph>                                     graph_;
-       boost::timer                                                                            perf_timer_;
-       int                                                                                                     channel_index_;
+       spl::shared_ptr<diagnostics::graph>     graph_;
+       boost::timer                                            perf_timer_;
+       int                                                                     channel_index_;
        
-       core::video_format_desc                                                         format_desc_;
+       core::video_format_desc                         format_desc_;
 
-       ALuint                                                                                          source_;
-       std::array<ALuint, 3>                                                           buffers_;
+       ALuint                                                          source_;
+       std::array<ALuint, 3>                           buffers_;
 
-       executor                                                                                        executor_;
+       executor                                                        executor_;
 
 public:
        oal_consumer() 
@@ -260,7 +260,7 @@ public:
                return 500;
        }
 
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
index 9cb8003afa5c087d4b2b06970b5143c9ddf4c142..83d4d87b83dc4775ee54374c53c480c2e921dd2e 100644 (file)
@@ -54,7 +54,7 @@ namespace caspar { namespace reroute {
 class reroute_producer : public reactive::observer<std::map<int, core::draw_frame>>
                                           , public core::frame_producer_base
 {
-       monitor::subject                                                                                                monitor_subject_;
+       core::monitor::subject                                                                                  monitor_subject_;
 
        core::constraints                                                                                               constraints_;
        const spl::shared_ptr<diagnostics::graph>                                               graph_;
@@ -115,7 +115,7 @@ public:
                return info;
        }
                
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
index a8f85d116a0561552d9a9f12a86a9cc65001a2b0..abd542a538ecad1c4d6f898ceec03ddc7608b8b0 100644 (file)
@@ -554,10 +554,10 @@ public:
 
 struct screen_consumer_proxy : public core::frame_consumer
 {
-       monitor::subject monitor_subject_;
-       const configuration config_;
-       std::unique_ptr<screen_consumer> consumer_;
-       core::interaction_sink* sink_;
+       core::monitor::subject                          monitor_subject_;
+       const configuration                                     config_;
+       std::unique_ptr<screen_consumer>        consumer_;
+       core::interaction_sink*                         sink_;
 
 public:
 
@@ -615,7 +615,7 @@ public:
                return 600 + (config_.key_only ? 10 : 0) + config_.screen_index;
        }
 
-       monitor::source& monitor_output()
+       core::monitor::subject& monitor_output()
        {
                return monitor_subject_;
        }
diff --git a/protocol/asio/io_service_manager.cpp b/protocol/asio/io_service_manager.cpp
new file mode 100644 (file)
index 0000000..00ef8f3
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#include "../StdAfx.h"
+
+#include "io_service_manager.h"
+
+#include <memory>
+
+#include <boost/asio/io_service.hpp>
+#include <boost/thread/thread.hpp>
+
+#include <common/except.h>
+
+namespace caspar { namespace protocol { namespace asio {
+
+struct io_service_manager::impl
+{
+       boost::asio::io_service service_;
+       // To keep the io_service::run() running although no pending async
+       // operations are posted.
+       std::unique_ptr<boost::asio::io_service::work> work_;
+       boost::thread thread_;
+
+       impl()
+               : work_(new boost::asio::io_service::work(service_))
+               , thread_([this] { run(); })
+       {
+       }
+
+       void run()
+       {
+               win32_exception::ensure_handler_installed_for_thread("asio-thread");
+
+               service_.run();
+       }
+
+       ~impl()
+       {
+               work_.reset();
+               service_.stop();
+               thread_.join();
+       }
+};
+
+io_service_manager::io_service_manager()
+       : impl_(new impl)
+{
+}
+
+io_service_manager::~io_service_manager()
+{
+}
+
+boost::asio::io_service& io_service_manager::service()
+{
+       return impl_->service_;
+}
+
+}}}
diff --git a/protocol/asio/io_service_manager.h b/protocol/asio/io_service_manager.h
new file mode 100644 (file)
index 0000000..cc6f253
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#pragma once
+
+#include <memory>
+
+#include <boost/noncopyable.hpp>
+
+namespace boost { namespace asio {
+       class io_service;
+}}
+
+namespace caspar { namespace protocol { namespace asio {
+
+class io_service_manager : boost::noncopyable
+{
+public:
+       io_service_manager();
+       ~io_service_manager();
+       boost::asio::io_service& service();
+private:
+       struct impl;
+       std::unique_ptr<impl> impl_;
+};
+
+}}}
diff --git a/protocol/osc/client.cpp b/protocol/osc/client.cpp
new file mode 100644 (file)
index 0000000..5ea6d9a
--- /dev/null
@@ -0,0 +1,325 @@
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#include "../stdafx.h"
+
+#include "client.h"
+
+#include "oscpack/OscOutboundPacketStream.h"
+#include "oscpack/OscHostEndianness.h"
+
+#include <common/utf.h>
+#include <common/except.h>
+#include <common/endian.h>
+
+#include <core/monitor/monitor.h>
+
+#include <functional>
+#include <vector>
+#include <unordered_map>
+
+#include <boost/asio.hpp>
+#include <boost/foreach.hpp>
+#include <boost/bind.hpp>
+#include <boost/thread.hpp>
+
+#include <tbb/spin_mutex.h>
+#include <tbb/cache_aligned_allocator.h>
+
+using namespace boost::asio::ip;
+
+namespace caspar { namespace protocol { namespace osc {
+
+template<typename T>
+struct no_init_proxy
+{
+    T value;
+
+    no_init_proxy() 
+       {
+               static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size");
+        static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment");
+    }
+};
+
+typedef std::vector<no_init_proxy<char>, tbb::cache_aligned_allocator<no_init_proxy<char>>> byte_vector;
+
+template<typename T>
+struct param_visitor : public boost::static_visitor<void>
+{
+       T& o;
+
+       param_visitor(T& o)
+               : o(o)
+       {
+       }               
+               
+       void operator()(const bool value)                                       {o << value;}
+       void operator()(const int32_t value)                            {o << static_cast<int64_t>(value);}
+       void operator()(const uint32_t value)                           {o << static_cast<int64_t>(value);}
+       void operator()(const int64_t value)                            {o << static_cast<int64_t>(value);}
+       void operator()(const uint64_t value)                           {o << static_cast<int64_t>(value);}
+       void operator()(const float value)                                      {o << value;}
+       void operator()(const double value)                                     {o << static_cast<float>(value);}
+       void operator()(const std::string& value)                       {o << value.c_str();}
+       void operator()(const std::wstring& value)                      {o << u8(value).c_str();}
+       void operator()(const std::vector<int8_t>& value)       {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
+};
+
+void write_osc_event(byte_vector& destination, const core::monitor::message& e)
+{              
+       destination.resize(4096);
+
+       ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+       o << ::osc::BeginMessage(e.path().c_str());
+                               
+       param_visitor<decltype(o)> param_visitor(o);
+       BOOST_FOREACH(const auto& data, e.data())
+               boost::apply_visitor(param_visitor, data);
+                               
+       o << ::osc::EndMessage;
+               
+       destination.resize(o.Size());
+}
+
+byte_vector write_osc_bundle_start()
+{
+       byte_vector destination;
+       destination.resize(16);
+
+       ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+       o << ::osc::BeginBundle();
+
+       destination.resize(o.Size());
+
+       return destination;
+}
+
+void write_osc_bundle_element_start(byte_vector& destination, const byte_vector& message)
+{              
+       destination.resize(4);
+
+       int32_t* bundle_element_size = reinterpret_cast<int32_t*>(destination.data());
+
+#ifdef OSC_HOST_LITTLE_ENDIAN
+       *bundle_element_size = swap_byte_order(static_cast<int32_t>(message.size()));
+#else
+       *bundle_element_size = static_cast<int32_t>(bundle.size());
+#endif
+}
+
+struct client::impl : public spl::enable_shared_from_this<client::impl>, core::monitor::sink
+{
+       udp::socket socket_;
+       tbb::spin_mutex                                                                 endpoints_mutex_;
+       std::map<udp::endpoint, int>                                    reference_counts_by_endpoint_;
+
+       std::unordered_map<std::string, byte_vector>    updates_;
+       boost::mutex                                                                    updates_mutex_;                                                         
+       boost::condition_variable                                               updates_cond_;
+
+       tbb::atomic<bool>                                                               is_running_;
+
+       boost::thread                                                                   thread_;
+       
+public:
+       impl(boost::asio::io_service& service)
+               : socket_(service, udp::v4())
+               , thread_(boost::bind(&impl::run, this))
+       {
+       }
+
+       ~impl()
+       {
+               is_running_ = false;
+
+               updates_cond_.notify_one();
+
+               thread_.join();
+       }
+
+       std::shared_ptr<void> get_subscription_token(
+                       const boost::asio::ip::udp::endpoint& endpoint)
+       {
+               tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+               ++reference_counts_by_endpoint_[endpoint];
+
+               std::weak_ptr<impl> weak_self = shared_from_this();
+
+               return std::shared_ptr<void>(nullptr, [weak_self, endpoint] (void*)
+               {
+                       auto strong = weak_self.lock();
+
+                       if (!strong)
+                               return;
+
+                       auto& self = *strong;
+
+                       tbb::spin_mutex::scoped_lock lock(self.endpoints_mutex_);
+
+                       int reference_count_after =
+                               --self.reference_counts_by_endpoint_[endpoint];
+
+                       if (reference_count_after == 0)
+                               self.reference_counts_by_endpoint_.erase(endpoint);
+               });
+       }
+private:
+       void propagate(const core::monitor::message& msg)
+       {
+               boost::lock_guard<boost::mutex> lock(updates_mutex_);
+
+               try 
+               {
+                       write_osc_event(updates_[msg.path()], msg);
+               }
+               catch(...)
+               {
+                       CASPAR_LOG_CURRENT_EXCEPTION();
+                       updates_.erase(msg.path());
+               }
+
+               updates_cond_.notify_one();
+       }
+
+       template<typename T>
+       void do_send(
+                       const T& buffers, const std::vector<udp::endpoint>& destinations)
+       {
+               boost::system::error_code ec;
+
+               BOOST_FOREACH(const auto& endpoint, destinations)
+                       socket_.send_to(buffers, endpoint, 0, ec);
+       }
+
+       void run()
+       {
+               // http://stackoverflow.com/questions/14993000/the-most-reliable-and-efficient-udp-packet-size
+               const int SAFE_DATAGRAM_SIZE = 508;
+
+               try
+               {
+                       is_running_ = true;
+
+                       std::unordered_map<std::string, byte_vector> updates;
+                       std::vector<udp::endpoint> destinations;
+                       const byte_vector bundle_header = write_osc_bundle_start();
+                       std::vector<byte_vector> element_headers;
+
+                       while (is_running_)
+                       {               
+                               updates.clear();
+                               destinations.clear();
+
+                               {                       
+                                       boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
+
+                                       if (updates_.empty())
+                                               updates_cond_.wait(cond_lock);
+
+                                       std::swap(updates, updates_);
+                               }
+
+                               {
+                                       tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+                                       BOOST_FOREACH(const auto& endpoint, reference_counts_by_endpoint_)
+                                               destinations.push_back(endpoint.first);
+                               }
+
+                               if (destinations.empty())
+                                       continue;
+
+                               std::vector<boost::asio::const_buffers_1> buffers;
+                               element_headers.resize(
+                                               std::max(element_headers.size(), updates.size()));
+
+                               int i = 0;
+                               auto datagram_size = bundle_header.size();
+                               buffers.push_back(boost::asio::buffer(bundle_header));
+
+                               BOOST_FOREACH(const auto& slot, updates)
+                               {
+                                       write_osc_bundle_element_start(element_headers[i], slot.second);
+                                       const auto& headers = element_headers;
+
+                                       auto size_of_element = headers[i].size() + slot.second.size();
+       
+                                       if (datagram_size + size_of_element >= SAFE_DATAGRAM_SIZE)
+                                       {
+                                               do_send(buffers, destinations);
+                                               buffers.clear();
+                                               buffers.push_back(boost::asio::buffer(bundle_header));
+                                               datagram_size = bundle_header.size();
+                                       }
+
+                                       buffers.push_back(boost::asio::buffer(headers[i]));
+                                       buffers.push_back(boost::asio::buffer(slot.second));
+
+                                       datagram_size += size_of_element;
+                                       ++i;
+                               }
+                       
+                               if (!buffers.empty())
+                                       do_send(buffers, destinations);
+                       }
+               }
+               catch (...)
+               {
+                       CASPAR_LOG_CURRENT_EXCEPTION();
+               }
+       }
+};
+
+client::client(boost::asio::io_service& service) 
+       : impl_(new impl(service))
+{
+}
+
+client::client(client&& other)
+       : impl_(std::move(other.impl_))
+{
+}
+
+client& client::operator=(client&& other)
+{
+       impl_ = std::move(other.impl_);
+       return *this;
+}
+
+client::~client()
+{
+}
+
+std::shared_ptr<void> client::get_subscription_token(
+                       const boost::asio::ip::udp::endpoint& endpoint)
+{
+       return impl_->get_subscription_token(endpoint);
+}
+
+spl::shared_ptr<core::monitor::sink> client::sink()
+{
+       return impl_;
+}
+
+}}}
diff --git a/protocol/osc/client.h b/protocol/osc/client.h
new file mode 100644 (file)
index 0000000..125fd80
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#pragma once
+
+#include <boost/asio/ip/udp.hpp>
+#include <boost/noncopyable.hpp>
+
+#include <common/memory.h>
+#include <core/monitor/monitor.h>
+
+namespace caspar { namespace protocol { namespace osc {
+
+class client
+{
+       client(const client&);
+       client& operator=(const client&);
+public:        
+
+       // Static Members
+
+       // Constructors
+
+       client(boost::asio::io_service& service);
+       
+       client(client&&);
+
+       /**
+        * Get a subscription token that ensures that OSC messages are sent to the
+        * given endpoint as long as the token is alive. It will stop sending when
+        * the token is dropped unless another token to the same endpoint has
+        * previously been checked out.
+        *
+        * @param endpoint The UDP endpoint to send OSC messages to.
+        *
+        * @return The token. It is ok for the token to outlive the client
+        */
+       std::shared_ptr<void> get_subscription_token(
+                       const boost::asio::ip::udp::endpoint& endpoint);
+
+       ~client();
+
+       // Methods
+               
+       client& operator=(client&&);
+       
+       // Properties
+
+       spl::shared_ptr<core::monitor::sink> sink();
+private:
+       struct impl;
+       spl::shared_ptr<impl> impl_;
+};
+
+}}}
diff --git a/protocol/osc/server.cpp b/protocol/osc/server.cpp
deleted file mode 100644 (file)
index 5861428..0000000
+++ /dev/null
@@ -1,120 +0,0 @@
-#include "..\stdafx.h"
-
-#include "server.h"
-
-#include "oscpack/oscOutboundPacketStream.h"
-
-#include <functional>
-#include <vector>
-
-#include <boost/asio.hpp>
-#include <boost/foreach.hpp>
-#include <boost/thread.hpp>
-
-using namespace boost::asio::ip;
-
-namespace caspar { namespace protocol { namespace osc {
-
-template<typename T>
-struct param_visitor : public boost::static_visitor<void>
-{
-       T& o;
-
-       param_visitor(T& o)
-               : o(o)
-       {
-       }               
-
-       void operator()(const bool value)                                       {o << value;}
-       void operator()(const int32_t value)                            {o << static_cast<int64_t>(value);}
-       void operator()(const uint32_t value)                           {o << static_cast<int64_t>(value);}
-       void operator()(const int64_t value)                            {o << static_cast<int64_t>(value);}
-       void operator()(const uint64_t value)                           {o << static_cast<int64_t>(value);}
-       void operator()(const float value)                                      {o << value;}
-       void operator()(const double value)                                     {o << static_cast<float>(value);}
-       void operator()(const std::string& value)                       {o << value.c_str();}
-       void operator()(const std::wstring& value)                      {o << std::string(value.begin(), value.end()).c_str();}
-       void operator()(const std::vector<int8_t>& value)       {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
-};
-
-std::vector<char> write_osc_event(const monitor::message& e)
-{
-       std::array<char, 4096> buffer;
-       ::osc::OutboundPacketStream o(buffer.data(), static_cast<unsigned long>(buffer.size()));
-
-       o       << ::osc::BeginMessage(e.path().c_str());
-
-       param_visitor<decltype(o)> pd_visitor(o);
-       BOOST_FOREACH(auto data, e.data())
-               boost::apply_visitor(pd_visitor, data);
-
-       o       << ::osc::EndMessage;
-
-       return std::vector<char>(o.Data(), o.Data() + o.Size());
-}
-
-struct server::impl
-{
-       boost::asio::io_service                                         service_;
-
-       udp::endpoint                                                           endpoint_;
-       udp::socket                                                                     socket_;        
-
-       boost::thread                                                           thread_;
-
-       Concurrency::call<monitor::message>     on_next_;
-
-public:
-       impl(udp::endpoint endpoint, 
-                Concurrency::ISource<monitor::message>& source)
-               : endpoint_(endpoint)
-               , socket_(service_, endpoint_.protocol())
-               , thread_(std::bind(&boost::asio::io_service::run, &service_))
-               , on_next_([this](const monitor::message& msg){ on_next(msg); })
-       {
-               source.link_target(&on_next_);
-       }
-
-       ~impl()
-       {               
-               thread_.join();
-       }
-
-       void on_next(const monitor::message& msg)
-       {
-               auto data_ptr = spl::make_shared<std::vector<char>>(write_osc_event(msg));
-               if(data_ptr->size() >0)
-                       socket_.async_send_to(boost::asio::buffer(*data_ptr), 
-                                                         endpoint_,
-                                                         boost::bind(&impl::handle_send_to, this, data_ptr,    //data_ptr need to stay alive
-                                                         boost::asio::placeholders::error,
-                                                         boost::asio::placeholders::bytes_transferred));               
-       }       
-
-       void handle_send_to(spl::shared_ptr<std::vector<char>> data, const boost::system::error_code& /*error*/, size_t /*bytes_sent*/)
-       {
-       }
-};
-
-server::server(udp::endpoint endpoint, 
-                          Concurrency::ISource<monitor::message>& source) 
-       : impl_(new impl(endpoint, source))
-{
-}
-
-server::server(server&& other)
-       : impl_(std::move(other.impl_))
-{
-}
-
-server& server::operator=(server&& other)
-{
-       impl_ = std::move(other.impl_);
-       return *this;
-}
-
-server::~server()
-{
-}
-
-}}}
\ No newline at end of file
diff --git a/protocol/osc/server.h b/protocol/osc/server.h
deleted file mode 100644 (file)
index 8a648b7..0000000
+++ /dev/null
@@ -1,38 +0,0 @@
-#pragma once
-
-#include <common/memory.h>
-
-#include <core/monitor/monitor.h>
-#include <boost/asio/ip/udp.hpp>
-
-namespace caspar { namespace protocol { namespace osc {
-
-class server
-{
-       server(const server&);
-       server& operator=(const server&);
-public:        
-
-       // Static Members
-
-       // Constructors
-
-       server(boost::asio::ip::udp::endpoint endpoint, 
-                  Concurrency::ISource<monitor::message>& source);
-
-       server(server&&);
-
-       ~server();
-
-       // Methods
-
-       server& operator=(server&&);
-
-       // Properties
-
-private:
-       struct impl;
-       std::unique_ptr<impl> impl_;
-};
-
-}}}
\ No newline at end of file
index 18486013714f3947949b674b2441b63d1f7ed89e..7379ae597eb0928e7517f2af063d8989b18bed12 100644 (file)
     <ClInclude Include="amcp\AMCPCommandsImpl.h" />\r
     <ClInclude Include="amcp\AMCPProtocolStrategy.h" />\r
     <ClInclude Include="amcp\amcp_shared.h" />\r
+    <ClInclude Include="asio\io_service_manager.h" />\r
     <ClInclude Include="cii\CIICommand.h" />\r
     <ClInclude Include="cii\CIICommandsImpl.h" />\r
     <ClInclude Include="cii\CIIProtocolStrategy.h" />\r
     <ClInclude Include="clk\CLKProtocolStrategy.h" />\r
     <ClInclude Include="clk\clk_commands.h" />\r
     <ClInclude Include="clk\clk_command_processor.h" />\r
+    <ClInclude Include="osc\client.h" />\r
     <ClInclude Include="osc\oscpack\MessageMappingOscPacketListener.h" />\r
     <ClInclude Include="osc\oscpack\OscException.h" />\r
     <ClInclude Include="osc\oscpack\OscHostEndianness.h" />\r
@@ -47,7 +49,6 @@
     <ClInclude Include="osc\oscpack\OscPrintReceivedElements.h" />\r
     <ClInclude Include="osc\oscpack\OscReceivedElements.h" />\r
     <ClInclude Include="osc\oscpack\OscTypes.h" />\r
-    <ClInclude Include="osc\server.h" />\r
     <ClInclude Include="StdAfx.h" />\r
     <ClInclude Include="util\AsyncEventServer.h" />\r
     <ClInclude Include="util\ClientInfo.h" />\r
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
     </ClCompile>\r
+    <ClCompile Include="asio\io_service_manager.cpp">\r
+      <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
+      <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
+    </ClCompile>\r
     <ClCompile Include="cii\CIICommandsImpl.cpp">\r
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
     </ClCompile>\r
+    <ClCompile Include="osc\client.cpp">\r
+      <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
+      <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
+    </ClCompile>\r
     <ClCompile Include="osc\oscpack\OscOutboundPacketStream.cpp">\r
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader>\r
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader>\r
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader>\r
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader>\r
     </ClCompile>\r
-    <ClCompile Include="osc\server.cpp">\r
-      <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
-      <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
-    </ClCompile>\r
     <ClCompile Include="StdAfx.cpp">\r
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">Create</PrecompiledHeader>\r
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">Create</PrecompiledHeader>\r
index 5ce3daa2273ab1ee0098b0f4200ca5b9e51e798a..c94fd46e59bdbaec9f03b9e47612b31d41b9d77d 100644 (file)
@@ -22,6 +22,9 @@
     <Filter Include="source\osc\oscpack">\r
       <UniqueIdentifier>{d6dede65-7a93-4494-aa2d-d18d5267902c}</UniqueIdentifier>\r
     </Filter>\r
+    <Filter Include="source\asio">\r
+      <UniqueIdentifier>{982ff369-f4a0-418b-b613-81ec322208c5}</UniqueIdentifier>\r
+    </Filter>\r
   </ItemGroup>\r
   <ItemGroup>\r
     <ClInclude Include="amcp\AMCPCommand.h">\r
@@ -82,9 +85,6 @@
     <ClInclude Include="osc\oscpack\OscException.h">\r
       <Filter>source\osc\oscpack</Filter>\r
     </ClInclude>\r
-    <ClInclude Include="osc\server.h">\r
-      <Filter>source\osc</Filter>\r
-    </ClInclude>\r
     <ClInclude Include="clk\clk_command_processor.h">\r
       <Filter>source\clk</Filter>\r
     </ClInclude>\r
     <ClInclude Include="amcp\amcp_shared.h">\r
       <Filter>source\amcp</Filter>\r
     </ClInclude>\r
+    <ClInclude Include="osc\client.h">\r
+      <Filter>source\osc</Filter>\r
+    </ClInclude>\r
+    <ClInclude Include="asio\io_service_manager.h">\r
+      <Filter>source\asio</Filter>\r
+    </ClInclude>\r
   </ItemGroup>\r
   <ItemGroup>\r
     <ClCompile Include="amcp\AMCPCommandQueue.cpp">\r
     <ClCompile Include="osc\oscpack\OscTypes.cpp">\r
       <Filter>source\osc\oscpack</Filter>\r
     </ClCompile>\r
-    <ClCompile Include="osc\server.cpp">\r
-      <Filter>source\osc</Filter>\r
-    </ClCompile>\r
     <ClCompile Include="clk\clk_command_processor.cpp">\r
       <Filter>source\clk</Filter>\r
     </ClCompile>\r
     <ClCompile Include="util\lock_container.cpp">\r
       <Filter>source\util</Filter>\r
     </ClCompile>\r
+    <ClCompile Include="osc\client.cpp">\r
+      <Filter>source\osc</Filter>\r
+    </ClCompile>\r
+    <ClCompile Include="asio\io_service_manager.cpp">\r
+      <Filter>source\asio</Filter>\r
+    </ClCompile>\r
   </ItemGroup>\r
 </Project>
\ No newline at end of file
index 5f5207fe06d692fee9ddef513ccbc37bdb857d82..43308b7ca7e36bb4f09d6eb7f2952dd213fe7b36 100644 (file)
       <port>5250</port>\r
       <protocol>AMCP</protocol>\r
     </tcp>\r
-    <udp>\r
-      <address>127.0.0.1</address>\r
-      <port>6250</port>\r
-      <protocol>OSC</protocol>\r
-    </udp>\r
   </controllers>\r
 </configuration>\r
 \r
         </consumers>\r
     </channel>\r
 </channels>\r
+<osc>\r
+  <default-port>6250</default-port>\r
+  <predefined-clients>\r
+    <predefined-client>\r
+      <address>127.0.0.1</address>\r
+      <port>5253</port>\r
+    </predefined-client>\r
+  </predefined-clients>\r
+</osc>\r
 -->\r
index f40e53f1103294d0e99b012b30698bf6093e3152..40f74c0466ac1636a4eb5ff3b0a6101fd31e6b43 100644 (file)
 #include <modules/screen/consumer/screen_consumer.h>
 #include <modules/ffmpeg/consumer/ffmpeg_consumer.h>
 
+#include <protocol/asio/io_service_manager.h>
 #include <protocol/amcp/AMCPProtocolStrategy.h>
 #include <protocol/cii/CIIProtocolStrategy.h>
 #include <protocol/CLK/CLKProtocolStrategy.h>
 #include <protocol/util/AsyncEventServer.h>
 #include <protocol/util/strategy_adapters.h>
-#include <protocol/osc/server.h>
+#include <protocol/osc/client.h>
 
 #include <boost/algorithm/string.hpp>
 #include <boost/thread.hpp>
@@ -77,16 +78,21 @@ using namespace protocol;
 
 struct server::impl : boost::noncopyable
 {
-       monitor::subject                                                                        monitor_subject_;
+       protocol::asio::io_service_manager                                      io_service_manager_;
+       spl::shared_ptr<monitor::subject>                                       monitor_subject_;
        accelerator::accelerator                                                        accelerator_;
        std::vector<spl::shared_ptr<IO::AsyncEventServer>>      async_servers_;
-       std::vector<osc::server>                                                        osc_servers_;
+       std::shared_ptr<IO::AsyncEventServer>                           primary_amcp_server_;
+       osc::client                                                                                     osc_client_;
+       std::vector<std::shared_ptr<void>>                                      predefined_osc_subscriptions_;
        std::vector<spl::shared_ptr<video_channel>>                     channels_;
        std::shared_ptr<thumbnail_generator>                            thumbnail_generator_;
        boost::promise<bool>&                                                           shutdown_server_now_;
 
        explicit impl(boost::promise<bool>& shutdown_server_now)                
-               : accelerator_(env::properties().get(L"configuration.accelerator", L"auto")), shutdown_server_now_(shutdown_server_now)
+               : accelerator_(env::properties().get(L"configuration.accelerator", L"auto"))
+               , osc_client_(io_service_manager_.service())
+               , shutdown_server_now_(shutdown_server_now)
        {       
 
                ffmpeg::init();
@@ -126,10 +132,15 @@ struct server::impl : boost::noncopyable
 
                setup_controllers(env::properties());
                CASPAR_LOG(info) << L"Initialized controllers.";
+
+               setup_osc(env::properties());
+               CASPAR_LOG(info) << L"Initialized osc.";
        }
 
        ~impl()
-       {               
+       {
+               thumbnail_generator_.reset();
+               primary_amcp_server_.reset();
                async_servers_.clear();
                channels_.clear();
 
@@ -175,7 +186,7 @@ struct server::impl : boost::noncopyable
                                }
                        }               
 
-                   channel->monitor_output().link_target(&monitor_subject_);
+                   channel->monitor_output().attach_parent(monitor_subject_);
                        channels_.push_back(channel);
                }
 
@@ -184,6 +195,50 @@ struct server::impl : boost::noncopyable
                        channels_.push_back(spl::make_shared<video_channel>(static_cast<int>(channels_.size()+1), core::video_format_desc(core::video_format::x576p2500), accelerator_.create_image_mixer()));
        }
 
+       void setup_osc(const boost::property_tree::wptree& pt)
+       {               
+               using boost::property_tree::wptree;
+               using namespace boost::asio::ip;
+
+               monitor_subject_->attach_parent(osc_client_.sink());
+               
+               auto default_port =
+                               pt.get<unsigned short>(L"configuration.osc.default-port", 6250);
+               auto predefined_clients =
+                               pt.get_child_optional(L"configuration.osc.predefined-clients");
+
+               if (predefined_clients)
+               {
+                       BOOST_FOREACH(auto& predefined_client, *predefined_clients)
+                       {
+                               const auto address =
+                                               predefined_client.second.get<std::wstring>(L"address");
+                               const auto port =
+                                               predefined_client.second.get<unsigned short>(L"port");
+                               predefined_osc_subscriptions_.push_back(
+                                               osc_client_.get_subscription_token(udp::endpoint(
+                                                               address_v4::from_string(u8(address)),
+                                                               port)));
+                       }
+               }
+
+               if (primary_amcp_server_)
+                       primary_amcp_server_->add_client_lifecycle_object_factory(
+                                       [=] (const std::string& ipv4_address)
+                                                       -> std::pair<std::wstring, std::shared_ptr<void>>
+                                       {
+                                               using namespace boost::asio::ip;
+
+                                               return std::make_pair(
+                                                               std::wstring(L"osc_subscribe"),
+                                                               osc_client_.get_subscription_token(
+                                                                               udp::endpoint(
+                                                                                               address_v4::from_string(
+                                                                                                               ipv4_address),
+                                                                                               default_port)));
+                                       });
+       }
+
        void setup_thumbnail_generation(const boost::property_tree::wptree& pt)
        {
                if (!pt.get(L"configuration.thumbnails.generate-thumbnails", true))
@@ -222,18 +277,8 @@ struct server::impl : boost::noncopyable
                                        auto asyncbootstrapper = spl::make_shared<IO::AsyncEventServer>(create_protocol(protocol), port);
                                        async_servers_.push_back(asyncbootstrapper);
 
-                                       //TODO: remove - test
-                                       asyncbootstrapper->add_client_lifecycle_object_factory([=] (const std::string& ipv4_address) {
-                                                                                                                                                                       return std::pair<std::wstring, std::shared_ptr<void>>(L"log", std::shared_ptr<void>(nullptr, [] (void*) 
-                                                                                                                                                                       { CASPAR_LOG(info) << "Client disconnect (lifecycle)"; }));
-                                                                                                                                                               });
-                               }
-                               else if(name == L"udp")
-                               {
-                                       const auto address = xml_controller.second.get(L"address", L"127.0.0.1");
-                                       const auto port = xml_controller.second.get<unsigned short>(L"port", 6250);
-
-                                       osc_servers_.push_back(osc::server(boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4::from_string(std::string(address.begin(), address.end())), port), monitor_subject_));
+                                       if (!primary_amcp_server_ && boost::iequals(protocol, L"AMCP"))
+                                               primary_amcp_server_ = asyncbootstrapper;
                                }
                                else
                                        CASPAR_LOG(warning) << "Invalid controller: " << name;  
@@ -270,6 +315,6 @@ const std::vector<spl::shared_ptr<video_channel>> server::channels() const
        return impl_->channels_;
 }
 std::shared_ptr<core::thumbnail_generator> server::get_thumbnail_generator() const {return impl_->thumbnail_generator_; }
-monitor::source& server::monitor_output() { return impl_->monitor_subject_; }
+core::monitor::subject& server::monitor_output() { return *impl_->monitor_subject_; }
 
 }
\ No newline at end of file
index 00ea5eb72c5be6fb8c4f6be20eb6b7d995ce4fdc..a658abc306246e9c3a686dd57cb4bdcb200e5c22 100644 (file)
@@ -44,7 +44,7 @@ public:
        const std::vector<spl::shared_ptr<core::video_channel>> channels() const;
        std::shared_ptr<core::thumbnail_generator> get_thumbnail_generator() const;
 
-       monitor::source& monitor_output();
+       core::monitor::subject& monitor_output();
 private:
        struct impl;
        spl::shared_ptr<impl> impl_;