]> git.sesse.net Git - casparcg/commitdiff
Flash producer
authorHelge Norberg <helge.norberg@gmail.com>
Tue, 8 Oct 2013 09:37:26 +0000 (11:37 +0200)
committerHelge Norberg <helge.norberg@gmail.com>
Tue, 8 Oct 2013 09:37:26 +0000 (11:37 +0200)
    - globally serialize initialization and destruction of flash players, to avoid race conditions in flash.
    - changed so that the flash buffer is filled with flash player generated content at initialization instead of empty frames.

OSC
    - merged OSC improvements #152 made by Robert Nagy up until commit https://github.com/ronag/Server/commit/bae86d21f1569b4b7ac1e08c024d62f389efa57e
    - enclosed multiple OSC messages in the same UDP packet inside an OSC bundle to comply with OSC spec
    - ensured that UDP packets are not too big (causing fragmentation, and errors on OSC receiving side)
    - removed use of Microsoft Agents library which seemed to be the cause of running out of memory after a while. Now everything is done synchronously instead (without support for multiple targets, just a single target (parent) for now).

29 files changed:
common/diagnostics/graph.cpp
core/monitor/monitor.cpp
core/monitor/monitor.h
core/producer/channel/channel_producer.cpp
core/producer/color/color_producer.cpp
core/producer/frame_producer.cpp
core/producer/frame_producer.h
core/producer/layer.cpp
core/producer/layer.h
core/producer/layer/layer_producer.cpp
core/producer/separated/separated_producer.cpp
core/producer/stage.cpp
core/producer/stage.h
core/producer/transition/transition_producer.cpp
core/video_channel.cpp
core/video_channel.h
modules/decklink/producer/decklink_producer.cpp
modules/ffmpeg/producer/ffmpeg_producer.cpp
modules/flash/producer/FlashAxContainer.cpp
modules/flash/producer/cg_producer.cpp
modules/flash/producer/cg_producer.h
modules/flash/producer/flash_producer.cpp
modules/image/producer/image_producer.cpp
modules/image/producer/image_scroll_producer.cpp
protocol/amcp/AMCPCommandsImpl.cpp
protocol/osc/client.cpp
protocol/osc/client.h
shell/server.cpp
shell/server.h

index 3c7da27c3fc7512b5291c837b75b4bd7b52b0ee5..fc08de8e775e415a8bb2cefbaeceb04fd8716efb 100644 (file)
@@ -282,6 +282,7 @@ struct graph::impl : public drawable
        bool auto_reset_;\r
 \r
        impl()\r
+               : auto_reset_(false)\r
        {\r
        }\r
                \r
index d36148ce1aa9aeafab9375354370c4d8f04d0a62..f2ad450ed78afc723b2fedc47742ec5dd8bf0881 100644 (file)
 
 #include "monitor.h"
 
-namespace caspar { namespace core {
-       
-}}
\ No newline at end of file
+namespace caspar { namespace core { namespace monitor {
+
+/*class in_callers_thread_schedule_group : public Concurrency::ScheduleGroup
+{
+       virtual void ScheduleTask(Concurrency::TaskProc proc, void* data) override
+       {
+               proc(data);
+       }
+
+    virtual unsigned int Id() const override
+       {
+               return 1;
+       }
+
+    virtual unsigned int Reference() override
+       {
+               return 1;
+       }
+
+    virtual unsigned int Release() override
+       {
+               return 1;
+       }
+};
+
+Concurrency::ScheduleGroup& get_in_callers_thread_schedule_group()
+{
+       static in_callers_thread_schedule_group group;
+
+       return group;
+}*/
+
+}}}
index a8acb0cd909f8b425897102a1f0e39670841d76e..ae33944dcfaf925872dabb461ae49161b89c7b07 100644 (file)
@@ -30,8 +30,6 @@
 #include <string>
 #include <vector>
 
-#include <agents.h>
-
 namespace caspar { namespace core { namespace monitor {
                
 typedef boost::variant<bool, 
@@ -88,26 +86,49 @@ private:
        safe_ptr<std::vector<data_t>>   data_ptr_;
 };
 
-class subject : public Concurrency::transformer<monitor::message, monitor::message>
+struct sink
+{
+       virtual ~sink() { }
+
+       virtual void propagate(const message& msg) = 0;
+};
+
+class subject : public sink
 {
+private:
+       std::weak_ptr<sink> parent_;
+       const std::string path_;
 public:
        subject(std::string path = "")
-               : Concurrency::transformer<monitor::message, monitor::message>([=](const message& msg)
-               {
-                       return msg.propagate(path);
-               })
+               : path_(std::move(path))
        {
                CASPAR_ASSERT(path.empty() || path[0] == '/');
        }
 
-       template<typename T>
-       subject& operator<<(T&& msg)
+       void attach_parent(const safe_ptr<sink>& parent)
+       {
+               parent_ = parent;
+       }
+
+       void detach_parent()
        {
-               Concurrency::send(*this, std::forward<T>(msg));
+               parent_.reset();
+       }
+
+       subject& operator<<(const message& msg)
+       {
+               propagate(msg);
+
                return *this;
        }
-};
 
-typedef Concurrency::ISource<monitor::message> source;
+       virtual void propagate(const message& msg) override
+       {
+               auto parent = parent_.lock();
+
+               if (parent)
+                       parent->propagate(msg.propagate(path_));
+       }
+};
 
-}}}
\ No newline at end of file
+}}}
index 98d845ca0880f5986a5199cfe5138e79ac26418c..6b4d1f6f24a39567f93aa7b579c8ce5b3a33263c 100644 (file)
@@ -221,7 +221,7 @@ public:
                return info;\r
        }\r
 \r
-       monitor::source& monitor_output() \r
+       monitor::subject& monitor_output() \r
        {\r
                return monitor_subject_;\r
        }\r
index 0de36e0c737ea596255ede83e72e269f8c9402d0..d7f14fd14ae4d7a9835621ca4261d84f95af05be 100644 (file)
@@ -79,7 +79,7 @@ public:
                return info;\r
        }\r
 \r
-       monitor::source& monitor_output()\r
+       monitor::subject& monitor_output()\r
        {\r
                return monitor_subject_;\r
        }\r
index b2dce74eca8beb640be806130713cebe831249b6..7abaf1401b34e961bd3b3f7c2df5ba7dbdd0e818 100644 (file)
@@ -139,7 +139,7 @@ public:
        virtual safe_ptr<frame_producer>                                                        get_following_producer() const override                                                                 {return (*producer_)->get_following_producer();}\r
        virtual void                                                                                            set_leading_producer(const safe_ptr<frame_producer>& producer) override {(*producer_)->set_leading_producer(producer);}\r
        virtual uint32_t                                                                                        nb_frames() const override                                                                                              {return (*producer_)->nb_frames();}\r
-       virtual monitor::source&                                                                        monitor_output()                                                                                                                {return (*producer_)->monitor_output();}\r
+       virtual monitor::subject&                                                                       monitor_output()                                                                                                                {return (*producer_)->monitor_output();}\r
 };\r
 \r
 safe_ptr<core::frame_producer> create_producer_destroy_proxy(safe_ptr<core::frame_producer> producer)\r
@@ -174,7 +174,7 @@ public:
        virtual safe_ptr<frame_producer>                                                        get_following_producer() const override                                                                 {return (producer_)->get_following_producer();}\r
        virtual void                                                                                            set_leading_producer(const safe_ptr<frame_producer>& producer) override {(producer_)->set_leading_producer(producer);}\r
        virtual uint32_t                                                                                        nb_frames() const override                                                                                              {return (producer_)->nb_frames();}\r
