]> git.sesse.net Git - casparcg/commitdiff
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sat, 22 Oct 2011 19:03:17 +0000 (19:03 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sat, 22 Oct 2011 19:03:17 +0000 (19:03 +0000)
31 files changed:
common/common.vcxproj
common/common.vcxproj.filters
common/concurrency/message.h [new file with mode: 0644]
core/consumer/output.cpp
core/consumer/output.h
core/core.vcxproj
core/core.vcxproj.filters
core/mixer/image/image_mixer.cpp
core/mixer/mixer.cpp
core/mixer/mixer.h
core/producer/stage.cpp
core/producer/stage.h
core/video_channel.cpp
core/video_channel.h
core/video_channel_context.cpp [deleted file]
core/video_channel_context.h [deleted file]
modules/decklink/producer/decklink_producer.cpp
modules/ffmpeg/producer/audio/audio_decoder.cpp
modules/ffmpeg/producer/audio/audio_decoder.h
modules/ffmpeg/producer/ffmpeg_producer.cpp
modules/ffmpeg/producer/frame_muxer.cpp
modules/ffmpeg/producer/frame_muxer.h
modules/ffmpeg/producer/input.cpp
modules/ffmpeg/producer/input.h
modules/ffmpeg/producer/util.cpp
modules/ffmpeg/producer/util.h
modules/ffmpeg/producer/video/video_decoder.cpp
modules/ffmpeg/producer/video/video_decoder.h
modules/ogl/consumer/ogl_consumer.cpp
protocol/amcp/AMCPCommandsImpl.cpp
shell/casparcg.config

index 5c7e77f37d2c37abcc7631c09a66f2c94a7beabe..db2b9a39a81722d94eb99246aa722e632e9fc309 100644 (file)
     <ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
     <ClInclude Include="concurrency\com_context.h" />\r
     <ClInclude Include="concurrency\executor.h" />\r
+    <ClInclude Include="concurrency\message.h" />\r
     <ClInclude Include="diagnostics\graph.h" />\r
     <ClInclude Include="exception\exceptions.h" />\r
     <ClInclude Include="exception\win32_exception.h" />\r
index 9efe3db954cd7d0370447458a2ab49e63da0a8ba..505a3e1ae6f806b79023be930d23d37ecca5401d 100644 (file)
     <ClInclude Include="utility\move_on_copy.h">\r
       <Filter>source\utility</Filter>\r
     </ClInclude>\r
+    <ClInclude Include="concurrency\message.h">\r
+      <Filter>source\concurrency</Filter>\r
+    </ClInclude>\r
   </ItemGroup>\r
 </Project>
\ No newline at end of file
diff --git a/common/concurrency/message.h b/common/concurrency/message.h
new file mode 100644 (file)
index 0000000..8133b2e
--- /dev/null
@@ -0,0 +1,57 @@
+#pragma once\r
+\r
+#include "../memory/safe_ptr.h"\r
+\r
+#include <agents.h>\r
+#include <semaphore.h>\r
+\r
+#include <boost/noncopyable.hpp>\r
+\r
+namespace caspar {\r
+       \r
+class token : boost::noncopyable\r
+{\r
+       std::shared_ptr<Concurrency::semaphore> semaphore_;\r
+public:\r
+\r
+       token(const safe_ptr<Concurrency::semaphore>& semaphore)\r
+               : semaphore_(semaphore)\r
+       {\r
+               semaphore_->acquire();\r
+       }\r
+\r
+       token(token&& other)\r
+               : semaphore_(std::move(other.semaphore_))\r
+       {\r
+       }\r
+\r
+       ~token()\r
+       {\r
+               if(semaphore_)\r
+                       semaphore_->release();\r
+       }\r
+};\r
+\r
+template<typename T>\r
+class message\r
+{\r
+       T payload_;\r
+       token token_;\r
+public:\r
+       message(const T& payload, token&& token)\r
+               : payload_(payload)\r
+               , token_(std::move(token))\r
+       {\r
+       }\r
+       \r
+       template<typename U>\r
+       safe_ptr<message<U>> transfer(const U& payload)\r
+       {\r
+               return make_safe<message<U>>(payload, std::move(token_));\r
+       }\r
+\r
+       T& value() {return payload_;}\r
+       const T& value() const {return payload_;}\r
+};\r
+\r
+}
\ No newline at end of file
index 426013cac5e63790167c084bbfa36fac45b788ce..f3395c56ae576048dbc2e881b64052505c7c9b6d 100644 (file)
@@ -26,8 +26,6 @@
 \r
 #include "output.h"\r
 \r
-#include "../video_channel_context.h"\r
-\r
 #include "../video_format.h"\r
 #include "../mixer/gpu/ogl_device.h"\r
 #include "../mixer/read_frame.h"\r
 \r
 #include <tbb/mutex.h>\r
 \r
+#include <concrt_extras.h>\r
+\r
+using namespace Concurrency;\r
+\r
 namespace caspar { namespace core {\r
+       \r
+struct destruction_context\r
+{\r
+       std::shared_ptr<frame_consumer> consumer;\r
+       Concurrency::event                              event;\r
+\r
+       destruction_context(std::shared_ptr<frame_consumer>&& consumer) \r
+               : consumer(consumer)\r
+       {\r
+       }\r
+};\r
+\r
+void __cdecl destroy_consumer(LPVOID lpParam)\r
+{\r
+       auto destruction = std::unique_ptr<destruction_context>(static_cast<destruction_context*>(lpParam));\r
+       \r
+       try\r
+       {               \r
+               if(destruction->consumer.unique())\r
+               {\r
+                       Concurrency::scoped_oversubcription_token oversubscribe;\r
+                       destruction->consumer.reset();\r
+               }\r
+               else\r
+                       CASPAR_LOG(warning) << destruction->consumer->print() << " Not destroyed asynchronously.";              \r
+       }\r
+       catch(...)\r
+       {\r
+               CASPAR_LOG_CURRENT_EXCEPTION();\r
+       }\r
+       \r
+       destruction->event.set();\r
+}\r
+\r
+void __cdecl destroy_and_wait_consumer(LPVOID lpParam)\r
+{\r
+       try\r
+       {\r
+               auto destruction = static_cast<destruction_context*>(lpParam);\r
+               Concurrency::CurrentScheduler::ScheduleTask(destroy_consumer, lpParam);\r
+               if(destruction->event.wait(1000) == Concurrency::COOPERATIVE_WAIT_TIMEOUT)\r
+                       CASPAR_LOG(warning) << " Potential destruction deadlock detected. Might leak resources.";\r
+       }\r
+       catch(...)\r
+       {\r
+               CASPAR_LOG_CURRENT_EXCEPTION();\r
+       }\r
+}\r
 \r
 struct output::implementation\r
 {      \r
        typedef std::pair<safe_ptr<read_frame>, safe_ptr<read_frame>> fill_and_key;\r
        \r
-       video_channel_context&          channel_;\r
-       const std::function<void()> restart_channel_;\r
+       const video_format_desc format_desc_;\r
 \r
        std::map<int, safe_ptr<frame_consumer>> consumers_;\r
        typedef std::map<int, safe_ptr<frame_consumer>>::value_type layer_t;\r
        \r
        high_prec_timer timer_;\r
+\r
+       critical_section                                                                mutex_;\r
+       std::shared_ptr<Scheduler>                                              scheduler_;\r
+       call<safe_ptr<message<safe_ptr<read_frame>>>>   output_;\r
                \r
 public:\r
-       implementation(video_channel_context& video_channel, const std::function<void()>& restart_channel) \r
-               : channel_(video_channel)\r
-               , restart_channel_(restart_channel)\r
+       implementation(output::source_t& source, const video_format_desc& format_desc) \r
+               : format_desc_(format_desc)\r
+               //, scheduler_(Scheduler::Create(SchedulerPolicy(1, ContextPriority, THREAD_PRIORITY_ABOVE_NORMAL)), [](Scheduler* p){p->Release();})\r
+               , output_(std::bind(&implementation::execute, this, std::placeholders::_1))\r
        {\r
+               source.link_target(&output_);\r
        }       \r
        \r
        void add(int index, safe_ptr<frame_consumer>&& consumer)\r
        {               \r
-               channel_.execution().invoke([&]\r
                {\r
+                       critical_section::scoped_lock lock(mutex_);\r
                        consumers_.erase(index);\r
-               });\r
-\r
-               consumer->initialize(channel_.get_format_desc());\r
+               }\r
 \r
-               channel_.execution().invoke([&]\r
+               consumer->initialize(format_desc_);\r
+               \r
                {\r
+                       critical_section::scoped_lock lock(mutex_);\r
                        consumers_.insert(std::make_pair(index, consumer));\r
 \r
                        CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Added.";\r
-               });\r
+               }\r
        }\r
 \r
        void remove(int index)\r
        {\r
-               channel_.execution().invoke([&]\r
+               critical_section::scoped_lock lock(mutex_);\r
+               auto it = consumers_.find(index);\r
+               if(it != consumers_.end())\r
                {\r
-                       auto it = consumers_.find(index);\r
-                       if(it != consumers_.end())\r
-                       {\r
-                               CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
-                               consumers_.erase(it);\r
-                       }\r
-               });\r
+                       CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
+                       consumers_.erase(it);\r
+               }\r
        }\r
                                                \r
-       void execute(const safe_ptr<read_frame>& frame)\r
-       {                       \r
-               if(!has_synchronization_clock())\r
-                       timer_.tick(1.0/channel_.get_format_desc().fps);\r
+       void execute(const safe_ptr<message<safe_ptr<read_frame>>>& msg)\r
+       {       \r
+               auto frame = msg->value();\r
 \r
-               if(frame->image_size() != channel_.get_format_desc().size)\r
-               {\r
-                       timer_.tick(1.0/channel_.get_format_desc().fps);\r
-                       return;\r
+               critical_section::scoped_lock lock(mutex_);             \r
+\r
+               if(!has_synchronization_clock() || frame->image_size() != format_desc_.size)\r
+               {               \r
+                       scoped_oversubcription_token oversubscribe;\r
+                       timer_.tick(1.0/format_desc_.fps);\r
                }\r
                \r
-               auto it = consumers_.begin();\r
-               while(it != consumers_.end())\r
-               {\r
-                       auto consumer = it->second;\r
-\r
-                       if(consumer->get_video_format_desc() != channel_.get_format_desc())\r
-                               consumer->initialize(channel_.get_format_desc());\r
-\r
+               std::vector<int> removables;            \r
+               Concurrency::parallel_for_each(consumers_.begin(), consumers_.end(), [&](const decltype(*consumers_.begin())& pair)\r
+               {               \r
                        try\r
                        {\r
-                               if(consumer->send(frame))\r
-                                       ++it;\r
-                               else\r
-                                       consumers_.erase(it++);\r
+                               scoped_oversubcription_token oversubscribe;\r
+                               if(!pair.second->send(frame))\r
+                                       removables.push_back(pair.first);\r
                        }\r
                        catch(...)\r
-                       {\r
+                       {               \r
                                CASPAR_LOG_CURRENT_EXCEPTION();\r
-                               CASPAR_LOG(warning) << "Trying to restart consumer: " << consumer->print() << L".";\r
+                               CASPAR_LOG(error) << "Consumer error. Trying to recover:" << pair.second->print();\r
                                try\r
                                {\r
-                                       consumer->initialize(channel_.get_format_desc());\r
-                                       consumer->send(frame);\r
+                                       pair.second->initialize(format_desc_);\r
+                                       pair.second->send(frame);\r
                                }\r
                                catch(...)\r
-                               {       \r
-                                       CASPAR_LOG_CURRENT_EXCEPTION(); \r
-                                       CASPAR_LOG(warning) << "Consumer restart failed, trying to restart channel: " << consumer->print() << L".";     \r
-\r
-                                       try\r
-                                       {\r
-                                               restart_channel_();\r
-                                               consumer->initialize(channel_.get_format_desc());\r
-                                               consumer->send(frame);\r
-                                       }\r
-                                       catch(...)\r
-                                       {\r
-                                               CASPAR_LOG_CURRENT_EXCEPTION();\r
-                                               CASPAR_LOG(error) << "Failed to recover consumer: " << consumer->print() << L". Removing it.";\r
-                                               consumers_.erase(it++);\r
-                                       }\r
+                               {\r
+                                       removables.push_back(pair.first);                               \r
+                                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+                                       CASPAR_LOG(error) << "Failed to recover consumer: " << pair.second->print() << L". Removing it.";\r
                                }\r
                        }\r
+               });\r
+\r
+               BOOST_FOREACH(auto& removable, removables)\r
+               {\r
+                       std::shared_ptr<frame_consumer> consumer = consumers_.find(removable)->second;\r
+                       consumers_.erase(removable);                    \r
+                       Concurrency::CurrentScheduler::ScheduleTask(destroy_consumer, new destruction_context(std::move(consumer)));\r
                }\r
        }\r
 \r
@@ -163,8 +206,7 @@ private:
        }\r
 };\r
 \r
-output::output(video_channel_context& video_channel, const std::function<void()>& restart_channel) : impl_(new implementation(video_channel, restart_channel)){}\r
+output::output(output::source_t& source, const video_format_desc& format_desc) : impl_(new implementation(source, format_desc)){}\r
 void output::add(int index, safe_ptr<frame_consumer>&& consumer){impl_->add(index, std::move(consumer));}\r
 void output::remove(int index){impl_->remove(index);}\r
-void output::execute(const safe_ptr<read_frame>& frame) {impl_->execute(frame); }\r
 }}
\ No newline at end of file
index 069a864beb4a3177244d323f72963d70677a0d7e..a05407ad291d882291ee8480fbc2f5d545e7f2ea 100644 (file)
 \r
 #include "../consumer/frame_consumer.h"\r
 \r
+#include <common/concurrency/message.h>\r
 #include <common/memory/safe_ptr.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
 \r
+#include <agents.h>\r
+\r
 namespace caspar { namespace core {\r
        \r
 class video_channel_context;\r
@@ -32,12 +35,12 @@ class video_channel_context;
 class output : boost::noncopyable\r
 {\r
 public:\r
-       explicit output(video_channel_context& video_channel, const std::function<void()>& restart_channel);\r
+       typedef Concurrency::ISource<safe_ptr<message<safe_ptr<read_frame>>>> source_t;\r
+\r
+       explicit output(source_t& source, const video_format_desc& format_desc);\r
 \r
        void add(int index, safe_ptr<frame_consumer>&& consumer);\r
        void remove(int index);\r
-\r
-       void execute(const safe_ptr<read_frame>& frame); // nothrow\r
 private:\r
        struct implementation;\r
        safe_ptr<implementation> impl_;\r
index 5481610a77ae7f5a6cef9689621b67224cddc5fb..c06a4c8c54a79d9b58f003d9209f364e44a3e329 100644 (file)
     <ClInclude Include="mixer\image\blend_modes.h" />\r
     <ClInclude Include="mixer\image\image_shader.h" />\r
     <ClInclude Include="video_channel.h" />\r
-    <ClInclude Include="video_channel_context.h" />\r
     <ClInclude Include="consumer\output.h" />\r
     <ClInclude Include="consumer\frame_consumer.h" />\r
     <ClInclude Include="mixer\audio\audio_mixer.h" />\r
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">Create</PrecompiledHeader>\r
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Develop|Win32'">Create</PrecompiledHeader>\r
     </ClCompile>\r
-    <ClCompile Include="video_channel_context.cpp" />\r
     <ClCompile Include="video_format.cpp">\r
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">StdAfx.h</PrecompiledHeaderFile>\r
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">StdAfx.h</PrecompiledHeaderFile>\r
index 83f07372ce192f365dd244464e57fd9f3efc94d1..b9995b672e2c30862ef91965dd3586a805d2a0b0 100644 (file)
     <ClInclude Include="video_channel.h">\r
       <Filter>source</Filter>\r
     </ClInclude>\r
-    <ClInclude Include="video_channel_context.h">\r
-      <Filter>source</Filter>\r
-    </ClInclude>\r
     <ClInclude Include="mixer\image\blending_glsl.h">\r
       <Filter>source\mixer\image</Filter>\r
     </ClInclude>\r
     <ClCompile Include="video_channel.cpp">\r
       <Filter>source</Filter>\r
     </ClCompile>\r
-    <ClCompile Include="video_channel_context.cpp">\r
-      <Filter>source</Filter>\r
-    </ClCompile>\r
     <ClCompile Include="video_format.cpp">\r
       <Filter>source</Filter>\r
     </ClCompile>\r
index 03280490f21a8676f2f3fe55b5df903afef8cea9..361d062055b9c9706d14eccccf23b9d2fb0ee22a 100644 (file)
@@ -26,7 +26,6 @@
 #include "../gpu/ogl_device.h"\r
 #include "../gpu/host_buffer.h"\r
 #include "../gpu/device_buffer.h"\r
-#include "../../video_channel_context.h"\r
 \r
 #include <common/exception/exceptions.h>\r
 #include <common/gl/gl_check.h>\r
index a0dec39471a1caed63400dc0c1685fca4624ec3f..e3b9d1e52a1eb336c4fe6bec24d2546523fc63bf 100644 (file)
@@ -27,8 +27,6 @@
 #include "audio/audio_mixer.h"\r
 #include "image/image_mixer.h"\r
 \r
-#include "../video_channel_context.h"\r
-\r
 #include <common/exception/exceptions.h>\r
 #include <common/concurrency/executor.h>\r
 #include <common/utility/tweener.h>\r
@@ -50,6 +48,8 @@
 \r
 #include <unordered_map>\r
 \r
+using namespace Concurrency;\r
+\r
 namespace caspar { namespace core {\r
                \r
 template<typename T>\r
@@ -86,7 +86,12 @@ public:
 \r
 struct mixer::implementation : boost::noncopyable\r
 {              \r
-       video_channel_context& channel_;\r
+       critical_section                        mutex_;\r
+       Concurrency::transformer<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>, \r
+                                                        safe_ptr<message<safe_ptr<core::read_frame>>>> mixer_;\r
+\r
+       const video_format_desc format_desc_;\r
+       ogl_device&                             ogl_;\r
        \r
        audio_mixer     audio_mixer_;\r
        image_mixer image_mixer_;\r
@@ -96,63 +101,68 @@ struct mixer::implementation : boost::noncopyable
 \r
        std::queue<std::pair<boost::unique_future<safe_ptr<host_buffer>>, core::audio_buffer>> buffer_;\r
        \r
-       const size_t buffer_size_;\r
 \r
 public:\r
-       implementation(video_channel_context& video_channel) \r
-               : channel_(video_channel)\r
-               , audio_mixer_(channel_.get_format_desc())\r
-               , image_mixer_(channel_.ogl(), channel_.get_format_desc())\r
-               , buffer_size_(env::properties().get("configuration.producers.buffer-depth", 1))\r
+       implementation(mixer::source_t& source, mixer::target_t& target, const video_format_desc& format_desc, ogl_device& ogl) \r
+               : format_desc_(format_desc)\r
+               , ogl_(ogl)\r
+               , audio_mixer_(format_desc)\r
+               , image_mixer_(ogl, format_desc)\r
+               , mixer_(std::bind(&implementation::mix, this, std::placeholders::_1), &target)\r
        {       \r
-               CASPAR_LOG(info) << print() << L" Successfully initialized . Buffer-depth: " << buffer_size_;   \r
+               CASPAR_LOG(info) << print() << L" Successfully initialized.";   \r
+               source.link_target(&mixer_);\r
        }\r
-                       \r
-       safe_ptr<read_frame> execute(const std::map<int, safe_ptr<core::basic_frame>>& frames)\r
-       {                       \r
-               try\r
+               \r
+       safe_ptr<message<safe_ptr<core::read_frame>>> mix(const safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>& msg)\r
+       {               \r
+               auto frames = msg->value();\r
+\r
+               critical_section::scoped_lock lock(mutex_);\r
+\r
+               BOOST_FOREACH(auto& frame, frames)\r
                {\r
-                       BOOST_FOREACH(auto& frame, frames)\r
-                       {\r
-                               auto blend_it = blend_modes_.find(frame.first);\r
-                               image_mixer_.begin_layer(blend_it != blend_modes_.end() ? blend_it->second : blend_mode::normal);\r
+                       auto blend_it = blend_modes_.find(frame.first);\r
+                       image_mixer_.begin_layer(blend_it != blend_modes_.end() ? blend_it->second : blend_mode::normal);\r
                                \r
-                               auto frame1 = make_safe<core::basic_frame>(frame.second);\r
-                               frame1->get_frame_transform() = transforms_[frame.first].fetch_and_tick(1);\r
-\r
-                               if(channel_.get_format_desc().field_mode != core::field_mode::progressive)\r
-                               {                               \r
-                                       auto frame2 = make_safe<core::basic_frame>(frame.second);\r
-                                       frame2->get_frame_transform() = transforms_[frame.first].fetch_and_tick(1);\r
-                                       frame1 = core::basic_frame::interlace(frame1, frame2, channel_.get_format_desc().field_mode);\r
-                               }\r
+                       auto frame1 = make_safe<core::basic_frame>(frame.second);\r
+                       frame1->get_frame_transform() = transforms_[frame.first].fetch_and_tick(1);\r
+\r
+                       if(format_desc_.field_mode != core::field_mode::progressive)\r
+                       {                               \r
+                               auto frame2 = make_safe<core::basic_frame>(frame.second);\r
+                               frame2->get_frame_transform() = transforms_[frame.first].fetch_and_tick(1);\r
+                               frame1 = core::basic_frame::interlace(frame1, frame2, format_desc_.field_mode);\r
+                       }\r
                                                                        \r
-                               frame1->accept(audio_mixer_);                                   \r
-                               frame1->accept(image_mixer_);\r
+                       frame1->accept(audio_mixer_);                                   \r
+                       frame1->accept(image_mixer_);\r
 \r
-                               image_mixer_.end_layer();\r
-                       }\r
+                       image_mixer_.end_layer();\r
+               }\r
 \r
-                       auto image = image_mixer_.render();\r
-                       auto audio = audio_mixer_.mix();\r
+               auto image = image_mixer_.render();\r
+               auto audio = audio_mixer_.mix();\r
                        \r
-                       buffer_.push(std::make_pair(std::move(image), audio));\r
+               buffer_.push(std::make_pair(std::move(image), audio));\r
 \r
-                       if(buffer_.size()-1 < buffer_size_)                     \r
-                               return make_safe<read_frame>();\r
-               \r
-                       auto res = std::move(buffer_.front());\r
-                       buffer_.pop();\r
+               if(buffer_.size() < 2)\r
+                       return msg->transfer(make_safe<core::read_frame>());    \r
 \r
-                       return make_safe<read_frame>(channel_.ogl(), channel_.get_format_desc().size, std::move(res.first.get()), std::move(res.second));       \r
-               }\r
-               catch(...)\r
+               auto res = std::move(buffer_.front());\r
+               buffer_.pop();\r
+\r
+               auto buffer = [&]() -> safe_ptr<core::host_buffer>\r
                {\r
-                       CASPAR_LOG(error) << L"[mixer] Error detected.";\r
-                       throw;\r
-               }                               \r
+                       scoped_oversubcription_token oversubscribe;\r
+                       return std::move(res.first.get());\r
+               }();\r
+\r
+               auto frame = make_safe<read_frame>(ogl_, format_desc_.size, std::move(buffer), std::move(res.second));\r
+\r
+               return msg->transfer<safe_ptr<core::read_frame>>(std::move(frame));     \r
        }\r
-                                       \r
+                                               \r
        safe_ptr<core::write_frame> create_frame(const void* tag, const core::pixel_format_desc& desc)\r
        {               \r
                return image_mixer_.create_frame(tag, desc);\r
@@ -165,39 +175,35 @@ public:
                \r
        void set_transform(int index, const frame_transform& transform, unsigned int mix_duration, const std::wstring& tween)\r
        {\r
-               channel_.execution().invoke([&]\r
-               {\r
-                       auto src = transforms_[index].fetch();\r
-                       auto dst = transform;\r
-                       transforms_[index] = tweened_transform<frame_transform>(src, dst, mix_duration, tween);\r
-               }, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+\r
+               auto src = transforms_[index].fetch();\r
+               auto dst = transform;\r
+               transforms_[index] = tweened_transform<frame_transform>(src, dst, mix_duration, tween);\r
        }\r
                                \r
        void apply_transform(int index, const std::function<frame_transform(frame_transform)>& transform, unsigned int mix_duration, const std::wstring& tween)\r
        {\r
-               channel_.execution().invoke([&]\r
-               {\r
-                       auto src = transforms_[index].fetch();\r
-                       auto dst = transform(src);\r
-                       transforms_[index] = tweened_transform<frame_transform>(src, dst, mix_duration, tween);\r
-               }, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+\r
+               auto src = transforms_[index].fetch();\r
+               auto dst = transform(src);\r
+               transforms_[index] = tweened_transform<frame_transform>(src, dst, mix_duration, tween);\r
        }\r
 \r
        void clear_transforms()\r
        {\r
-               channel_.execution().invoke([&]\r
-               {\r
-                       transforms_.clear();\r
-                       blend_modes_.clear();\r
-               }, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+\r
+               transforms_.clear();\r
+               blend_modes_.clear();\r
        }\r
                \r
        void set_blend_mode(int index, blend_mode::type value)\r
        {\r
-               channel_.execution().invoke([&]\r
-               {\r
-                       blend_modes_[index] = value;\r
-               }, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+\r
+               blend_modes_[index] = value;\r
        }\r
 \r
        std::wstring print() const\r
@@ -206,9 +212,8 @@ public:
        }\r
 };\r
        \r
-mixer::mixer(video_channel_context& video_channel) : impl_(new implementation(video_channel)){}\r
-safe_ptr<core::read_frame> mixer::execute(const std::map<int, safe_ptr<core::basic_frame>>& frames){ return impl_->execute(frames);}\r
-core::video_format_desc mixer::get_video_format_desc() const { return impl_->channel_.get_format_desc(); }\r
+mixer::mixer(mixer::source_t& source, mixer::target_t& target, const video_format_desc& format_desc, ogl_device& ogl) : impl_(new implementation(source, target, format_desc, ogl)){}\r
+core::video_format_desc mixer::get_video_format_desc() const { return impl_->format_desc_; }\r
 safe_ptr<core::write_frame> mixer::create_frame(const void* tag, const core::pixel_format_desc& desc){ return impl_->create_frame(tag, desc); }                \r
 boost::unique_future<safe_ptr<write_frame>> mixer::create_frame2(const void* video_stream_tag, const pixel_format_desc& desc){ return impl_->create_frame2(video_stream_tag, desc); }                  \r
 void mixer::set_frame_transform(int index, const core::frame_transform& transform, unsigned int mix_duration, const std::wstring& tween){impl_->set_transform(index, transform, mix_duration, tween);}\r
index a2948c02519b22780d462f9fbe98dcf3a24827c7..5903e2a4d790bd60a137732749c4c5306cfa4013 100644 (file)
 \r
 #include "../producer/frame/frame_factory.h"\r
 \r
+#include <common/concurrency/message.h>\r
 #include <common/memory/safe_ptr.h>\r
 \r
+#include <agents.h>\r
+\r
 #include <map>\r
 \r
 namespace caspar { \r
@@ -40,21 +43,22 @@ struct frame_transform;
 struct frame_transform;\r
 class video_channel_context;;\r
 struct pixel_format;\r
+class ogl_device;\r
 \r
 class mixer : public core::frame_factory\r
 {\r
 public:        \r
+       \r
+       typedef Concurrency::ISource<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>>   source_t;\r
+       typedef Concurrency::ITarget<safe_ptr<message<safe_ptr<core::read_frame>>>>                             target_t;\r
 \r
-       explicit mixer(video_channel_context& video_channel);\r
-               \r
-       safe_ptr<core::read_frame> execute(const std::map<int, safe_ptr<core::basic_frame>>& frames); // nothrow\r
-               \r
+       explicit mixer(source_t& source, target_t& target, const video_format_desc& format_desc, ogl_device& ogl);\r
+                               \r
        safe_ptr<core::write_frame> create_frame(const void* tag, const core::pixel_format_desc& desc);                 \r
        boost::unique_future<safe_ptr<write_frame>> create_frame2(const void* video_stream_tag, const pixel_format_desc& desc);\r
 \r
        core::video_format_desc get_video_format_desc() const; // nothrow\r
-\r
-\r
+       \r
        void set_frame_transform(int index, const core::frame_transform& transform, unsigned int mix_duration = 0, const std::wstring& tween = L"linear");\r
        void apply_frame_transform(int index, const std::function<core::frame_transform(core::frame_transform)>& transform, unsigned int mix_duration = 0, const std::wstring& tween = L"linear");\r
        void clear_transforms();\r
index db82c2ce96181b3fbc8690e7f8f09afdee719ba8..0c015744c7717dc41f6b7a6d4c690cedbac7e1ef 100644 (file)
@@ -24,8 +24,6 @@
 \r
 #include "layer.h"\r
 \r
-#include "../video_channel_context.h"\r
-\r
 #include <core/producer/frame/basic_frame.h>\r
 #include <core/producer/frame/frame_factory.h>\r
 \r
 \r
 #include <boost/foreach.hpp>\r
 \r
+#include <agents.h>\r
 #include <ppl.h>\r
 \r
 #include <map>\r
 #include <set>\r
 \r
+using namespace Concurrency;\r
+\r
 namespace caspar { namespace core {\r
 \r
-struct stage::implementation : boost::noncopyable\r
+struct stage::implementation : public agent, public boost::noncopyable\r
 {              \r
+       overwrite_buffer<bool>                                          is_running_;\r
+       Concurrency::critical_section                           mutex_;\r
+       stage::target_t&                                                        target_;\r
        std::map<int, layer>                                            layers_;        \r
-       video_channel_context&                                          channel_;\r
+       safe_ptr<semaphore>                                                     semaphore_;\r
 public:\r
-       implementation(video_channel_context& video_channel)  \r
-               : channel_(video_channel)\r
+       implementation(stage::target_t& target)  \r
+               : target_(target)\r
+               , semaphore_(make_safe<semaphore>(3))\r
        {\r
+               start();\r
        }\r
-                                               \r
-       std::map<int, safe_ptr<basic_frame>> execute()\r
-       {               \r
-               try\r
+\r
+       ~implementation()\r
+       {\r
+               send(is_running_, false);\r
+               semaphore_->release();\r
+               agent::wait(this);\r
+       }\r
+\r
+       virtual void run()\r
+       {\r
+               send(is_running_, true);\r
+               while(is_running_.value())\r
                {\r
-                       std::map<int, safe_ptr<basic_frame>> frames;\r
+                       try\r
+                       {\r
+                               std::map<int, safe_ptr<basic_frame>> frames;\r
+                               {\r
+                                       critical_section::scoped_lock lock(mutex_);\r
                \r
-                       BOOST_FOREACH(auto& layer, layers_)                     \r
-                               frames[layer.first] = basic_frame::empty();     \r
-\r
-                       Concurrency::parallel_for_each(layers_.begin(), layers_.end(), [&](std::map<int, layer>::value_type& layer) \r
+                                       BOOST_FOREACH(auto& layer, layers_)                     \r
+                                               frames[layer.first] = basic_frame::empty();     \r
+\r
+                                       Concurrency::parallel_for_each(layers_.begin(), layers_.end(), [&](std::map<int, layer>::value_type& layer) \r
+                                       {\r
+                                               frames[layer.first] = layer.second.receive();   \r
+                                       });\r
+                               }\r
+\r
+                               send(target_, make_safe<message<decltype(frames)>>(frames, token(semaphore_)));\r
+                       }\r
+                       catch(...)\r
                        {\r
-                               frames[layer.first] = layer.second.receive();   \r
-                       });\r
-\r
-                       return frames;\r
+                               CASPAR_LOG_CURRENT_EXCEPTION();\r
+                       }       \r
                }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG(error) << L"[stage] Error detected";\r
-                       throw;\r
-               }               \r
-       }\r
 \r
+               send(is_running_, false);\r
+               done();\r
+       }\r
+                               \r
        void load(int index, const safe_ptr<frame_producer>& producer, bool preview, int auto_play_delta)\r
        {\r
-               channel_.execution().invoke([&]\r
-               {\r
-                       layers_[index].load(create_destroy_producer_proxy(producer), preview, auto_play_delta);\r
-               }, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+               layers_[index].load(create_destroy_producer_proxy(producer), preview, auto_play_delta);\r
        }\r
 \r
        void pause(int index)\r
        {               \r
-               channel_.execution().invoke([&]\r
-               {\r
-                       layers_[index].pause();\r
-               }, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+               layers_[index].pause();\r
        }\r
 \r
        void play(int index)\r
        {               \r
-               channel_.execution().invoke([&]\r
-               {\r
-                       layers_[index].play();\r
-               }, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+               layers_[index].play();\r
        }\r
 \r
        void stop(int index)\r
        {               \r
-               channel_.execution().invoke([&]\r
-               {\r
-                       layers_[index].stop();\r
-               }, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+               layers_[index].stop();\r
        }\r
 \r
        void clear(int index)\r
        {\r
-               channel_.execution().invoke([&]\r
-               {\r
-                       layers_.erase(index);\r
-               }, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+               layers_.erase(index);\r
        }\r
                \r
        void clear()\r
        {\r
-               channel_.execution().invoke([&]\r
-               {\r
-                       layers_.clear();\r
-               }, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+               layers_.clear();\r
        }       \r
        \r
        void swap_layer(int index, size_t other_index)\r
        {\r
-               channel_.execution().invoke([&]\r
-               {\r
-                       std::swap(layers_[index], layers_[other_index]);\r
-               }, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+               std::swap(layers_[index], layers_[other_index]);\r
        }\r
 \r
        void swap_layer(int index, size_t other_index, stage& other)\r
@@ -135,12 +143,10 @@ public:
                if(other.impl_.get() == this)\r
                        swap_layer(index, other_index);\r
                else\r
-               {\r
-                       auto func = [&]\r
-                       {\r
-                               std::swap(layers_[index], other.impl_->layers_[other_index]);\r
-                       };              \r
-                       channel_.execution().invoke([&]{other.impl_->channel_.execution().invoke(func, high_priority);}, high_priority);\r
+               {                       \r
+                       critical_section::scoped_lock lock1(mutex_);\r
+                       critical_section::scoped_lock lock2(other.impl_->mutex_);\r
+                       std::swap(layers_[index], other.impl_->layers_[other_index]);\r
                }\r
        }\r
 \r
@@ -149,39 +155,38 @@ public:
                if(other.impl_.get() == this)\r
                        return;\r
                \r
-               auto func = [&]\r
-               {\r
-                       std::swap(layers_, other.impl_->layers_);\r
-               };              \r
-               channel_.execution().invoke([&]{other.impl_->channel_.execution().invoke(func, high_priority);}, high_priority);\r
+               critical_section::scoped_lock lock1(mutex_);\r
+               critical_section::scoped_lock lock2(other.impl_->mutex_);\r
+               std::swap(layers_, other.impl_->layers_);\r
        }\r
 \r
        layer_status get_status(int index)\r
        {               \r
-               return channel_.execution().invoke([&]\r
-               {\r
-                       return layers_[index].status();\r
-               }, high_priority );\r
+               critical_section::scoped_lock lock(mutex_);\r
+               return layers_[index].status();\r
        }\r
        \r
        safe_ptr<frame_producer> foreground(int index)\r
        {\r
-               return channel_.execution().invoke([=]{return layers_[index].foreground();}, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+               return layers_[index].foreground();\r
        }\r
        \r
        safe_ptr<frame_producer> background(int index)\r
        {\r
-               return channel_.execution().invoke([=]{return layers_[index].background();}, high_priority);\r
+               critical_section::scoped_lock lock(mutex_);\r
+               return layers_[index].background();\r
        }\r
 \r
        std::wstring print() const\r
        {\r
-               return L"stage [" + boost::lexical_cast<std::wstring>(channel_.index()) + L"]";\r
+               // C-TODO\r
+               return L"stage []";// + boost::lexical_cast<std::wstring>(channel_.index()) + L"]";\r
        }\r
 \r
 };\r
 \r
-stage::stage(video_channel_context& video_channel) : impl_(new implementation(video_channel)){}\r
+stage::stage(target_t& target) : impl_(new implementation(target)){}\r
 void stage::swap(stage& other){impl_->swap(other);}\r
 void stage::load(int index, const safe_ptr<frame_producer>& producer, bool preview, int auto_play_delta){impl_->load(index, producer, preview, auto_play_delta);}\r
 void stage::pause(int index){impl_->pause(index);}\r
@@ -194,5 +199,4 @@ void stage::swap_layer(int index, size_t other_index, stage& other){impl_->swap_
 layer_status stage::get_status(int index){return impl_->get_status(index);}\r
 safe_ptr<frame_producer> stage::foreground(size_t index) {return impl_->foreground(index);}\r
 safe_ptr<frame_producer> stage::background(size_t index) {return impl_->background(index);}\r
-std::map<int, safe_ptr<basic_frame>> stage::execute(){return impl_->execute();}\r
 }}
\ No newline at end of file
index 8fb21a818d5c0a2e693afa50b618cf88f03e4d31..33aea704f96691d949cebf7714d3af88e2988617 100644 (file)
 #include "frame_producer.h"\r
 \r
 #include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/message.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
 \r
+#include <agents.h>\r
+\r
 namespace caspar { namespace core {\r
 \r
 struct video_format_desc;\r
@@ -35,12 +38,12 @@ struct layer_status;
 class stage : boost::noncopyable\r
 {\r
 public:\r
-       explicit stage(video_channel_context& video_channel);\r
+       typedef Concurrency::ITarget<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>> target_t;\r
 \r
-       void swap(stage& other);\r
+       explicit stage(target_t& target);\r
 \r
-       std::map<int, safe_ptr<basic_frame>> execute();\r
-               \r
+       void swap(stage& other);\r
+                       \r
        void load(int index, const safe_ptr<frame_producer>& producer, bool preview = false, int auto_play_delta = -1);\r
        void pause(int index);\r
        void play(int index);\r
index e48f857326ffdee91f9b19ec664b1f3c3815e0b1..cacfc4f3cab3e47576204a005d70f1c43434b659 100644 (file)
@@ -22,7 +22,6 @@
 \r
 #include "video_channel.h"\r
 \r
-#include "video_channel_context.h"\r
 #include "video_format.h"\r
 \r
 #include "consumer/output.h"\r
@@ -34,6 +33,8 @@
 \r
 #include "mixer/gpu/ogl_device.h"\r
 \r
+#include <agents_extras.h>\r
+\r
 #include <boost/timer.hpp>\r
 \r
 #ifdef _MSC_VER\r
@@ -44,107 +45,103 @@ namespace caspar { namespace core {
 \r
 struct video_channel::implementation : boost::noncopyable\r
 {\r
-       video_channel_context                   context_;\r
+       Concurrency::unbounded_buffer<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>>  stage_frames_;\r
+       Concurrency::unbounded_buffer<safe_ptr<message<safe_ptr<read_frame>>>>                                  mixer_frames_;\r
+       \r
+       const video_format_desc format_desc_;\r
 \r
        safe_ptr<caspar::core::output>                  output_;\r
        std::shared_ptr<caspar::core::mixer>    mixer_;\r
        safe_ptr<caspar::core::stage>                   stage_;\r
 \r
-       safe_ptr<diagnostics::graph>    diag_;\r
+       safe_ptr<diagnostics::graph>    graph_;\r
        boost::timer                                    frame_timer_;\r
        boost::timer                                    tick_timer_;\r
        boost::timer                                    output_timer_;\r
        \r
 public:\r
        implementation(int index, const video_format_desc& format_desc, ogl_device& ogl)  \r
-               : context_(index, ogl, format_desc)\r
-               , diag_(diagnostics::create_graph(narrow(print())))\r
-               , output_(new caspar::core::output(context_, [this]{restart();}))\r
-               , mixer_(new caspar::core::mixer(context_))\r
-               , stage_(new caspar::core::stage(context_))     \r
+               : graph_(diagnostics::create_graph(narrow(print()), false))\r
+               , format_desc_(format_desc)\r
+               , output_(new caspar::core::output(mixer_frames_, format_desc))\r
+               , mixer_(new caspar::core::mixer(stage_frames_, mixer_frames_, format_desc, ogl))\r
+               , stage_(new caspar::core::stage(stage_frames_))        \r
        {\r
-               diag_->add_guide("produce-time", 0.5f); \r
-               diag_->set_color("produce-time", diagnostics::color(0.0f, 1.0f, 0.0f));\r
-               diag_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f));    \r
-               diag_->set_color("output-time", diagnostics::color(1.0f, 0.5f, 0.0f));\r
-               diag_->set_color("mix-time", diagnostics::color(1.0f, 1.0f, 0.9f));\r
+               graph_->add_guide("produce-time", 0.5f);        \r
+               graph_->set_color("produce-time", diagnostics::color(0.0f, 1.0f, 0.0f));\r
+               graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f));   \r
+               graph_->set_color("output-time", diagnostics::color(1.0f, 0.5f, 0.0f));\r
+               graph_->set_color("mix-time", diagnostics::color(1.0f, 1.0f, 0.9f));\r
+               graph_->start();\r
 \r
                CASPAR_LOG(info) << print() << " Successfully Initialized.";\r
-               context_.execution().begin_invoke([this]{tick();});\r
-       }\r
-\r
-       ~implementation()\r
-       {\r
-               // Stop context before destroying devices.\r
-               context_.execution().stop();\r
-               context_.execution().join();\r
-       }\r
-\r
-       void tick()\r
-       {\r
-               try\r
-               {\r
-                       // Produce\r
-\r
-                       frame_timer_.restart();\r
-\r
-                       auto simple_frames = stage_->execute();\r
-\r
-                       diag_->update_value("produce-time", frame_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
-               \r
-                       // Mix\r
-\r
-                       frame_timer_.restart();\r
-\r
-                       auto finished_frame = mixer_->execute(simple_frames);\r
-               \r
-                       diag_->update_value("mix-time", frame_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
-               \r
-                       // Consume\r
-               \r
-                       output_timer_.restart();\r
-\r
-                       output_->execute(finished_frame);\r
-               \r
-                       diag_->update_value("output-time", frame_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
-\r
-               \r
-                       diag_->update_value("tick-time", tick_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
-                       tick_timer_.restart();\r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       CASPAR_LOG(error) << context_.print() << L" Unexpected exception. Clearing stage and freeing memory";\r
-                       restart();\r
-               }\r
-\r
-               context_.execution().begin_invoke([this]{tick();});\r
-       }\r
-\r
-       void restart()\r
-       {\r
-               stage_->clear();\r
-               context_.ogl().gc().wait();\r
-\r
-               mixer_ = nullptr;\r
-               mixer_.reset(new caspar::core::mixer(context_));\r
        }\r
+       \r
+       //void tick()\r
+       //{\r
+       //      try\r
+       //      {\r
+       //              // Produce\r
+\r
+       //              frame_timer_.restart();\r
+\r
+       //              auto simple_frames = stage_->execute();\r
+\r
+       //              graph_->update_value("produce-time", frame_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
+       //      \r
+       //              // Mix\r
+\r
+       //              frame_timer_.restart();\r
+\r
+       //              auto finished_frame = mixer_->execute(simple_frames);\r
+       //      \r
+       //              graph_->update_value("mix-time", frame_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
+       //      \r
+       //              // Consume\r
+       //      \r
+       //              output_timer_.restart();\r
+\r
+       //              output_->execute(finished_frame);\r
+       //      \r
+       //              graph_->update_value("output-time", frame_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
+\r
+       //      \r
+       //              graph_->update_value("tick-time", tick_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
+       //              tick_timer_.restart();\r
+       //      }\r
+       //      catch(...)\r
+       //      {\r
+       //              CASPAR_LOG_CURRENT_EXCEPTION();\r
+       //              CASPAR_LOG(error) << context_.print() << L" Unexpected exception. Clearing stage and freeing memory";\r
+       //              restart();\r
+       //      }\r
+\r
+       //      context_.execution().begin_invoke([this]{tick();});\r
+       //}\r
+\r
+       //void restart()\r
+       //{\r
+       //      stage_->clear();\r
+       //      context_.ogl().gc().wait();\r
+\r
+       //      mixer_ = nullptr;\r
+       //      mixer_.reset(new caspar::core::mixer(context_));\r
+       //}\r
                \r
        std::wstring print() const\r
        {\r
-               return context_.print();\r
+               return L"video_channel";\r
        }\r
 \r
-       void set_video_format_desc(const video_format_desc& format_desc)\r
-       {\r
-               context_.execution().begin_invoke([=]\r
-               {\r
-                       stage_->clear();\r
-                       context_.ogl().gc().wait();\r
-                       context_.set_format_desc(format_desc);\r
-               });\r
-       }\r
+       //void set_video_format_desc(const video_format_desc& format_desc)\r
+       //{\r
+       //      context_.execution().begin_invoke([=]\r
+       //      {\r
+       //              stage_->clear();\r
+       //              context_.ogl().gc().wait();\r
+       //              context_.set_format_desc(format_desc);\r
+       //      });\r
+       //}\r
 };\r
 \r
 video_channel::video_channel(int index, const video_format_desc& format_desc, ogl_device& ogl) : impl_(new implementation(index, format_desc, ogl)){}\r
@@ -152,9 +149,7 @@ video_channel::video_channel(video_channel&& other) : impl_(std::move(other.impl
 safe_ptr<stage> video_channel::stage() { return impl_->stage_;} \r
 safe_ptr<mixer> video_channel::mixer() { return make_safe_ptr(impl_->mixer_);} \r
 safe_ptr<output> video_channel::output() { return impl_->output_;} \r
-video_format_desc video_channel::get_video_format_desc() const{return impl_->context_.get_format_desc();}\r
-void video_channel::set_video_format_desc(const video_format_desc& format_desc){impl_->set_video_format_desc(format_desc);}\r
+video_format_desc video_channel::get_video_format_desc() const{return impl_->format_desc_;}\r
 std::wstring video_channel::print() const { return impl_->print();}\r
-video_channel_context& video_channel::context(){return impl_->context_;}\r
 \r
 }}
\ No newline at end of file
index 7a0225b6b0395d99281ebd3a558b2564b5da654c..13b07c571baedc50022bcb82f2ba3a174fa2736e 100644 (file)
@@ -42,11 +42,10 @@ public:
        safe_ptr<stage> stage();\r
        safe_ptr<mixer> mixer();\r
        safe_ptr<output> output();\r
-\r
-       video_channel_context& context();\r
-\r
+       \r
+       // C_TODO\r
        video_format_desc get_video_format_desc() const;\r
-       void set_video_format_desc(const video_format_desc& format_desc);\r
+       //void set_video_format_desc(const video_format_desc& format_desc);\r
 \r
        std::wstring print() const;\r
 \r
diff --git a/core/video_channel_context.cpp b/core/video_channel_context.cpp
deleted file mode 100644 (file)
index d46e689..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-#include "stdAfx.h"\r
-\r
-#include "video_channel_context.h"\r
-\r
-namespace caspar { namespace core {\r
-\r
-struct video_channel_context::implementation\r
-{              \r
-       mutable tbb::spin_rw_mutex      mutex_;\r
-       const int                                       index_;\r
-       video_format_desc                       format_desc_;\r
-       executor                                        execution_;\r
-       ogl_device&                                     ogl_;\r
-\r
-       implementation(int index, ogl_device& ogl, const video_format_desc& format_desc)\r
-               : index_(index)\r
-               , format_desc_(format_desc)\r
-               , execution_(print() + L"/execution")\r
-               , ogl_(ogl)\r
-       {\r
-               execution_.set_priority_class(above_normal_priority_class);\r
-       }\r
-\r
-       std::wstring print() const\r
-       {\r
-               return L"video_channel[" + boost::lexical_cast<std::wstring>(index_+1) + L"|" +  format_desc_.name + L"]";\r
-       }\r
-};\r
-\r
-video_channel_context::video_channel_context(int index, ogl_device& ogl, const video_format_desc& format_desc) \r
-       : impl_(new implementation(index, ogl, format_desc))\r
-{\r
-}\r
-\r
-const int video_channel_context::index() const {return impl_->index_;}\r
-\r
-video_format_desc video_channel_context::get_format_desc()\r
-{\r
-       tbb::spin_rw_mutex::scoped_lock lock(impl_->mutex_, false);\r
-       return impl_->format_desc_;\r
-}\r
-\r
-void video_channel_context::set_format_desc(const video_format_desc& format_desc)\r
-{\r
-       tbb::spin_rw_mutex::scoped_lock lock(impl_->mutex_, true);\r
-       impl_->format_desc_ = format_desc;\r
-}\r
-\r
-executor& video_channel_context::execution()\r
-{\r
-       return impl_->execution_;\r
-}\r
-\r
-ogl_device& video_channel_context::ogl()\r
-{\r
-       return impl_->ogl_;\r
-}\r
-\r
-std::wstring video_channel_context::print() const\r
-{\r
-       return impl_->print();\r
-}\r
-\r
-}}
\ No newline at end of file
diff --git a/core/video_channel_context.h b/core/video_channel_context.h
deleted file mode 100644 (file)
index 9781295..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-#pragma once\r
-\r
-#include <common/concurrency/executor.h>\r
-\r
-#include <core/video_format.h>\r
-\r
-#include <tbb/spin_rw_mutex.h>\r
-\r
-#include <boost/noncopyable.hpp>\r
-#include <boost/lexical_cast.hpp>\r
-\r
-#include <string>\r
-\r
-namespace caspar { \r
-\r
-class executor;\r
-\r
-namespace core {\r
-\r
-class ogl_device;\r
-\r
-class video_channel_context\r
-{\r
-\r
-public:\r
-       video_channel_context(int index, ogl_device& ogl, const video_format_desc& format_desc);\r
-\r
-       const int                       index() const;\r
-       video_format_desc       get_format_desc();\r
-       void                            set_format_desc(const video_format_desc& format_desc);\r
-       executor&                       execution();\r
-       ogl_device&                     ogl();\r
-       std::wstring            print() const;\r
-private:\r
-       struct implementation;\r
-       std::shared_ptr<implementation> impl_;\r
-};\r
-       \r
-}}
\ No newline at end of file
index 34d1bfcf31f60f5a0c46f93bcab33ba45f7750ac..6cb6888d674285113126f699745f4d49af499f5c 100644 (file)
@@ -172,9 +172,9 @@ public:
        \r
 class decklink_producer_proxy : public Concurrency::agent, public core::frame_producer\r
 {              \r
-       Concurrency::bounded_buffer<ffmpeg::video_message_t>    video_frames_;\r
-       Concurrency::bounded_buffer<ffmpeg::audio_message_t>    audio_buffers_;\r
-       Concurrency::bounded_buffer<ffmpeg::frame_message_t>    muxed_frames_;\r
+       Concurrency::bounded_buffer<safe_ptr<AVFrame>>                          video_frames_;\r
+       Concurrency::bounded_buffer<safe_ptr<core::audio_buffer>>       audio_buffers_;\r
+       Concurrency::bounded_buffer<safe_ptr<core::basic_frame>>        muxed_frames_;\r
 \r
        const core::video_format_desc           format_desc_;\r
        const size_t                                            device_index_;\r
@@ -221,7 +221,7 @@ public:
 \r
                try\r
                {\r
-                       last_frame_ = frame = Concurrency::receive(muxed_frames_)->payload;\r
+                       last_frame_ = frame = Concurrency::receive(muxed_frames_);\r
                }\r
                catch(Concurrency::operation_timed_out&)\r
                {               \r
@@ -297,7 +297,7 @@ public:
                                                auto frame = filter_.poll();\r
                                                if(!frame)\r
                                                        break;\r
-                                               Concurrency::send(video_frames_, ffmpeg::make_message(frame, std::make_shared<ffmpeg::token>(semaphore_)));\r
+                                               Concurrency::send(video_frames_, make_safe_ptr(frame));\r
                                        }\r
                                },\r
                                [&]\r
@@ -307,10 +307,10 @@ public:
                                        {\r
                                                auto sample_frame_count = audio->GetSampleFrameCount();\r
                                                auto audio_data = reinterpret_cast<int32_t*>(bytes);\r
-                                               Concurrency::send(audio_buffers_, ffmpeg::make_message(std::make_shared<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels), std::make_shared<ffmpeg::token>(semaphore_)));\r
+                                               Concurrency::send(audio_buffers_, make_safe<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels));\r
                                        }\r
                                        else\r
-                                               Concurrency::send(audio_buffers_, ffmpeg::make_message(ffmpeg::empty_audio(), std::make_shared<ffmpeg::token>(semaphore_)));    \r
+                                               Concurrency::send(audio_buffers_, ffmpeg::empty_audio());       \r
                                });\r
                        }\r
 \r
index 13ba7a2b0c94e426ff0d58c8d1a0ee77b85487a9..0d4f53cfef8b4beb4e015bad83efacde38151414 100644 (file)
@@ -58,9 +58,9 @@ struct audio_decoder::implementation : public agent, boost::noncopyable
        \r
        std::vector<int8_t,  tbb::cache_aligned_allocator<int8_t>>      buffer1_;\r
 \r
-       overwrite_buffer<bool>                          is_running_;\r
-       unbounded_buffer<packet_message_t>      source_;\r
-       ITarget<audio_message_t>&                       target_;\r
+       overwrite_buffer<bool>                                  is_running_;\r
+       unbounded_buffer<safe_ptr<AVPacket>>    source_;\r
+       ITarget<safe_ptr<core::audio_buffer>>&  target_;\r
        \r
 public:\r
        explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
@@ -69,9 +69,9 @@ public:
                                         format_desc.audio_sample_rate, codec_context_->sample_rate,\r
                                         AV_SAMPLE_FMT_S32,                             codec_context_->sample_fmt)\r
                , buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
-               , source_([this](const packet_message_t& message)\r
+               , source_([this](const safe_ptr<AVPacket>& packet)\r
                        {\r
-                               return message->payload && message->payload->stream_index == index_;\r
+                               return packet->stream_index == index_;\r
                        })\r
                , target_(target)\r
        {                               \r
@@ -95,16 +95,12 @@ public:
                        send(is_running_, true);\r
                        while(is_running_.value())\r
                        {                               \r
-                               auto message = receive(source_);\r
-                               auto packet = message->payload;\r
+                               auto packet = receive(source_);\r
                        \r
-                               if(!packet)\r
-                                       continue;\r
-\r
                                if(packet == loop_packet(index_))\r
                                {\r
-                                       send(target_, make_message(loop_audio()));\r
-                                       break;\r
+                                       send(target_, loop_audio());\r
+                                       continue;\r
                                }\r
 \r
                                if(packet == eof_packet(index_))\r
@@ -130,7 +126,7 @@ public:
                                        const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
                                        const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
 \r
-                                       send(target_, make_message(std::make_shared<core::audio_buffer>(samples, samples + n_samples)));\r
+                                       send(target_, make_safe<core::audio_buffer>(samples, samples + n_samples));\r
                                }\r
                        }\r
                }\r
@@ -140,7 +136,7 @@ public:
                }\r
 \r
                send(is_running_, false);\r
-               send(target_, make_message(eof_audio()));\r
+               send(target_, eof_audio());\r
 \r
                done();\r
        }\r
index 4cb89ce5d171de1b618f388ebe3f47ff005da9ab..292b87baf0b5ce8c65e80f133cb7899193a1f56c 100644 (file)
@@ -47,8 +47,8 @@ class audio_decoder : boost::noncopyable
 {\r
 public:\r
 \r
-       typedef Concurrency::ISource<packet_message_t>& source_t;\r
-       typedef Concurrency::ITarget<audio_message_t>& target_t;\r
+       typedef Concurrency::ISource<safe_ptr<AVPacket>>& source_t;\r
+       typedef Concurrency::ITarget<safe_ptr<core::audio_buffer>>& target_t;\r
        \r
        explicit audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc);\r
        \r
index 429cc83ae2c0a195e701cc01217e6f24f41121f5..029505a4f93af63c145948bd3bf4336a5aca05c5 100644 (file)
@@ -57,25 +57,25 @@ namespace caspar { namespace ffmpeg {
                \r
 struct ffmpeg_producer : public core::frame_producer\r
 {      \r
-       const std::wstring                                              filename_;\r
-       const int                                                               start_;\r
-       const bool                                                              loop_;\r
-       const size_t                                                    length_;\r
+       const std::wstring                                                              filename_;\r
+       const int                                                                               start_;\r
+       const bool                                                                              loop_;\r
+       const size_t                                                                    length_;\r
        \r
-       Concurrency::unbounded_buffer<packet_message_t> packets_;\r
-       Concurrency::unbounded_buffer<video_message_t>  video_;\r
-       Concurrency::unbounded_buffer<audio_message_t>  audio_;\r
-       Concurrency::bounded_buffer<frame_message_t>    frames_;\r
-       Concurrency::call<packet_message_t>                             throw_away_;\r
+       call<safe_ptr<AVPacket>>                                                throw_away_;\r
+       unbounded_buffer<safe_ptr<AVPacket>>                    packets_;\r
+       unbounded_buffer<safe_ptr<AVFrame>>                             video_;\r
+       unbounded_buffer<safe_ptr<core::audio_buffer>>  audio_;\r
+       bounded_buffer<safe_ptr<core::basic_frame>>             frames_;\r
                \r
-       const safe_ptr<diagnostics::graph>              graph_;\r
+       const safe_ptr<diagnostics::graph>                              graph_;\r
                                        \r
-       input                                                                   input_; \r
-       std::shared_ptr<video_decoder>                  video_decoder_;\r
-       std::shared_ptr<audio_decoder>                  audio_decoder_; \r
-       std::unique_ptr<frame_muxer2>                   muxer_;\r
+       input                                                                                   input_; \r
+       std::shared_ptr<video_decoder>                                  video_decoder_;\r
+       std::shared_ptr<audio_decoder>                                  audio_decoder_; \r
+       std::unique_ptr<frame_muxer2>                                   muxer_;\r
 \r
-       safe_ptr<core::basic_frame>                             last_frame_;\r
+       safe_ptr<core::basic_frame>                                             last_frame_;\r
        \r
 public:\r
        explicit ffmpeg_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, const std::wstring& filter, bool loop, int start, size_t length) \r
@@ -83,7 +83,7 @@ public:
                , start_(start)\r
                , loop_(loop)\r
                , length_(length)\r
-               , throw_away_([](const packet_message_t&){})\r
+               , throw_away_([](const safe_ptr<AVPacket>&){})\r
                , frames_(2)\r
                , graph_(diagnostics::create_graph("", false))\r
                , input_(packets_, graph_, filename_, loop, start, length)\r
@@ -137,7 +137,7 @@ public:
        ~ffmpeg_producer()\r
        {\r
                input_.stop();                  \r
-               while(Concurrency::receive(frames_)->payload != core::basic_frame::eof())\r
+               while(Concurrency::receive(frames_) != core::basic_frame::eof())\r
                {\r
                }\r
        }\r
@@ -148,10 +148,10 @@ public:
                \r
                try\r
                {               \r
-                       frame = last_frame_ = Concurrency::receive(frames_, 10)->payload;\r
+                       frame = last_frame_ = Concurrency::receive(frames_, 10);\r
                        graph_->update_text(narrow(print()));\r
                }\r
-               catch(Concurrency::operation_timed_out&)\r
+               catch(operation_timed_out&)\r
                {               \r
                        graph_->add_tag("underflow");   \r
                }\r
index fda2ee24aa56c988aeaac7034373aeab56efbf74..0f66206c149f1c73daea1ccc9fce3e6db75efbd7 100644 (file)
@@ -134,24 +134,26 @@ display_mode::type get_display_mode(const core::field_mode::type in_mode, double
 \r
 struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopyable\r
 {              \r
-       ITarget<frame_message_t>&                                                                                                               target_;\r
-       display_mode::type                                                                                                                              display_mode_;\r
-       const double                                                                                                                                    in_fps_;\r
-       const video_format_desc                                                                                                                 format_desc_;\r
-       bool                                                                                                                                                    auto_transcode_;\r
+       ITarget<safe_ptr<core::basic_frame>>&                   target_;\r
+       display_mode::type                                                              display_mode_;\r
+       const double                                                                    in_fps_;\r
+       const video_format_desc                                                 format_desc_;\r
+       bool                                                                                    auto_transcode_;\r
        \r
-       filter                                                                                                                                                  filter_;\r
-       const safe_ptr<core::frame_factory>                                                                                             frame_factory_;\r
+       filter                                                                                  filter_;\r
+       const safe_ptr<core::frame_factory>                             frame_factory_;\r
        \r
-       call<video_message_t>                                                                                                                   push_video_;\r
-       call<audio_message_t>                                                                                                                   push_audio_;\r
+       call<safe_ptr<AVFrame>>                                                 push_video_;\r
+       call<safe_ptr<core::audio_buffer>>                              push_audio_;\r
        \r
-       transformer<video_message_t, std::shared_ptr<message<std::shared_ptr<write_frame>>>>    video_;\r
-       unbounded_buffer<audio_message_t>                                                                                               audio_;\r
+       unbounded_buffer<safe_ptr<AVFrame>>                             video_;\r
+       unbounded_buffer<safe_ptr<core::audio_buffer>>  audio_;\r
        \r
-       core::audio_buffer                                                                                                                              audio_data_;\r
+       core::audio_buffer                                                              audio_data_;\r
 \r
-       Concurrency::overwrite_buffer<bool>                                                                                             is_running_;\r
+       Concurrency::overwrite_buffer<bool>                             is_running_;\r
+\r
+       safe_ptr<semaphore> semaphore_;\r
                                                        \r
        implementation(frame_muxer2::video_source_t* video_source,\r
                                   frame_muxer2::audio_source_t* audio_source,\r
@@ -166,7 +168,7 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                , frame_factory_(make_safe<core::concrt_frame_factory>(frame_factory))\r
                , push_video_(std::bind(&implementation::push_video, this, std::placeholders::_1))\r
                , push_audio_(std::bind(&implementation::push_audio, this, std::placeholders::_1))\r
-               , video_(std::bind(&implementation::make_write_frame, this, std::placeholders::_1))\r
+               , semaphore_(make_safe<semaphore>(8))\r
        {\r
                if(video_source)\r
                        video_source->link_target(&push_video_);\r
@@ -181,18 +183,7 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                send(is_running_, false);\r
                agent::wait(this);\r
        }\r
-\r
-       std::shared_ptr<message<std::shared_ptr<core::write_frame>>> make_write_frame(const video_message_t& message)\r
-       {\r
-               if(message->payload == eof_video())\r
-                       return make_message<std::shared_ptr<core::write_frame>>(nullptr);\r
-\r
-               if(message->payload == empty_video())\r
-                       return make_message<std::shared_ptr<core::write_frame>>(std::make_shared<core::write_frame>(this));\r
-\r
-               return make_message<std::shared_ptr<core::write_frame>>(ffmpeg::make_write_frame(this, make_safe_ptr(message->payload), frame_factory_, 0), message->token);\r
-       }       \r
-\r
+       \r
        virtual void run()\r
        {\r
                try\r
@@ -202,20 +193,26 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                        {\r
                                auto video = receive(video_);\r
                                auto audio = receive(audio_);                                                           \r
-\r
-                               if(!audio->payload)\r
+                               auto frame = make_safe<core::write_frame>(this);\r
+                               \r
+                               if(audio == eof_audio())\r
                                {\r
                                        send(is_running_ , false);\r
                                        break;\r
                                }\r
 \r
-                               if(!video->payload)\r
+                               if(video == eof_video())\r
                                {\r
                                        send(is_running_ , false);\r
                                        break;\r
                                }\r
 \r
-                               video->payload->audio_data() = std::move(*audio->payload);\r
+                               if(video != empty_video())\r
+                                       frame = make_write_frame(this, video, frame_factory_, 0);\r
+                               if(audio == empty_audio())\r
+                                       audio = make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0);\r
+\r
+                               frame->audio_data() = std::move(*audio);\r
 \r
                                switch(display_mode_)\r
                                {\r
@@ -223,40 +220,35 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                                case display_mode::deinterlace:\r
                                case display_mode::deinterlace_bob:\r
                                        {\r
-                                               auto message = make_message(safe_ptr<core::basic_frame>(video->payload), video->token ? video->token : audio->token);\r
-                                               send(target_, message);\r
+                                               send(target_, safe_ptr<core::basic_frame>(frame));\r
 \r
                                                break;\r
                                        }\r
                                case display_mode::duplicate:                                   \r
                                        {                                                                               \r
-                                               auto message = make_message(safe_ptr<core::basic_frame>(video->payload), video->token ? video->token : audio->token);\r
-                                               send(target_, message);\r
-                                               send(target_, message);\r
+                                               send(target_, safe_ptr<core::basic_frame>(frame));\r
+                                               send(target_, safe_ptr<core::basic_frame>(frame));\r
 \r
                                                break;\r
                                        }\r
                                case display_mode::half:                                                \r
                                        {                                       \r
-                                               receive(video_);\r
-\r
-                                               auto message = make_message(safe_ptr<core::basic_frame>(video->payload), video->token ? video->token : audio->token);\r
-                                               send(target_, message);\r
+                                               receive(video_); // throw away                          \r
+                                               send(target_, safe_ptr<core::basic_frame>(frame));\r
 \r
                                                break;\r
                                        }\r
                                case display_mode::deinterlace_bob_reinterlace:\r
                                case display_mode::interlace:                                   \r
                                        {                                       \r
-                                               auto frame = safe_ptr<core::basic_frame>(video->payload);\r
+                                               /*auto frame = safe_ptr<core::basic_frame>(frame);\r
                                                auto video2 = receive(video_);  \r
-                                               if(video->payload)\r
-                                                       frame = core::basic_frame::interlace(make_safe_ptr(video->payload), make_safe_ptr(video2->payload), format_desc_.field_mode);\r
+                                               if(video2 != empty_video() && video2 != eof_video())\r
+                                                       frame = core::basic_frame::interlace(frame, safe_ptr<core::basic_frame>(video2), format_desc_.field_mode);\r
                                                else\r
                                                        send(is_running_, false);\r
 \r
-                                               auto message = make_message<safe_ptr<core::basic_frame>>(frame, video2->token ? video2->token : audio->token);\r
-                                               send(target_, message);\r
+                                               send(target_, frame);*/\r
 \r
                                                break;\r
                                        }\r
@@ -271,32 +263,21 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                }\r
                \r
                send(is_running_ , false);\r
-               send(target_, make_message(core::basic_frame::eof()));\r
+               send(target_, core::basic_frame::eof());\r
 \r
                done();\r
        }\r
                        \r
-       void push_video(const video_message_t& message)\r
-       {               \r
-               auto video_frame = message->payload;\r
-\r
-               if(!video_frame)\r
-                       return;\r
-               \r
-               if(video_frame == eof_video())\r
+       void push_video(const safe_ptr<AVFrame>& video_frame)\r
+       {                               \r
+               if(video_frame == eof_video() || video_frame == empty_video())\r
                {\r
-                       send(video_, make_message(eof_video()));\r
+                       send(video_, video_frame);\r
                        return;\r
                }\r
-\r
+                               \r
                if(video_frame == loop_video())         \r
                        return; \r
-                               \r
-               if(video_frame == empty_video())\r
-               {\r
-                       send(video_, make_message(empty_video()));\r
-                       return;\r
-               }\r
                \r
                if(display_mode_ == display_mode::invalid)\r
                {\r
@@ -341,29 +322,21 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                BOOST_FOREACH(auto av_frame, filter_.poll_all())\r
                {               \r
                        av_frame->format = format;                      \r
-                       send(video_, make_message(std::shared_ptr<AVFrame>(av_frame), message->token));\r
+                       send(video_, av_frame);\r
                }\r
        }\r
 \r
-       void push_audio(const audio_message_t& message)\r
+       void push_audio(const safe_ptr<core::audio_buffer>& audio_samples)\r
        {\r
-               auto audio_samples = message->payload;\r
-\r
-               if(!audio_samples)\r
-                       return;\r
-\r
-               if(audio_samples == eof_audio())\r
+               if(audio_samples == eof_audio() || audio_samples == empty_audio())\r
                {\r
-                       send(audio_, make_message(std::shared_ptr<core::audio_buffer>()));\r
+                       send(audio_, audio_samples);\r
                        return;\r
                }\r
 \r
                if(audio_samples == loop_audio())                       \r
                        return;         \r
 \r
-               if(audio_samples == empty_audio())              \r
-                       send(audio_, make_message(std::make_shared<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0)));              \r
-\r
                audio_data_.insert(audio_data_.end(), audio_samples->begin(), audio_samples->end());\r
                \r
                while(audio_data_.size() >= format_desc_.audio_samples_per_frame)\r
@@ -371,7 +344,7 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                        auto begin = audio_data_.begin(); \r
                        auto end   = begin + format_desc_.audio_samples_per_frame;\r
                                        \r
-                       send(audio_, make_message(std::make_shared<core::audio_buffer>(begin, end), message->token));\r
+                       send(audio_, make_safe<core::audio_buffer>(begin, end));\r
 \r
                        audio_data_.erase(begin, end);\r
                }\r
index 36f0bf4ba9ea37fc0e42a887608919b03006845f..168d28211f8acd21021717163dd82f096fdbb837 100644 (file)
@@ -31,9 +31,9 @@ class frame_muxer2 : boost::noncopyable
 {\r
 public:\r
        \r
-       typedef Concurrency::ISource<video_message_t>   video_source_t;\r
-       typedef Concurrency::ISource<audio_message_t>   audio_source_t;\r
-       typedef Concurrency::ITarget<frame_message_t>   target_t;\r
+       typedef Concurrency::ISource<safe_ptr<AVFrame>>                         video_source_t;\r
+       typedef Concurrency::ISource<safe_ptr<core::audio_buffer>>      audio_source_t;\r
+       typedef Concurrency::ITarget<safe_ptr<core::basic_frame>>       target_t;\r
                                                                 \r
        frame_muxer2(video_source_t* video_source,\r
                                 audio_source_t* audio_source, \r
index 174345fb0323632d267b14dde95ea5e41d9835bb..7d662a3531bc580f169ff4835996aeeabfb70b1b 100644 (file)
@@ -81,9 +81,7 @@ struct input::implementation : public Concurrency::agent, boost::noncopyable
        tbb::atomic<size_t>                                             packets_size_;\r
 \r
        bool                                                                    stop_;\r
-\r
-       safe_ptr<Concurrency::semaphore>                semaphore_;\r
-       \r
+               \r
 public:\r
        explicit implementation(input::target_t& target,\r
                                                        const safe_ptr<diagnostics::graph>& graph, \r
@@ -102,7 +100,6 @@ public:
                , length_(length)\r
                , frame_number_(0)\r
                , stop_(false)\r
-               , semaphore_(make_safe<Concurrency::semaphore>(MAX_TOKENS))\r
        {               \r
                packets_count_  = 0;\r
                packets_size_   = 0;\r
@@ -120,8 +117,6 @@ public:
        void stop()\r
        {\r
                stop_ = true;\r
-               for(size_t n = 0; n < format_context_->nb_streams+1; ++n)\r
-                       semaphore_->release();\r
                agent::wait(this);\r
        }\r
        \r
@@ -135,31 +130,8 @@ public:
                                if(!packet)\r
                                        break;\r
 \r
-                               Concurrency::asend(target_, make_message(packet, packet->stream_index == default_stream_index_ ? std::make_shared<token>(semaphore_) : nullptr));\r
-                               Concurrency::wait(0);\r
-\r
-                               //std::vector<std::shared_ptr<AVPacket>> buffer;\r
-\r
-                               //while(buffer.size() < 100 && !stop_)\r
-                               //{\r
-                               //      Concurrency::scoped_oversubcription_token oversubscribe;\r
-                               //      auto packet = read_next_packet();\r
-                               //      if(!packet)\r
-                               //              stop_ = true;\r
-                               //      else\r
-                               //              buffer.push_back(packet);\r
-                               //}\r
-                               //                              \r
-                               //std::stable_partition(buffer.begin(), buffer.end(), [this](const std::shared_ptr<AVPacket>& packet)\r
-                               //{\r
-                               //      return packet->stream_index != default_stream_index_;\r
-                               //});\r
-\r
-                               //BOOST_FOREACH(auto packet, buffer)\r
-                               //{\r
-                               //      Concurrency::asend(target_, make_message(packet, packet->stream_index == default_stream_index_ ? std::make_shared<token>(semaphore_) : nullptr));\r
-                               //      Concurrency::wait(0);\r
-                               //}\r
+                               Concurrency::asend(target_, make_safe_ptr(packet));\r
+                               Concurrency::wait(40);\r
                        }\r
                }\r
                catch(...)\r
@@ -168,7 +140,7 @@ public:
                }       \r
        \r
                BOOST_FOREACH(auto stream, streams_)\r
-                       Concurrency::send(target_, make_message(eof_packet(stream->index), std::make_shared<token>(semaphore_)));       \r
+                       Concurrency::send(target_, eof_packet(stream->index));  \r
 \r
                done();\r
        }\r
@@ -216,7 +188,7 @@ public:
                auto size = packet->size;\r
                auto data = packet->data;                       \r
 \r
-               packet = std::shared_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
+               packet = safe_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
                {\r
                        packet->size = size;\r
                        packet->data = data;\r
@@ -247,7 +219,7 @@ public:
                packet->size = 0;\r
 \r
                BOOST_FOREACH(auto stream, streams_)\r
-                       Concurrency::send(target_, make_message(loop_packet(stream->index), std::make_shared<token>(semaphore_)));      \r
+                       Concurrency::asend(target_, loop_packet(stream->index));        \r
 \r
                graph_->add_tag("seek");                \r
        }               \r
index ce704c8000f7d74b1197cb38a0bf1fa15fca6f41..7db09bd525bfc6b07b2a1d7864a7dbab54b79936 100644 (file)
@@ -49,7 +49,7 @@ class input : boost::noncopyable
 {\r
 public:\r
        \r
-       typedef Concurrency::ITarget<packet_message_t> target_t;\r
+       typedef Concurrency::ITarget<safe_ptr<AVPacket>> target_t;\r
 \r
        explicit input(target_t& target, \r
                                   const safe_ptr<diagnostics::graph>& graph, \r
index 308b3c76795b0173da3659191d1850d9b35042e3..87673e954b8446fce4a352f0fd6b72ff2d5c64dc 100644 (file)
@@ -296,9 +296,9 @@ void fix_meta_data(AVFormatContext& context)
        video_context.time_base.den = static_cast<int>(closest_fps*1000000.0);\r
 }\r
 \r
-std::shared_ptr<AVPacket> create_packet()\r
+safe_ptr<AVPacket> create_packet()\r
 {\r
-       std::shared_ptr<AVPacket> packet(new AVPacket, [](AVPacket* p)\r
+       safe_ptr<AVPacket> packet(new AVPacket, [](AVPacket* p)\r
        {\r
                av_free_packet(p);\r
                delete p;\r
@@ -308,9 +308,9 @@ std::shared_ptr<AVPacket> create_packet()
        return packet;\r
 }\r
 \r
-const std::shared_ptr<AVPacket>& loop_packet(int index)\r
+const safe_ptr<AVPacket>& loop_packet(int index)\r
 {\r
-       static std::shared_ptr<AVPacket> packets[] = {create_packet(), create_packet(), create_packet(), create_packet(), create_packet(), create_packet()};\r
+       static safe_ptr<AVPacket> packets[] = {create_packet(), create_packet(), create_packet(), create_packet(), create_packet(), create_packet()};\r
        \r
        auto& packet = packets[index];\r
        packet->stream_index = index;\r
@@ -318,9 +318,9 @@ const std::shared_ptr<AVPacket>& loop_packet(int index)
        return packet;\r
 }\r
 \r
-const std::shared_ptr<AVPacket>& eof_packet(int index)\r
+const safe_ptr<AVPacket>& eof_packet(int index)\r
 {\r
-       static std::shared_ptr<AVPacket> packets[] = {create_packet(), create_packet(), create_packet(), create_packet(), create_packet(), create_packet()};\r
+       static safe_ptr<AVPacket> packets[] = {create_packet(), create_packet(), create_packet(), create_packet(), create_packet(), create_packet()};\r
        \r
        auto& packet = packets[index];\r
        packet->stream_index = index;\r
@@ -328,39 +328,39 @@ const std::shared_ptr<AVPacket>& eof_packet(int index)
        return packet;\r
 }\r
 \r
-const std::shared_ptr<AVFrame>& loop_video()\r
+const safe_ptr<AVFrame>& loop_video()\r
 {\r
-       static auto frame1 = std::shared_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
+       static auto frame1 = safe_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
        return frame1;\r
 }\r
 \r
-const std::shared_ptr<AVFrame>& empty_video()\r
+const safe_ptr<AVFrame>& empty_video()\r
 {\r
-       static auto frame1 = std::shared_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
+       static auto frame1 = safe_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
        return frame1;\r
 }\r
 \r
-const std::shared_ptr<AVFrame>& eof_video()\r
+const safe_ptr<AVFrame>& eof_video()\r
 {\r
-       static auto frame2 = std::shared_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
+       static auto frame2 = safe_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
        return frame2;\r
 }\r
 \r
-const std::shared_ptr<core::audio_buffer>& loop_audio()\r
+const safe_ptr<core::audio_buffer>& loop_audio()\r
 {\r
-       static auto audio1 = std::make_shared<core::audio_buffer>();\r
+       static auto audio1 = safe_ptr<core::audio_buffer>();\r
        return audio1;\r
 }\r
 \r
-const std::shared_ptr<core::audio_buffer>& empty_audio()\r
+const safe_ptr<core::audio_buffer>& empty_audio()\r
 {\r
-       static auto audio1 = std::make_shared<core::audio_buffer>();\r
+       static auto audio1 = safe_ptr<core::audio_buffer>();\r
        return audio1;\r
 }\r
 \r
-const std::shared_ptr<core::audio_buffer>& eof_audio()\r
+const safe_ptr<core::audio_buffer>& eof_audio()\r
 {\r
-       static auto audio2 = std::make_shared<core::audio_buffer>();\r
+       static auto audio2 = safe_ptr<core::audio_buffer>();\r
        return audio2;\r
 }\r
 \r
index 89508de07062964a4f2db633936fa0eba8701e7e..f33d88f3aaa2adc5cf64da3e7925dcc7e6f4225b 100644 (file)
@@ -40,56 +40,16 @@ struct frame_factory;
 namespace ffmpeg {\r
        \r
 // Dataflow\r
-\r
-class token\r
-{\r
-       safe_ptr<Concurrency::semaphore> semaphore_;\r
-public:\r
-       token(const safe_ptr<Concurrency::semaphore>& semaphore)\r
-               : semaphore_(semaphore)\r
-       {\r
-               semaphore_->acquire();\r
-       }\r
-\r
-       ~token()\r
-       {\r
-               semaphore_->release();\r
-       }\r
-};\r
-\r
-template <typename T>\r
-struct message\r
-{\r
-       message(const T& payload = T(), const std::shared_ptr<token>& token = nullptr)\r
-               : payload(payload)\r
-               , token(token)\r
-       {\r
-       }\r
-\r
-       T                                               payload;\r
-       std::shared_ptr<token>  token;\r
-};\r
-\r
-template<typename T>\r
-safe_ptr<message<T>> make_message(const T& payload, const std::shared_ptr<token>& token = nullptr)\r
-{\r
-       return make_safe<message<T>>(payload, token);\r
-}\r
-\r
-typedef safe_ptr<message<std::shared_ptr<AVPacket>>>                   packet_message_t;\r
-typedef safe_ptr<message<std::shared_ptr<AVFrame>>>                            video_message_t;\r
-typedef safe_ptr<message<std::shared_ptr<core::audio_buffer>>> audio_message_t;\r
-typedef safe_ptr<message<safe_ptr<core::basic_frame>>>                 frame_message_t;\r
        \r
-const std::shared_ptr<AVPacket>& loop_packet(int index);\r
-const std::shared_ptr<AVPacket>& eof_packet(int index);\r
+const safe_ptr<AVPacket>& loop_packet(int index);\r
+const safe_ptr<AVPacket>& eof_packet(int index);\r
 \r
-const std::shared_ptr<AVFrame>& loop_video();\r
-const std::shared_ptr<AVFrame>& empty_video();\r
-const std::shared_ptr<AVFrame>& eof_video();\r
-const std::shared_ptr<core::audio_buffer>& loop_audio();\r
-const std::shared_ptr<core::audio_buffer>& empty_audio();\r
-const std::shared_ptr<core::audio_buffer>& eof_audio();\r
+const safe_ptr<AVFrame>& loop_video();\r
+const safe_ptr<AVFrame>& empty_video();\r
+const safe_ptr<AVFrame>& eof_video();\r
+const safe_ptr<core::audio_buffer>& loop_audio();\r
+const safe_ptr<core::audio_buffer>& empty_audio();\r
+const safe_ptr<core::audio_buffer>& eof_audio();\r
 \r
 // Utils\r
 \r
@@ -103,7 +63,7 @@ safe_ptr<core::write_frame> make_write_frame(const void* tag, const safe_ptr<AVF
 \r
 void fix_meta_data(AVFormatContext& context);\r
 \r
-std::shared_ptr<AVPacket> create_packet();\r
+safe_ptr<AVPacket> create_packet();\r
 \r
 safe_ptr<AVCodecContext> open_codec(AVFormatContext& context,  enum AVMediaType type, int& index);\r
 safe_ptr<AVFormatContext> open_input(const std::wstring& filename);\r
index e69c25227eef0ef7012f05f5562614645ab79624..cb57090d7265737095a1cb1fc3159a66d36a0c33 100644 (file)
@@ -61,10 +61,10 @@ struct video_decoder::implementation : public Concurrency::agent, boost::noncopy
        bool                                                                    is_progressive_;\r
        \r
        overwrite_buffer<bool>                                  is_running_;\r
-       unbounded_buffer<packet_message_t>              source_;\r
-       ITarget<video_message_t>&                               target_;\r
+       unbounded_buffer<safe_ptr<AVPacket>>    source_;\r
+       ITarget<safe_ptr<AVFrame>>&                             target_;\r
        \r
-       safe_ptr<semaphore> semaphore_;\r
+       safe_ptr<semaphore>                                             semaphore_;\r
 \r
 public:\r
        explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
@@ -74,9 +74,9 @@ public:
                , width_(codec_context_->width)\r
                , height_(codec_context_->height)\r
                , is_progressive_(true)\r
-               , source_([this](const packet_message_t& message)\r
+               , source_([this](const safe_ptr<AVPacket>& packet)\r
                        {\r
-                               return message->payload && message->payload->stream_index == index_;\r
+                               return packet->stream_index == index_;\r
                        })\r
                , target_(target)\r
                , semaphore_(make_safe<Concurrency::semaphore>(1))\r
@@ -104,26 +104,23 @@ public:
                        send(is_running_, true);\r
                        while(is_running_.value())\r
                        {\r
-                               auto message = receive(source_);\r
-                               auto packet = message->payload;\r
+                               auto packet = receive(source_);\r
                        \r
-                               if(!packet)\r
-                                       continue;\r
-\r
                                if(packet == loop_packet(index_))\r
                                {\r
-                                       send(target_, make_message(loop_video()));\r
+                                       send(target_, loop_video());\r
                                        continue;\r
                                }\r
 \r
                                if(packet == eof_packet(index_))\r
                                        break;\r
 \r
-                               token token(semaphore_);\r
-                               std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this, token](AVFrame* frame)\r
+                               std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this](AVFrame* frame)\r
                                {\r
                                        av_free(frame);\r
+                                       semaphore_->release();\r
                                });\r
+                               semaphore_->acquire();\r
 \r
                                int frame_finished = 0;\r
                                THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, packet.get()), "[video_decocer]");\r
@@ -141,7 +138,7 @@ public:
                \r
                                is_progressive_ = decoded_frame->interlaced_frame == 0;\r
                                \r
-                               send(target_, make_message(decoded_frame, message->token));\r
+                               send(target_, make_safe_ptr(decoded_frame));\r
                                Concurrency::wait(10);\r
                        }\r
                }\r
@@ -151,7 +148,7 @@ public:
                }\r
                \r
                send(is_running_, false),\r
-               send(target_, make_message(eof_video()));\r
+               send(target_, eof_video());\r
 \r
                done();\r
        }\r
index eefa91d4ff82d2372823dc1143d164c231680942..a6107c1c61594c6a06fbc543496bc559219781cd 100644 (file)
@@ -47,8 +47,8 @@ class video_decoder : boost::noncopyable
 {\r
 public:\r
        \r
-       typedef Concurrency::ISource<packet_message_t> source_t;\r
-       typedef Concurrency::ITarget<video_message_t>  target_t;\r
+       typedef Concurrency::ISource<safe_ptr<AVPacket>> source_t;\r
+       typedef Concurrency::ITarget<safe_ptr<AVFrame>>  target_t;\r
        \r
        explicit video_decoder(source_t& source, target_t& target, AVFormatContext& context);   \r
 \r
index 9ecddc83175237be642c56c14ec086af7c4ed844..a5cae2673af339db590c2cb7dd9e5fc267c9cedb 100644 (file)
@@ -294,6 +294,7 @@ public:
                auto av_frame = get_av_frame();\r
                av_frame->data[0] = const_cast<uint8_t*>(frame->image_data().begin());\r
 \r
+               filter_.push(av_frame);\r
                auto frames = filter_.poll_all();\r
                \r
                if(frames.empty())\r
index 0fb9cfd01cc1bb53d86c8246e2a3e8b2c26681af..695fb103ef8dcd8c443c3ed8bce29894c16708eb 100644 (file)
@@ -33,7 +33,6 @@
 \r
 #include <core/producer/frame_producer.h>\r
 #include <core/video_format.h>\r
-#include <core/video_channel_context.h>\r
 #include <core/producer/transition/transition_producer.h>\r
 #include <core/producer/frame/frame_transform.h>\r
 #include <core/producer/stage.h>\r
@@ -1334,13 +1333,14 @@ bool SetCommand::DoExecute()
 \r
        if(name == TEXT("MODE"))\r
        {\r
-               auto format_desc = core::video_format_desc::get(value);\r
-               if(format_desc.format != core::video_format::invalid)\r
-               {\r
-                       GetChannel()->set_video_format_desc(format_desc);\r
-                       SetReplyString(TEXT("202 SET MODE OK\r\n"));\r
-               }\r
-               else\r
+               // C-TODO\r
+               //auto format_desc = core::video_format_desc::get(value);\r
+               //if(format_desc.format != core::video_format::invalid)\r
+               //{\r
+               //      GetChannel()->set_video_format_desc(format_desc);\r
+               //      SetReplyString(TEXT("202 SET MODE OK\r\n"));\r
+               //}\r
+               //else\r
                        SetReplyString(TEXT("501 SET MODE FAILED\r\n"));\r
        }\r
        else\r
index 8f05f0fc403e471249892ae2084a0c4c15c0fa84..a19056ea4b5b676314a59de4b0c54d9ad931af36 100644 (file)
     </producers>\r
     <channels>\r
       <channel>\r
-        <video-mode>1080i5000</video-mode>\r
+        <video-mode>PAL</video-mode>\r
         <consumers>\r
           <decklink>\r
             <device>1</device>\r
             <embedded-audio>true</embedded-audio>\r
           </decklink>\r
+          <screen>\r
+            <device>1</device>\r
+          </screen>\r
           <audio></audio>\r
         </consumers>\r
       </channel>\r