-       virtual monitor::source&                                                                        monitor_output()                                                                                                                {return (producer_)->monitor_output();}\r
+       virtual monitor::subject&                                                                       monitor_output()                                                                                                                {return (producer_)->monitor_output();}\r
 };\r
 \r
 safe_ptr<core::frame_producer> create_producer_print_proxy(safe_ptr<core::frame_producer> producer)\r
@@ -206,7 +206,7 @@ public:
                info.add(L"type", L"last-frame-producer");\r
                return info;\r
        }\r
-       virtual monitor::source& monitor_output()\r
+       virtual monitor::subject& monitor_output()\r
        {\r
                static monitor::subject monitor_subject("");\r
                return monitor_subject;\r
@@ -228,7 +228,7 @@ struct empty_frame_producer : public frame_producer
                return info;\r
        }\r
 \r
-       virtual monitor::source& monitor_output()\r
+       virtual monitor::subject& monitor_output()\r
        {\r
                static monitor::subject monitor_subject("");\r
                return monitor_subject;\r
index 1e07ecde6390fd801e42c9b816b0d7ef0ab07f3d..c7d62b35be5fd8489259777f4121db0cab2c9818 100644 (file)
@@ -79,7 +79,7 @@ public:
 \r
        static const safe_ptr<frame_producer>& empty(); // nothrow\r
 \r
-       virtual monitor::source& monitor_output() = 0;\r
+       virtual monitor::subject& monitor_output() = 0;\r
 };\r
 \r
 safe_ptr<basic_frame> receive_and_follow(safe_ptr<frame_producer>& producer, int hints);\r
index 71dd72bca44c101fe6e6f2a02b8d5ab2e8512a5c..38a1e146e885cf077d8c18db6f756d58f4ea874b 100644 (file)
@@ -38,7 +38,7 @@ struct layer::implementation
        int32_t                                         auto_play_delta_;\r
        bool                                            is_paused_;\r
        int64_t                                         current_frame_age_;\r
-       monitor::subject                        monitor_subject_;\r
+       safe_ptr<monitor::subject>      monitor_subject_;\r
 \r
 public:\r
        implementation(int index) \r
@@ -47,7 +47,7 @@ public:
                , frame_number_(0)\r
                , auto_play_delta_(-1)\r
                , is_paused_(false)\r
-               , monitor_subject_("/layer/" + boost::lexical_cast<std::string>(index))\r
+               , monitor_subject_(make_safe<monitor::subject>("/layer/" + boost::lexical_cast<std::string>(index)))\r
        {\r
        }\r
        \r
@@ -108,7 +108,7 @@ public:
        {               \r
                try\r
                {\r
-                       monitor_subject_ << monitor::message("/paused") % is_paused_;\r
+                       *monitor_subject_ << monitor::message("/paused") % is_paused_;\r
 \r
                        if(is_paused_)\r
                        {\r
@@ -184,9 +184,9 @@ public:
 \r
        void set_foreground(safe_ptr<core::frame_producer> producer)\r
        {\r
-               foreground_->monitor_output().unlink_target(&monitor_subject_);\r
+               foreground_->monitor_output().detach_parent();\r
                foreground_                     = producer;\r
-               foreground_->monitor_output().link_target(&monitor_subject_);\r
+               foreground_->monitor_output().attach_parent(monitor_subject_);\r
        }\r
 };\r
 \r
@@ -214,5 +214,5 @@ bool layer::empty() const {return impl_->empty();}
 boost::unique_future<std::wstring> layer::call(bool foreground, const std::wstring& param){return impl_->call(foreground, param);}\r
 boost::property_tree::wptree layer::info() const{return impl_->info();}\r
 boost::property_tree::wptree layer::delay_info() const{return impl_->delay_info();}\r
-monitor::source& layer::monitor_output(){return impl_->monitor_subject_;}\r
+monitor::subject& layer::monitor_output(){return *impl_->monitor_subject_;}\r
 }}
\ No newline at end of file
index 061eb26b1bb3773fbde7e4175c5e42a95cc912ef..2809e7ed3ea4c78b94d2b652f8f49ee6cfae36a4 100644 (file)
@@ -66,7 +66,7 @@ public:
        boost::property_tree::wptree info() const;\r
        boost::property_tree::wptree delay_info() const;\r
        \r
-       monitor::source& monitor_output();\r
+       monitor::subject& monitor_output();\r
 private:\r
        struct implementation;\r
        safe_ptr<implementation> impl_;\r
index b29e191b5c11e9d0cb1e7ec33f481c1bd1f136dc..84a671fd0d1090d65557ccb694e177fd2578d4e4 100644 (file)
@@ -141,7 +141,7 @@ public:
                return info;
        }
 
-       monitor::source& monitor_output() 
+       monitor::subject& monitor_output() 
        {
                return monitor_subject_;
        }
index 1018b822c80ac96d34fa00e2e06ac0d76878eb72..f6fba18ae2957038eebe907442af9b9b3982bb2e 100644 (file)
@@ -33,8 +33,8 @@ namespace caspar { namespace core {
 \r
 struct separated_producer : public frame_producer\r
 {              \r
-       monitor::subject                        monitor_subject_;\r
-       monitor::subject                        key_monitor_subject_;\r
+       safe_ptr<monitor::subject>      monitor_subject_;\r
+       safe_ptr<monitor::subject>      key_monitor_subject_;\r
 \r
        safe_ptr<frame_producer>        fill_producer_;\r
        safe_ptr<frame_producer>        key_producer_;\r
@@ -43,18 +43,17 @@ struct separated_producer : public frame_producer
        safe_ptr<basic_frame>           last_frame_;\r
                \r
        explicit separated_producer(const safe_ptr<frame_producer>& fill, const safe_ptr<frame_producer>& key) \r
-               : monitor_subject_("")\r
-               , key_monitor_subject_("/keyer")\r
+               : key_monitor_subject_(make_safe<monitor::subject>("/keyer"))\r
                , fill_producer_(fill)\r
                , key_producer_(key)\r
                , fill_(core::basic_frame::late())\r
                , key_(core::basic_frame::late())\r
                , last_frame_(core::basic_frame::empty())\r
        {\r
-               key_monitor_subject_.link_target(&monitor_subject_);\r
+               key_monitor_subject_->attach_parent(monitor_subject_);\r
 \r
-               key_producer_->monitor_output().link_target(&key_monitor_subject_);\r
-               fill_producer_->monitor_output().link_target(&monitor_subject_);\r
+               key_producer_->monitor_output().attach_parent(key_monitor_subject_);\r
+               fill_producer_->monitor_output().attach_parent(monitor_subject_);\r
        }\r
 \r
        // frame_producer\r
@@ -128,9 +127,9 @@ struct separated_producer : public frame_producer
                return info;\r
        }\r
 \r
-       monitor::source& monitor_output()\r
+       monitor::subject& monitor_output()\r
        {\r
-               return monitor_subject_;\r
+               return *monitor_subject_;\r
        }\r
 };\r
 \r
index 3dcbc606696c573a896c3397598020b6452a2859..7a83cb4d41b6c1d57305f693a51a4e733dcda112 100644 (file)
@@ -103,7 +103,7 @@ struct stage::implementation : public std::enable_shared_from_this<implementatio
        // map of layer -> map of tokens (src ref) -> layer_consumer\r
        std::map<int, std::map<void*, std::shared_ptr<write_frame_consumer>>>            layer_consumers_;\r
        \r
-       monitor::subject                                                                                                                         monitor_subject_;\r
+       safe_ptr<monitor::subject>                                                                                                       monitor_subject_;\r
 \r
        executor                                                                                                                                         executor_;\r
 \r
@@ -112,7 +112,7 @@ public:
                : graph_(graph)\r
                , format_desc_(format_desc)\r
                , target_(target)\r
-               , monitor_subject_("/stage")\r
+               , monitor_subject_(make_safe<monitor::subject>("/stage"))\r
                , executor_(L"stage")\r
        {\r
                graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f, 0.8));      \r
@@ -280,7 +280,7 @@ public:
                if(it == std::end(layers_))\r
                {\r
                        it = layers_.insert(std::make_pair(index, std::make_shared<layer>(index))).first;\r
-                       it->second->monitor_output().link_target(&monitor_subject_);\r
+                       it->second->monitor_output().attach_parent(monitor_subject_);\r
                }\r
                return *it->second;\r
        }\r
@@ -354,18 +354,18 @@ public:
                        auto other_layers       = other_impl->layers_ | boost::adaptors::map_values;\r
 \r
                        BOOST_FOREACH(auto& layer, layers)\r
-                               layer->monitor_output().unlink_target(&monitor_subject_);\r
+                               layer->monitor_output().detach_parent();\r
                        \r
                        BOOST_FOREACH(auto& layer, other_layers)\r
-                               layer->monitor_output().unlink_target(&monitor_subject_);\r
+                               layer->monitor_output().attach_parent(monitor_subject_);\r
                        \r
                        std::swap(layers_, other_impl->layers_);\r
                                                \r
                        BOOST_FOREACH(auto& layer, layers)\r
-                               layer->monitor_output().link_target(&monitor_subject_);\r
+                               layer->monitor_output().detach_parent();\r
                        \r
                        BOOST_FOREACH(auto& layer, other_layers)\r
-                               layer->monitor_output().link_target(&monitor_subject_);\r
+                               layer->monitor_output().detach_parent();\r
                };              \r
 \r
                executor_.begin_invoke([=]\r
@@ -395,13 +395,13 @@ public:
                                auto& my_layer          = get_layer(index);\r
                                auto& other_layer       = other_impl->get_layer(other_index);\r
 \r
-                               my_layer.monitor_output().unlink_target(&monitor_subject_);\r
-                               other_layer.monitor_output().unlink_target(&other_impl->monitor_subject_);\r
+                               my_layer.monitor_output().detach_parent();\r
+                               other_layer.monitor_output().attach_parent(other_impl->monitor_subject_);\r
 \r
                                std::swap(my_layer, other_layer);\r
 \r
-                               my_layer.monitor_output().link_target(&monitor_subject_);\r
-                               other_layer.monitor_output().link_target(&other_impl->monitor_subject_);\r
+                               my_layer.monitor_output().detach_parent();\r
+                               other_layer.monitor_output().attach_parent(other_impl->monitor_subject_);\r
                        };              \r
 \r
                        executor_.begin_invoke([=]\r
@@ -503,5 +503,5 @@ boost::unique_future<boost::property_tree::wptree> stage::info() const{return im
 boost::unique_future<boost::property_tree::wptree> stage::info(int index) const{return impl_->info(index);}\r
 boost::unique_future<boost::property_tree::wptree> stage::delay_info() const{return impl_->delay_info();}\r
 boost::unique_future<boost::property_tree::wptree> stage::delay_info(int index) const{return impl_->delay_info(index);}\r
-monitor::source& stage::monitor_output(){return impl_->monitor_subject_;}\r
+monitor::subject& stage::monitor_output(){return *impl_->monitor_subject_;}\r
 }}
\ No newline at end of file
index ec1ecbd387771d4427a3beb6a4a81bfbebbd7d13..014b4f0f19752bd5f3302ea53d1e6e692a7f760e 100644 (file)
@@ -93,7 +93,7 @@ public:
        \r
        void set_video_format_desc(const video_format_desc& format_desc);\r
                \r
-       monitor::source& monitor_output();\r
+       monitor::subject& monitor_output();\r
 \r
 private:\r
        struct implementation;\r
index a8b92effd2e79016e9369eb3ca5ace842fb83e8d..759d48079c66854072d2c2eee101fb7490ee274e 100644 (file)
@@ -38,7 +38,7 @@ namespace caspar { namespace core {
 \r
 struct transition_producer : public frame_producer\r
 {      \r
-       monitor::subject                        monitor_subject_;\r
+       safe_ptr<monitor::subject>      monitor_subject_;\r
 \r
        const field_mode::type          mode_;\r
        unsigned int                            current_frame_;\r
@@ -58,7 +58,7 @@ struct transition_producer : public frame_producer
                , source_producer_(frame_producer::empty())\r
                , last_frame_(basic_frame::empty())\r
        {\r
-               dest->monitor_output().link_target(&monitor_subject_);\r
+               dest->monitor_output().attach_parent(monitor_subject_);\r
        }\r
        \r
        // frame_producer\r
@@ -100,8 +100,8 @@ struct transition_producer : public frame_producer
                                source = source_producer_->last_frame();\r
                });\r
 \r
-               monitor_subject_ << monitor::message("/transition/frame") % static_cast<std::int32_t>(current_frame_) % static_cast<std::int32_t>(info_.duration)\r
-                                                << monitor::message("/transition/type") % [&]() -> std::string\r
+               *monitor_subject_ << monitor::message("/transition/frame") % static_cast<std::int32_t>(current_frame_) % static_cast<std::int32_t>(info_.duration)\r
+                                 << monitor::message("/transition/type") % [&]() -> std::string\r
                                                                                                                                {\r
                                                                                                                                        switch(info_.type)\r
                                                                                                                                        {\r
@@ -206,9 +206,9 @@ struct transition_producer : public frame_producer
                return basic_frame::combine(s_frame, d_frame);\r
        }\r
 \r
-       monitor::source& monitor_output()\r
+       monitor::subject& monitor_output()\r
        {\r
-               return monitor_subject_;\r
+               return *monitor_subject_;\r
        }\r
 };\r
 \r
index 3ec1156a2bb02192b533a8f06d3865b1fa97f037..70e9ec3ef48018e7bdeaa118e382e4231ba25c68 100644 (file)
@@ -52,7 +52,7 @@ struct video_channel::implementation : boost::noncopyable
        const safe_ptr<caspar::core::mixer>             mixer_;\r
        const safe_ptr<caspar::core::stage>             stage_;\r
 \r
-       monitor::subject                                                monitor_subject_;\r
+       safe_ptr<monitor::subject>                              monitor_subject_;\r
        \r
 public:\r
        implementation(video_channel& self, int index, const video_format_desc& format_desc, const safe_ptr<ogl_device>& ogl, const channel_layout& audio_channel_layout)  \r
@@ -63,7 +63,7 @@ public:
                , output_(new caspar::core::output(graph_, format_desc, index))\r
                , mixer_(new caspar::core::mixer(graph_, output_, format_desc, ogl, audio_channel_layout))\r
                , stage_(new caspar::core::stage(graph_, mixer_, format_desc))  \r
-               , monitor_subject_("/channel/" + boost::lexical_cast<std::string>(index))\r
+               , monitor_subject_(make_safe<monitor::subject>("/channel/" + boost::lexical_cast<std::string>(index)))\r
        {\r
                graph_->set_text(print());\r
                diagnostics::register_graph(graph_);\r
@@ -71,7 +71,7 @@ public:
                for(int n = 0; n < std::max(1, env::properties().get(L"configuration.pipeline-tokens", 2)); ++n)\r
                        stage_->spawn_token();\r
 \r
-               stage_->monitor_output().link_target(&monitor_subject_);\r
+               stage_->monitor_output().attach_parent(monitor_subject_);\r
 \r
                CASPAR_LOG(info) << print() << " Successfully Initialized.";\r
        }\r
@@ -155,6 +155,6 @@ video_format_desc video_channel::get_video_format_desc() const{return impl_->for
 void video_channel::set_video_format_desc(const video_format_desc& format_desc){impl_->set_video_format_desc(format_desc);}\r
 boost::property_tree::wptree video_channel::info() const{return impl_->info();}\r
 int video_channel::index() const {return impl_->index_;}\r
-monitor::source& video_channel::monitor_output(){return impl_->monitor_subject_;}\r
+monitor::subject& video_channel::monitor_output(){return *impl_->monitor_subject_;}\r
 boost::property_tree::wptree video_channel::delay_info() const { return impl_->delay_info(); }\r
 }}
\ No newline at end of file
index 984ed5608f4a0af36a7e89645e918fd7fd785d8d..2938b0528cd036c952004e07d09ad74d1e13171d 100644 (file)
@@ -65,7 +65,7 @@ public:
 \r
        int index() const;\r
        \r
-       monitor::source& monitor_output();\r
+       monitor::subject& monitor_output();\r
 \r
 private:\r
        struct implementation;\r
index 02a98663d0f95970cd00c6cbeb790eca932c8f3a..0a3c64625ff873c9b9d4fdc13bcddde14570ada8 100644 (file)
@@ -325,7 +325,7 @@ public:
                return model_name_ + L" [" + boost::lexical_cast<std::wstring>(device_index_) + L"|" + format_desc_.name + L"]";\r
        }\r
 \r
-       core::monitor::source& monitor_output()\r
+       core::monitor::subject& monitor_output()\r
        {\r
                return monitor_subject_;\r
        }\r
@@ -386,7 +386,7 @@ public:
                return info;\r
        }\r
 \r
-       core::monitor::source& monitor_output()\r
+       core::monitor::subject& monitor_output()\r
        {\r
                return context_->monitor_output();\r
        }\r
index 07ef0cdb61af022527503599c70f9a7e8e093e6f..ebee33313a0f7045eac3f8428f52d071f1eecd51 100644 (file)
@@ -489,7 +489,7 @@ public:
                        frame_buffer_.push(std::make_pair(make_safe_ptr(frame), file_frame_number));\r
        }\r
 \r
-       core::monitor::source& monitor_output()\r
+       core::monitor::subject& monitor_output()\r
        {\r
                return monitor_subject_;\r
        }\r
index 40e35dcc69c9beb46e4b90327b04d576251b742a..2c2a55634181067d6c65ac107909df9b43aa394d 100644 (file)
@@ -674,12 +674,8 @@ void STDMETHODCALLTYPE FlashAxContainer::OnFlashCall(BSTR request)
 \r
 void STDMETHODCALLTYPE FlashAxContainer::OnReadyStateChange(long newState)\r
 {\r
-       if(newState == 4)\r
-       {\r
+       if (newState == 4)\r
                bReadyToRender_ = true;\r
-       }\r
-       else\r
-               bReadyToRender_ = false;\r
 }\r
 \r
 void FlashAxContainer::DestroyAxControl()\r
index 35e7c3926d8f681ee1a7b84a1af3bc280366b02b..b0acbebc827f87594532efda0d6eb26b69aa9bfe 100644 (file)
@@ -193,18 +193,20 @@ public:
                return L"";\r
        }\r
 \r
-       core::monitor::source& monitor_output()\r
+       core::monitor::subject& monitor_output()\r
        {\r
                return flash_producer_->monitor_output();\r
        }\r
 };\r
-       \r
-safe_ptr<cg_producer> get_default_cg_producer(\r
+\r
+void with_default_cg_producer(\r
+               std::function<void (safe_ptr<cg_producer>)> command,\r
                const safe_ptr<core::video_channel>& video_channel,\r
                bool expect_existing,\r
-               int render_layer)\r
-{      \r
-       auto flash_producer = video_channel->stage()->foreground(render_layer).get();\r
+               int layer_index)\r
+{\r
+       auto flash_producer = video_channel->stage()->foreground(layer_index).get();\r
+       bool was_created = false;\r
 \r
        try\r
        {\r
@@ -213,17 +215,17 @@ safe_ptr<cg_producer> get_default_cg_producer(
                        if (expect_existing)\r
                                BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(\r
                                                "No flash producer on layer "\r
-                                               + boost::lexical_cast<std::string>(render_layer)));\r
+                                               + boost::lexical_cast<std::string>(layer_index)));\r
 \r
-                       flash_producer = flash::create_producer(video_channel->mixer(), boost::assign::list_of<std::wstring>());        \r
-                       video_channel->stage()->load(render_layer, flash_producer); \r
-                       video_channel->stage()->play(render_layer);\r
+                       flash_producer = flash::create_producer(video_channel->mixer(), boost::assign::list_of<std::wstring>());\r
                }\r
 \r
                if (expect_existing && flash_producer->call(L"?").get() == L"0")\r
                        BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(\r
                                        "No flash player on layer "\r
-                                       + boost::lexical_cast<std::string>(render_layer)));\r
+                                       + boost::lexical_cast<std::string>(layer_index)));\r
+\r
+               was_created = true;\r
        }\r
        catch(...)\r
        {\r
@@ -231,7 +233,29 @@ safe_ptr<cg_producer> get_default_cg_producer(
                throw;\r
        }\r
 \r
-       return make_safe<cg_producer>(flash_producer);\r
+       command(make_safe<cg_producer>(flash_producer));\r
+\r
+       if (was_created)\r
+       {\r
+               video_channel->stage()->load(layer_index, flash_producer); \r
+               video_channel->stage()->play(layer_index);\r
+       }\r
+}\r
+       \r
+safe_ptr<cg_producer> get_default_cg_producer(\r
+               const safe_ptr<core::video_channel>& video_channel,\r
+               bool expect_existing,\r
+               int render_layer)\r
+{      \r
+       std::shared_ptr<cg_producer> producer;\r
+\r
+       with_default_cg_producer(\r
+                       [&producer](safe_ptr<cg_producer> p)\r
+                       {\r
+                               producer = p;\r
+                       }, video_channel, expect_existing, render_layer);\r
+\r
+       return make_safe_ptr(producer);\r
 }\r
 \r
 safe_ptr<core::frame_producer> create_cg_producer_and_autoplay_file(\r
@@ -274,7 +298,7 @@ cg_producer::cg_producer(const safe_ptr<core::frame_producer>& frame_producer) :
 cg_producer::cg_producer(cg_producer&& other) : impl_(std::move(other.impl_)){}\r
 safe_ptr<core::basic_frame> cg_producer::receive(int hints){return impl_->receive(hints);}\r
 safe_ptr<core::basic_frame> cg_producer::last_frame() const{return impl_->last_frame();}\r
-void cg_producer::add(int layer, const std::wstring& template_name,  bool play_on_load, const std::wstring& startFromLabel, const std::wstring& data){impl_->add(layer, template_name, play_on_load, startFromLabel, data);}\r
+void cg_producer::add(int layer, const std::wstring& template_name,  bool play_on_load, const std::wstring& startFromLabel, const std::wstring& data){impl_->add(layer, template_name, play_on_load, startFromLabel, data).wait();}\r
 void cg_producer::remove(int layer){impl_->remove(layer);}\r
 void cg_producer::play(int layer){impl_->play(layer);}\r
 void cg_producer::stop(int layer, unsigned int mix_out_duration){impl_->stop(layer, mix_out_duration);}\r
@@ -286,5 +310,5 @@ std::wstring cg_producer::invoke(int layer, const std::wstring& label){return im
 std::wstring cg_producer::description(int layer){return impl_->timed_description(layer);}\r
 std::wstring cg_producer::template_host_info(){return impl_->timed_template_host_info();}\r
 boost::property_tree::wptree cg_producer::info() const{return impl_->info();}\r
-core::monitor::source& cg_producer::monitor_output(){return impl_->monitor_output();}\r
+core::monitor::subject& cg_producer::monitor_output(){return impl_->monitor_output();}\r
 }}
\ No newline at end of file
index b43bdb2d84f0bf202254a463b4d597921c8ed02d..3998499c6589b16b27dc06c017995d18d328b378 100644 (file)
@@ -29,6 +29,7 @@
 #include <boost/thread/future.hpp>\r
 \r
 #include <string>\r
+#include <functional>\r
 \r
 namespace caspar {\r
 namespace core {\r
@@ -64,7 +65,7 @@ public:
        std::wstring description(int layer);\r
        std::wstring template_host_info();\r
 \r
-       core::monitor::source& monitor_output();\r
+       core::monitor::subject& monitor_output();\r
 \r
 private:\r
        struct implementation;\r
@@ -72,6 +73,8 @@ private:
 };\r
 safe_ptr<cg_producer> get_default_cg_producer(const safe_ptr<core::video_channel>& video_channel, bool expect_existing, int layer_index = cg_producer::DEFAULT_LAYER);\r
 \r
+void with_default_cg_producer(std::function<void (safe_ptr<cg_producer>)> command, const safe_ptr<core::video_channel>& video_channel, bool expect_existing, int layer_index = cg_producer::DEFAULT_LAYER);\r
+\r
 safe_ptr<core::frame_producer> create_ct_producer(\r
                const safe_ptr<core::frame_factory>& frame_factory,\r
                const core::parameters& params);\r
index 54ccb65e559414d136c0c68559604fbb58c9aed9..3839df0df494ae5293a7b708789b64e9e81a002a 100644 (file)
@@ -151,6 +151,13 @@ template_host get_template_host(const core::video_format_desc& desc)
        return template_host;\r
 }\r
 \r
+boost::mutex& get_global_init_destruct_mutex()\r
+{\r
+       static boost::mutex m;\r
+\r
+       return m;\r
+}\r
+\r
 class flash_renderer\r
 {      \r
        struct com_init\r
@@ -183,8 +190,6 @@ class flash_renderer
        \r
        boost::timer                                                                    frame_timer_;\r
        boost::timer                                                                    tick_timer_;\r
-\r
-       high_prec_timer                                                                 timer_;\r
        \r
 public:\r
        flash_renderer(const safe_ptr<diagnostics::graph>& graph, const std::shared_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, int width, int height) \r
@@ -194,21 +199,27 @@ public:
                , filename_(filename)\r
                , frame_factory_(frame_factory)\r
                , ax_(nullptr)\r
-               , head_(core::basic_frame::empty())\r
+               , head_(core::basic_frame::late())\r
                , bmp_(width, height)\r
        {               \r
                graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));\r
                graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f));\r
-               graph_->set_color("param", diagnostics::color(1.0f, 0.5f, 0.0f));       \r
-               graph_->set_color("sync", diagnostics::color(0.8f, 0.3f, 0.2f));                        \r
-               \r
-               if(FAILED(CComObject<caspar::flash::FlashAxContainer>::CreateInstance(&ax_)))\r
-                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(narrow(print()) + " Failed to create FlashAxContainer"));\r
+               graph_->set_color("param", diagnostics::color(1.0f, 0.5f, 0.0f));\r
+               graph_->set_color("buffered", diagnostics::color(0.8f, 0.3f, 0.2f));\r
+\r
+               lock(get_global_init_destruct_mutex(), [this]\r
+               {\r
+\r
+                       CoInitialize(nullptr);\r
+\r
+                       if(FAILED(CComObject<caspar::flash::FlashAxContainer>::CreateInstance(&ax_)))\r
+                               BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(narrow(print()) + " Failed to create FlashAxContainer"));\r
                \r
-               ax_->set_print([this]{return print();});\r
+                       if(FAILED(ax_->CreateAxControl()))\r
+                               BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(narrow(print()) + " Failed to Create FlashAxControl"));\r
+               });\r
 \r
-               if(FAILED(ax_->CreateAxControl()))\r
-                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(narrow(print()) + " Failed to Create FlashAxControl"));\r
+               ax_->set_print([this]{return print();});\r
                \r
                CComPtr<IShockwaveFlash> spFlash;\r
                if(FAILED(ax_->QueryControl(&spFlash)))\r
@@ -224,7 +235,7 @@ public:
                        BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(narrow(print()) + " Failed to Set Scale Mode"));\r
                                                \r
                ax_->SetSize(width_, height_);          \r
-               render_frame(false);\r
+               render_frame();\r
        \r
                CASPAR_LOG(info) << print() << L" Initialized.";\r
        }\r
@@ -233,8 +244,11 @@ public:
        {               \r
                if(ax_)\r
                {\r
-                       ax_->DestroyAxControl();\r
-                       ax_->Release();\r
+                       lock(get_global_init_destruct_mutex(), [this]\r
+                       {\r
+                               ax_->DestroyAxControl();\r
+                               ax_->Release();\r
+                       });\r
                }\r
                graph_->set_value("tick-time", 0.0f);\r
                graph_->set_value("frame-time", 0.0f);\r
@@ -249,28 +263,24 @@ public:
 \r
                if(!ax_->FlashCall(param, result))\r
                        CASPAR_LOG(warning) << print() << L" Flash call failed:" << param;//BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("Flash function call failed.") << arg_name_info("param") << arg_value_info(narrow(param)));\r
-               graph_->set_tag("param");\r
+               graph_->set_tag("param");\r
 \r
                return result;\r
        }\r
        \r
-       safe_ptr<core::basic_frame> render_frame(double sync)\r
+       safe_ptr<core::basic_frame> render_frame()\r
        {\r
                float frame_time = 1.0f/ax_->GetFPS();\r
 \r
                graph_->set_value("tick-time", static_cast<float>(tick_timer_.elapsed()/frame_time)*0.5f);\r
                tick_timer_.restart();\r
 \r
+               if (!ax_->IsReadyToRender())\r
+                       return head_;\r
+\r
                if(ax_->IsEmpty())\r
-                       return core::basic_frame::empty();              \r
+                       return core::basic_frame::empty();\r
                \r
-               if(sync > 0.00001)                      \r
-                       timer_.tick(frame_time*sync); // This will block the thread.\r
-               else\r
-                       graph_->set_tag("sync");\r
-\r
-               graph_->set_value("sync", sync);\r
-                       \r
                frame_timer_.restart();\r
 \r
                ax_->Tick();\r
@@ -341,7 +351,6 @@ struct flash_producer : public core::frame_producer
        std::queue<safe_ptr<core::basic_frame>>                                         frame_buffer_;\r
        tbb::concurrent_bounded_queue<safe_ptr<core::basic_frame>>      output_buffer_;\r
        \r
-       mutable tbb::spin_mutex                                                                         last_frame_mutex_;\r
        safe_ptr<core::basic_frame>                                                                     last_frame_;\r
                \r
        std::unique_ptr<flash_renderer>                                                         renderer_;\r
@@ -364,11 +373,7 @@ public:
                graph_->set_text(print());\r
                diagnostics::register_graph(graph_);\r
                \r
-               renderer_.reset(new flash_renderer(graph_, frame_factory_, filename_, width_, height_));\r
-               has_renderer_ = true;\r
-\r
-               while(output_buffer_.size() < buffer_size_)\r
-                       output_buffer_.push(core::basic_frame::empty());\r
+               has_renderer_ = false;\r
        }\r
 \r
        ~flash_producer()\r
@@ -385,10 +390,16 @@ public:
        {                                       \r
                auto frame = core::basic_frame::late();\r
                \r
-               if(output_buffer_.try_pop(frame))       \r
-                       next();\r
+               double buffered = output_buffer_.size();\r
+               auto ratio = buffered / buffer_size_;\r
+               graph_->set_value("buffered", ratio);\r
+\r
+               if(output_buffer_.try_pop(frame))\r
+                       last_frame_ = frame;\r
                else\r
                        graph_->set_tag("late-frame");\r
+\r
+               fill_buffer();\r
                \r
                monitor_subject_ << core::monitor::message("/host/path")                % filename_\r
                                             << core::monitor::message("/host/width")   % width_\r
@@ -401,10 +412,7 @@ public:
 \r
        virtual safe_ptr<core::basic_frame> last_frame() const override\r
        {\r
-               return lock(last_frame_mutex_, [this]\r
-               {\r
-                       return last_frame_;\r
-               });\r
+               return last_frame_;\r
        }               \r
        \r
        virtual boost::unique_future<std::wstring> call(const std::wstring& param) override\r
@@ -416,18 +424,24 @@ public:
                {\r
                        try\r
                        {\r
-                               if(!renderer_)\r
+                               bool initialize_renderer = !renderer_;\r
+\r
+                               if(initialize_renderer)\r
                                {\r
                                        renderer_.reset(new flash_renderer(graph_, frame_factory_, filename_, width_, height_));\r
 \r
-                                       while(output_buffer_.size() < buffer_size_)\r
-                                               output_buffer_.push(core::basic_frame::empty());\r
-\r
                                        has_renderer_ = true;\r
                                }\r
 \r
-                               return renderer_->call(param);  \r
+                               std::wstring result = param == L"start_rendering"\r
+                                               ? L"" : renderer_->call(param);\r
 \r
+                               if (initialize_renderer)\r
+                               {\r
+                                       do_fill_buffer();\r
+                               }\r
+\r
+                               return result;\r
                                //const auto& format_desc = frame_factory_->get_video_format_desc();\r
                                //if(abs(context_->fps() - format_desc.fps) > 0.01 && abs(context_->fps()/2.0 - format_desc.fps) > 0.01)\r
                                //      CASPAR_LOG(warning) << print() << " Invalid frame-rate: " << context_->fps() << L". Should be either " << format_desc.fps << L" or " << format_desc.fps*2.0 << L".";\r
@@ -440,7 +454,7 @@ public:
                        }\r
 \r
                        return L"";\r
-               });\r
+               }, high_priority);\r
        }\r
                \r
        virtual std::wstring print() const override\r
@@ -456,65 +470,112 @@ public:
        }\r
 \r
        // flash_producer\r
-       \r
-       void next()\r
-       {       \r
+\r
+       void fill_buffer()\r
+       {\r
                executor_.begin_invoke([this]\r
                {\r
-                       if(!renderer_)\r
-                               frame_buffer_.push(core::basic_frame::empty());\r
+                       do_fill_buffer();\r
+               });\r
+       }\r
+\r
+       void do_fill_buffer()\r
+       {\r
+               int nothing_rendered = 0;\r
+               const int MAX_NOTHING_RENDERED_RETRIES = 4;\r
+\r
+               auto to_render = buffer_size_ - output_buffer_.size();\r
+               int rendered = 0;\r
+\r
+               while (rendered < to_render)\r
+               {\r
+                       bool was_rendered = next();\r
 \r
-                       if(frame_buffer_.empty())\r
+                       if (was_rendered)\r
                        {\r
-                               auto format_desc = frame_factory_->get_video_format_desc();\r
+                               ++rendered;\r
+                       }\r
+                       else\r
+                       {\r
+                               if (nothing_rendered++ < MAX_NOTHING_RENDERED_RETRIES)\r
+                               {\r
+                                       // Flash player not ready with first frame, sleep to not busy-loop;\r
+                                       boost::this_thread::sleep(boost::posix_time::milliseconds(10));\r
+                                       boost::this_thread::yield();\r
+                               }\r
+                               else\r
+                                       return;\r
+                       }\r
+\r
+                       executor_.yield();\r
+               }\r
+       }\r
+       \r
+       bool next()\r
+       {       \r
+               if(!renderer_)\r
+                       frame_buffer_.push(core::basic_frame::empty());\r
+\r
+               if(frame_buffer_.empty())\r
+               {\r
+                       auto format_desc = frame_factory_->get_video_format_desc();\r
                                        \r
-                               if(abs(renderer_->fps()/2.0 - format_desc.fps) < 2.0) // flash == 2 * format -> interlace\r
+                       if(abs(renderer_->fps()/2.0 - format_desc.fps) < 2.0) // flash == 2 * format -> interlace\r
+                       {\r
+                               auto frame1 = render_frame();\r
+\r
+                               if (frame1 != core::basic_frame::late())\r
                                {\r
-                                       auto frame1 = render_frame();\r
                                        auto frame2 = render_frame();\r
                                        frame_buffer_.push(core::basic_frame::interlace(frame1, frame2, format_desc.field_mode));\r
                                }\r
-                               else if(abs(renderer_->fps() - format_desc.fps/2.0) < 2.0) // format == 2 * flash -> duplicate\r
+                       }\r
+                       else if(abs(renderer_->fps() - format_desc.fps/2.0) < 2.0) // format == 2 * flash -> duplicate\r
+                       {\r
+                               auto frame = render_frame();\r
+\r
+                               if (frame != core::basic_frame::late())\r
                                {\r
-                                       auto frame = render_frame();\r
                                        frame_buffer_.push(frame);\r
                                        frame_buffer_.push(frame);\r
                                }\r
-                               else //if(abs(renderer_->fps() - format_desc_.fps) < 0.1) // format == flash -> simple\r
-                               {\r
-                                       auto frame = render_frame();\r
+                       }\r
+                       else //if(abs(renderer_->fps() - format_desc_.fps) < 0.1) // format == flash -> simple\r
+                       {\r
+                               auto frame = render_frame();\r
+\r
+                               if (frame != core::basic_frame::late())\r
                                        frame_buffer_.push(frame);\r
-                               }\r
+                       }\r
                                                \r
-                               fps_.fetch_and_store(static_cast<int>(renderer_->fps()*100.0));                         \r
-                               graph_->set_text(print());\r
+                       fps_.fetch_and_store(static_cast<int>(renderer_->fps()*100.0));                         \r
+                       graph_->set_text(print());\r
                        \r
-                               if(renderer_->is_empty())\r
-                               {\r
-                                       renderer_.reset();\r
-                                       has_renderer_ = false;\r
-                               }\r
+                       if(renderer_->is_empty())\r
+                       {\r
+                               renderer_.reset();\r
+                               has_renderer_ = false;\r
                        }\r
+               }\r
 \r
+               if (frame_buffer_.empty())\r
+               {\r
+                       return false;\r
+               }\r
+               else\r
+               {\r
                        output_buffer_.push(std::move(frame_buffer_.front()));\r
                        frame_buffer_.pop();\r
-               });\r
+                       return true;\r
+               }\r
        }\r
 \r
        safe_ptr<core::basic_frame> render_frame()\r
        {       \r
-               double ratio = std::min(1.0, static_cast<double>(output_buffer_.size())/static_cast<double>(std::max(1, buffer_size_ - 1)));\r
-               double sync  = 2*ratio - ratio*ratio;\r
-\r
-               auto frame = renderer_->render_frame(sync);\r
-               lock(last_frame_mutex_, [&]\r
-               {\r
-                       last_frame_ = frame;\r
-               });\r
-               return frame;\r
+               return renderer_->render_frame();\r
        }\r
 \r
-       core::monitor::source& monitor_output()\r
+       core::monitor::subject& monitor_output()\r
        {\r
                return monitor_subject_;\r
        }\r
@@ -545,9 +606,12 @@ safe_ptr<core::frame_producer> create_swf_producer(
 \r
        swf_t::header_t header(filename);\r
 \r
-       return create_producer_destroy_proxy(\r
-                  create_producer_print_proxy(\r
-                       make_safe<flash_producer>(frame_factory, filename, header.frame_width, header.frame_height)));\r
+       auto producer = make_safe<flash_producer>(\r
+                       frame_factory, filename, header.frame_width, header.frame_height);\r
+\r
+       producer->call(L"start_rendering").get();\r
+\r
+       return create_producer_destroy_proxy(create_producer_print_proxy(producer));\r
 }\r
 \r
 std::wstring find_template(const std::wstring& template_name)\r
index 8942d17cc9b6dd124c790873740081c9d0e244ed..a7ac767628ef5bfddb15e9cbb27bd5c324996be6 100644 (file)
@@ -114,7 +114,7 @@ struct image_producer : public core::frame_producer
                return info;\r
        }\r
 \r
-       core::monitor::source& monitor_output()\r
+       core::monitor::subject& monitor_output()\r
        {\r
                return monitor_subject_;\r
        }\r
index e45ed80644c2d78a3c7c0179e703404359af53e1..7b9804429e87128c7959070722461a6c8e2aee89 100644 (file)
@@ -385,7 +385,7 @@ struct image_scroll_producer : public core::frame_producer
                }\r
        }\r
 \r
-       core::monitor::source& monitor_output()\r
+       core::monitor::subject& monitor_output()\r
        {\r
                return monitor_subject_;\r
        }\r
index 0b7c8de1767b302d358658dcd9334ed0b482fe4c..90a084e6d0fe1cab9e66eae6a99527abf47ef887 100644 (file)
@@ -1343,7 +1343,12 @@ bool CGCommand::DoExecuteAdd() {
                std::wstring filename = _parameters[2];\r
                filename.append(extension);\r
 \r
-               flash::get_default_cg_producer(safe_ptr<core::video_channel>(GetChannel()), false, GetLayerIndex(flash::cg_producer::DEFAULT_LAYER))->add(layer, filename, bDoStart, label, (pDataString!=0) ? pDataString : TEXT(""));\r
+               flash::with_default_cg_producer(\r
+                               [&](safe_ptr<flash::cg_producer> producer)\r
+                               {\r
+                                       producer->add(layer, filename, bDoStart, label, (pDataString!=0) ? pDataString : TEXT(""));\r
+                               },\r
+                               safe_ptr<core::video_channel>(GetChannel()), false, GetLayerIndex(flash::cg_producer::DEFAULT_LAYER));\r
                SetReplyString(TEXT("202 CG OK\r\n"));\r
        }\r
        else\r
index 0077756fb210dafc46f7826f59840d2efa55aa03..d87254c8cafe86dfb78ed776947b942d5d30dec2 100644 (file)
 
 #include "client.h"
 
-#include "oscpack/oscOutboundPacketStream.h"
+#include "oscpack/OscOutboundPacketStream.h"
+#include "oscpack/OscHostEndianness.h"
 
 #include <common/utility/string.h>
 #include <common/exception/win32_exception.h>
+#include <common/memory/endian.h>
+
+#include <core/monitor/monitor.h>
 
 #include <functional>
 #include <vector>
+#include <unordered_map>
 
 #include <boost/asio.hpp>
 #include <boost/foreach.hpp>
 #include <boost/bind.hpp>
+#include <boost/thread.hpp>
 
 #include <tbb/spin_mutex.h>
+#include <tbb/cache_aligned_allocator.h>
 
 using namespace boost::asio::ip;
 
 namespace caspar { namespace protocol { namespace osc {
 
+template<typename T>
+struct no_init_proxy
+{
+    T value;
+
+    no_init_proxy() 
+       {
+               static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size");
+        static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment");
+    }
+};
+
+typedef std::vector<no_init_proxy<char>, tbb::cache_aligned_allocator<no_init_proxy<char>>> byte_vector;
+
 template<typename T>
 struct param_visitor : public boost::static_visitor<void>
 {
@@ -64,38 +85,76 @@ struct param_visitor : public boost::static_visitor<void>
        void operator()(const std::vector<int8_t>& value)       {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
 };
 
-std::vector<char> write_osc_event(const core::monitor::message& e)
-{
-       std::array<char, 4096> buffer;
-       ::osc::OutboundPacketStream o(buffer.data(), static_cast<unsigned long>(buffer.size()));
+void write_osc_event(byte_vector& destination, const core::monitor::message& e)
+{              
+       destination.resize(4096);
 
-       o       << ::osc::BeginMessage(e.path().c_str());
+       ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+       o << ::osc::BeginMessage(e.path().c_str());
                                
-       param_visitor<decltype(o)> pd_visitor(o);
-       BOOST_FOREACH(auto data, e.data())
-               boost::apply_visitor(pd_visitor, data);
+       param_visitor<decltype(o)> param_visitor(o);
+       BOOST_FOREACH(const auto& data, e.data())
+               boost::apply_visitor(param_visitor, data);
                                
-       o       << ::osc::EndMessage;
+       o << ::osc::EndMessage;
                
-       return std::vector<char>(o.Data(), o.Data() + o.Size());
+       destination.resize(o.Size());
 }
 
-struct client::impl : public std::enable_shared_from_this<client::impl>
+byte_vector write_osc_bundle_start()
 {
-       tbb::spin_mutex                                                         endpoints_mutex_;
-       std::map<udp::endpoint, int>                            reference_counts_by_endpoint_;
-       udp::socket                                                                     socket_;
+       byte_vector destination;
+       destination.resize(16);
+
+       ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+       o << ::osc::BeginBundle();
+
+       destination.resize(o.Size());
+
+       return destination;
+}
+
+void write_osc_bundle_element_start(byte_vector& destination, const byte_vector& message)
+{              
+       destination.resize(4);
+
+       int32_t* bundle_element_size = reinterpret_cast<int32_t*>(destination.data());
 
-       Concurrency::call<core::monitor::message>       on_next_;
+#ifdef OSC_HOST_LITTLE_ENDIAN
+       *bundle_element_size = swap_byte_order(static_cast<int32_t>(message.size()));
+#else
+       *bundle_element_size = static_cast<int32_t>(bundle.size());
+#endif
+}
+
+struct client::impl : public std::enable_shared_from_this<client::impl>, core::monitor::sink
+{
+       udp::socket socket_;
+       tbb::spin_mutex                                                                 endpoints_mutex_;
+       std::map<udp::endpoint, int>                                    reference_counts_by_endpoint_;
+
+       std::unordered_map<std::string, byte_vector>    updates_;
+       boost::mutex                                                                    updates_mutex_;                                                         
+       boost::condition_variable                                               updates_cond_;
+
+       tbb::atomic<bool>                                                               is_running_;
+
+       boost::thread                                                                   thread_;
        
 public:
-       impl(
-                       boost::asio::io_service& service,
-                       Concurrency::ISource<core::monitor::message>& source)
+       impl(boost::asio::io_service& service)
                : socket_(service, udp::v4())
-               , on_next_([this](const core::monitor::message& msg) { on_next(msg); })
+               , thread_(boost::bind(&impl::run, this))
+       {
+       }
+
+       ~impl()
        {
-               source.link_target(&on_next_);
+               is_running_ = false;
+
+               updates_cond_.notify_one();
+
+               thread_.join();
        }
 
        std::shared_ptr<void> get_subscription_token(
@@ -125,46 +184,115 @@ public:
                                self.reference_counts_by_endpoint_.erase(endpoint);
                });
        }
-       
-       void on_next(const core::monitor::message& msg)
+private:
+       void propagate(const core::monitor::message& msg)
        {
-               win32_exception::ensure_handler_installed_for_thread("agents-thread");
-               auto data_ptr = make_safe<std::vector<char>>(write_osc_event(msg));
-
-               tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+               boost::lock_guard<boost::mutex> lock(updates_mutex_);
 
-               BOOST_FOREACH(auto& elem, reference_counts_by_endpoint_)
+               try 
                {
-                       auto& endpoint = elem.first;
-
-                       // TODO: We seem to be lucky here, because according to asio
-                       //       documentation only one async operation can be "in flight"
-                       //       at any given point in time for a socket. This somehow seems
-                       //       to work though in the case of UDP and Windows.
-                       socket_.async_send_to(
-                                       boost::asio::buffer(*data_ptr),
-                                       endpoint,
-                                       boost::bind(
-                                                       &impl::handle_send_to,
-                                                       this,
-                                                       data_ptr, // The data_ptr needs to live
-                                                       boost::asio::placeholders::error,
-                                                       boost::asio::placeholders::bytes_transferred));         
+                       write_osc_event(updates_[msg.path()], msg);
                }
+               catch(...)
+               {
+                       CASPAR_LOG_CURRENT_EXCEPTION();
+                       updates_.erase(msg.path());
+               }
+
+               updates_cond_.notify_one();
+       }
+
+       template<typename T>
+       void do_send(
+                       const T& buffers, const std::vector<udp::endpoint>& destinations)
+       {
+               boost::system::error_code ec;
+
+               BOOST_FOREACH(const auto& endpoint, destinations)
+                       socket_.send_to(buffers, endpoint, 0, ec);
        }
 
-       void handle_send_to(
-                       const safe_ptr<std::vector<char>>& /* sent_buffer */,
-                       const boost::system::error_code& /*error*/,
-                       size_t /*bytes_sent*/)
+       void run()
        {
+               // http://stackoverflow.com/questions/14993000/the-most-reliable-and-efficient-udp-packet-size
+               const int SAFE_DATAGRAM_SIZE = 508;
+
+               try
+               {
+                       is_running_ = true;
+
+                       std::unordered_map<std::string, byte_vector> updates;
+                       std::vector<udp::endpoint> destinations;
+                       const byte_vector bundle_header = write_osc_bundle_start();
+                       std::vector<byte_vector> element_headers;
+
+                       while (is_running_)
+                       {               
+                               updates.clear();
+                               destinations.clear();
+
+                               {                       
+                                       boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
+
+                                       if (updates_.empty())
+                                               updates_cond_.wait(cond_lock);
+
+                                       std::swap(updates, updates_);
+                               }
+
+                               {
+                                       tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+                                       BOOST_FOREACH(const auto& endpoint, reference_counts_by_endpoint_)
+                                               destinations.push_back(endpoint.first);
+                               }
+
+                               if (destinations.empty())
+                                       continue;
+
+                               std::vector<boost::asio::const_buffers_1> buffers;
+                               element_headers.resize(
+                                               std::max(element_headers.size(), updates.size()));
+
+                               int i = 0;
+                               int datagram_size = bundle_header.size();
+                               buffers.push_back(boost::asio::buffer(bundle_header));
+
+                               BOOST_FOREACH(const auto& slot, updates)
+                               {
+                                       write_osc_bundle_element_start(element_headers[i], slot.second);
+                                       const auto& headers = element_headers;
+
+                                       auto size_of_element = headers[i].size() + slot.second.size();
+       
+                                       if (datagram_size + size_of_element >= SAFE_DATAGRAM_SIZE)
+                                       {
+                                               do_send(buffers, destinations);
+                                               buffers.clear();
+                                               buffers.push_back(boost::asio::buffer(bundle_header));
+                                               datagram_size = bundle_header.size();
+                                       }
+
+                                       buffers.push_back(boost::asio::buffer(headers[i]));
+                                       buffers.push_back(boost::asio::buffer(slot.second));
+
+                                       datagram_size += size_of_element;
+                                       ++i;
+                               }
+                       
+                               if (!buffers.empty())
+                                       do_send(buffers, destinations);
+                       }
+               }
+               catch (...)
+               {
+                       CASPAR_LOG_CURRENT_EXCEPTION();
+               }
        }
 };
 
-client::client(
-               boost::asio::io_service& service,
-               Concurrency::ISource<core::monitor::message>& source) 
-       : impl_(new impl(service, source))
+client::client(boost::asio::io_service& service) 
+       : impl_(new impl(service))
 {
 }
 
@@ -189,4 +317,9 @@ std::shared_ptr<void> client::get_subscription_token(
        return impl_->get_subscription_token(endpoint);
 }
 
+safe_ptr<core::monitor::sink> client::sink()
+{
+       return impl_;
+}
+
 }}}
index cf1196fb97772e7cd596e41a0297fa912c1d522d..46e2179ca605f7036efa0ac1f561d470c0ff8014 100644 (file)
@@ -40,9 +40,7 @@ public:
 
        // Constructors
 
-       client(
-                       boost::asio::io_service& service,
-                       Concurrency::ISource<core::monitor::message>& source);
+       client(boost::asio::io_service& service);
        
        client(client&&);
 
@@ -67,9 +65,10 @@ public:
        
        // Properties
 
+       safe_ptr<core::monitor::sink> sink();
 private:
        struct impl;
-       std::shared_ptr<impl> impl_;
+       safe_ptr<impl> impl_;
 };
 
 }}}
index 2d0131c804a7841d0d2735fcc7499176fa1b5ccc..e09bf7430f4567f69c5c2bebbacb3e65aabb2224 100644 (file)
@@ -77,7 +77,7 @@ using namespace protocol;
 struct server::implementation : boost::noncopyable\r
 {\r
        protocol::asio::io_service_manager                      io_service_manager_;\r
-       core::monitor::subject                                          monitor_subject_;\r
+       safe_ptr<core::monitor::subject>                        monitor_subject_;\r
        boost::promise<bool>&                                           shutdown_server_now_;\r
        safe_ptr<ogl_device>                                            ogl_;\r
        std::vector<safe_ptr<IO::AsyncEventServer>> async_servers_;     \r
@@ -90,7 +90,7 @@ struct server::implementation : boost::noncopyable
        implementation(boost::promise<bool>& shutdown_server_now)\r
                : shutdown_server_now_(shutdown_server_now)\r
                , ogl_(ogl_device::create())\r
-               , osc_client_(io_service_manager_.service(), monitor_subject_)\r
+               , osc_client_(io_service_manager_.service())\r
        {\r
                setup_audio(env::properties());\r
 \r
@@ -170,7 +170,7 @@ struct server::implementation : boost::noncopyable
                        \r
                        channels_.push_back(make_safe<video_channel>(channels_.size()+1, format_desc, ogl_, audio_channel_layout));\r
                        \r
-                       channels_.back()->monitor_output().link_target(&monitor_subject_);\r
+                       channels_.back()->monitor_output().attach_parent(monitor_subject_);\r
                        channels_.back()->mixer()->set_straight_alpha_output(\r
                                        xml_channel.second.get(L"straight-alpha-output", false));\r
 \r
@@ -269,6 +269,8 @@ struct server::implementation : boost::noncopyable
        {               \r
                using boost::property_tree::wptree;\r
                using namespace boost::asio::ip;\r
+\r
+               monitor_subject_->attach_parent(osc_client_.sink());\r
                \r
                auto default_port =\r
                                pt.get<unsigned short>(L"configuration.osc.default-port", 6250);\r
@@ -357,9 +359,9 @@ std::shared_ptr<thumbnail_generator> server::get_thumbnail_generator() const
        return impl_->thumbnail_generator_;\r
 }\r
 \r
-core::monitor::source& server::monitor_output()\r
+core::monitor::subject& server::monitor_output()\r
 {\r
-       return impl_->monitor_subject_;\r
+       return *impl_->monitor_subject_;\r
 }\r
 \r
 }
\ No newline at end of file
index 571f9e330352a9338575bac874f3ad38871827d2..d4d6cd69fb1934441346852685a0ea6ff56c9486 100644 (file)
@@ -45,7 +45,7 @@ public:
        const std::vector<safe_ptr<core::video_channel>> get_channels() const;\r
        std::shared_ptr<core::thumbnail_generator> get_thumbnail_generator() const;\r
 \r
-       core::monitor::source& monitor_output();\r
+       core::monitor::subject& monitor_output();\r
 \r
 private:\r
        struct implementation;\r