]> git.sesse.net Git - casparcg/commitdiff
2.1.0: - print_producer_proxy: Fixed for decklink and ffmpeg producers.
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Fri, 3 Feb 2012 14:11:55 +0000 (14:11 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Fri, 3 Feb 2012 14:11:55 +0000 (14:11 +0000)
- implemented std::async and use it instead of handrolling.

git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.1.0@2213 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d

13 files changed:
common/common.vcxproj
common/common.vcxproj.filters
common/concurrency/async.h [new file with mode: 0644]
common/concurrency/defer.h [deleted file]
common/log.cpp
core/mixer/gpu/image/image_mixer.cpp
core/producer/frame_producer.cpp
core/producer/frame_producer.h
core/producer/layer.cpp
core/producer/layer.h
core/producer/stage.cpp
modules/decklink/producer/decklink_producer.cpp
modules/ffmpeg/producer/ffmpeg_producer.cpp

index 8384347ea28435f234e9ec3eab08950457db7a87..329b3006b46060c36eb711a2ed13ba8e8bb7b4e7 100644 (file)
   <ItemGroup>\r
     <ClInclude Include="assert.h" />\r
     <ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
-    <ClInclude Include="concurrency\defer.h" />\r
+    <ClInclude Include="concurrency\async.h" />\r
     <ClInclude Include="concurrency\executor.h" />\r
     <ClInclude Include="concurrency\lock.h" />\r
     <ClInclude Include="diagnostics\graph.h" />\r
index 32fd24000f4a12db6f7c2762eea15cb80d5a0b54..38f206d846491de80f83a1fbb914df125da79675 100644 (file)
     <ClInclude Include="reactive.h">\r
       <Filter>source</Filter>\r
     </ClInclude>\r
-    <ClInclude Include="concurrency\defer.h">\r
-      <Filter>source\concurrency</Filter>\r
-    </ClInclude>\r
     <ClInclude Include="except.h">\r
       <Filter>source</Filter>\r
     </ClInclude>\r
     <ClInclude Include="tweener.h">\r
       <Filter>source</Filter>\r
     </ClInclude>\r
+    <ClInclude Include="concurrency\async.h">\r
+      <Filter>source\concurrency</Filter>\r
+    </ClInclude>\r
   </ItemGroup>\r
 </Project>
\ No newline at end of file
diff --git a/common/concurrency/async.h b/common/concurrency/async.h
new file mode 100644 (file)
index 0000000..6963479
--- /dev/null
@@ -0,0 +1,89 @@
+#pragma once\r
+\r
+#include <functional>\r
+#include <memory>\r
+\r
+#include <boost/thread/future.hpp>\r
+#include <boost/thread/thread.hpp>\r
+\r
+#include <boost/utility/declval.hpp>\r
+\r
+#include "../enum_class.h"\r
+\r
+namespace caspar {\r
+\r
+struct launch_policy_def\r
+{\r
+       enum type\r
+       {\r
+               async,\r
+               deferred\r
+       };\r
+};\r
+typedef enum_class<launch_policy_def> launch_policy;\r
+\r
+namespace detail {\r
+\r
+template<typename R>\r
+struct invoke_function\r
+{      \r
+       template<typename F>\r
+       void operator()(boost::promise<R>& p, F& f)\r
+       {\r
+               p.set_value(f());\r
+       }\r
+};\r
+\r
+template<>\r
+struct invoke_function<void>\r
+{      \r
+       template<typename F>\r
+       void operator()(boost::promise<void>& p, F& f)\r
+       {\r
+               f();\r
+               p.set_value();\r
+       }\r
+};\r
+\r
+}\r
+       \r
+template<typename F>\r
+auto async(launch_policy lp, F&& f) -> boost::unique_future<decltype(f())>\r
+{              \r
+       typedef decltype(f()) result_type;\r
+\r
+       if(lp == launch_policy::deferred)\r
+       {\r
+               typedef boost::promise<result_type> promise_t;\r
+\r
+               auto promise = new promise_t();\r
+               auto future  = promise->get_future();\r
+       \r
+               promise->set_wait_callback(std::function<void(promise_t&)>([=](promise_t&) mutable\r
+               {\r
+                       std::unique_ptr<promise_t> pointer_guard(promise);\r
+                       detail::invoke_function<result_type>()(*promise, f);\r
+               }));\r
+\r
+               return std::move(future);\r
+       }\r
+       else\r
+       {\r
+               typedef boost::packaged_task<result_type> packaged_task_t;\r
+\r
+               auto task   = packaged_task_t(f);    \r
+               auto future = task.get_future();\r
+\r
+               boost::thread(std::move(task)).detach();\r
+\r
+               return std::move(future);\r
+       }\r
+}\r
+       \r
+template<typename F>\r
+auto async(F&& f) -> boost::unique_future<decltype(f())>\r
+{      \r
+       return async(launch_policy::async, std::forward<F>(f));\r
+}\r
+\r
+}
\ No newline at end of file
diff --git a/common/concurrency/defer.h b/common/concurrency/defer.h
deleted file mode 100644 (file)
index 6c8d0e2..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-#pragma once\r
-\r
-#include <functional>\r
-#include <memory>\r
-\r
-#include <boost/thread/future.hpp>\r
-\r
-namespace caspar {\r
-               \r
-template<typename F>\r
-auto defer(F&& f) -> boost::unique_future<decltype(f())>\r
-{      \r
-       typedef boost::promise<decltype(f())> promise_t;\r
-       auto p = new promise_t();\r
-\r
-       auto func = [=](promise_t&) mutable\r
-       {\r
-               std::unique_ptr<promise_t> guard(p);\r
-               p->set_value(f());\r
-       };\r
-\r
-       p->set_wait_callback(std::function<void(promise_t&)>(func));\r
-\r
-       return p->get_future();\r
-}\r
-\r
-\r
-}
\ No newline at end of file
index 6704ef0994632a214ca08f1f814e60153f8db18f..6368cb97da93a75e6c68f5101a91c694678ed61f 100644 (file)
 \r
 #include "log.h"\r
 \r
+#include "except.h"\r
 #include "utf.h"\r
 \r
-#include "exception/exceptions.h"\r
-\r
 #include <ios>\r
 #include <string>\r
 #include <ostream>\r
index fa1c1ce2ef536e8c349d5a103871d4d3e7754c27..b16a6d0d821a9b1b0a4f8d7ca976d8a90c1dd937 100644 (file)
@@ -31,7 +31,7 @@
 #include "../device_buffer.h"\r
 \r
 #include <common/gl/gl_check.h>\r
-#include <common/concurrency/defer.h>\r
+#include <common/concurrency/async.h>\r
 \r
 #include <core/frame/frame_transform.h>\r
 #include <core/frame/pixel_format.h>\r
@@ -111,7 +111,7 @@ public:
                        return result;\r
                });\r
 \r
-               return defer([=]() mutable -> boost::iterator_range<const uint8_t*>\r
+               return async(launch_policy::deferred, [=]() mutable -> boost::iterator_range<const uint8_t*>\r
                {\r
                        auto ptr = reinterpret_cast<const uint8_t*>(buffer.get()->data()); // .get() and ->data() can block calling thread, ->data() can also block OpenGL thread, defer it as long as possible.\r
                        return boost::iterator_range<const uint8_t*>(ptr, ptr + buffer.get()->size());\r
index 972600f0ba5d9398ca5fb2b233679ac6cf6a5e73..9dc3b2774724448a082f05de95718e32a2cda63c 100644 (file)
 #include "color/color_producer.h"\r
 #include "separated/separated_producer.h"\r
 \r
-#include <common/memory/safe_ptr.h>\r
-#include <common/concurrency/executor.h>\r
+#include <common/assert.h>\r
 #include <common/except.h>\r
-#include <common/utility/move_on_copy.h>\r
+#include <common/concurrency/executor.h>\r
+#include <common/concurrency/async.h>\r
+#include <common/memory/safe_ptr.h>\r
 \r
 namespace caspar { namespace core {\r
 \r
@@ -54,24 +55,15 @@ public:
 \r
        ~destroy_producer_proxy()\r
        {               \r
-               static auto destroyers = std::make_shared<tbb::concurrent_bounded_queue<std::shared_ptr<executor>>>();\r
-               static tbb::atomic<int> destroyer_count;\r
+               static tbb::atomic<int> counter = tbb::atomic<int>();\r
 \r
                try\r
-               {\r
-                       std::shared_ptr<executor> destroyer;\r
-                       if(!destroyers->try_pop(destroyer))\r
-                       {\r
-                               destroyer.reset(new executor(L"destroyer"));\r
-                               destroyer->set_priority_class(below_normal_priority_class);\r
-                               if(++destroyer_count > 16)\r
-                                       CASPAR_LOG(warning) << L"Potential destroyer dead-lock detected.";\r
-                               CASPAR_LOG(trace) << "Created destroyer: " << destroyer_count;\r
-                       }\r
-                               \r
+               {                               \r
                        auto producer = producer_.release();\r
-                       auto pool         = destroyers;\r
-                       destroyer->begin_invoke([=]\r
+                       ++counter;\r
+                       CASPAR_VERIFY(counter < 32);\r
+\r
+                       async([=]\r
                        {\r
                                std::unique_ptr<std::shared_ptr<frame_producer>> producer2(producer);\r
 \r
@@ -86,7 +78,8 @@ public:
                                catch(...){}\r
                                                                \r
                                producer2.reset();\r
-                               pool->push(destroyer);\r
+\r
+                               --counter;\r
                        }); \r
                }\r
                catch(...)\r
@@ -148,31 +141,6 @@ safe_ptr<core::frame_producer> create_producer_print_proxy(safe_ptr<core::frame_
        return make_safe<print_producer_proxy>(std::move(producer));\r
 }\r
 \r
-class last_frame_producer : public frame_producer\r
-{\r
-       const std::wstring                      print_;\r
-       const safe_ptr<draw_frame>      frame_;\r
-       const uint32_t                          nb_frames_;\r
-public:\r
-       last_frame_producer(const safe_ptr<frame_producer>& producer) \r
-               : print_(producer->print())\r
-               , frame_(producer->last_frame() != draw_frame::eof() ? producer->last_frame() : draw_frame::empty())\r
-               , nb_frames_(producer->nb_frames())\r
-       {\r
-       }\r
-       \r
-       virtual safe_ptr<draw_frame> receive(int){return frame_;}\r
-       virtual safe_ptr<core::draw_frame> last_frame() const{return frame_;}\r
-       virtual std::wstring print() const{return L"dummy[" + print_ + L"]";}\r
-       virtual uint32_t nb_frames() const {return nb_frames_;} \r
-       virtual boost::property_tree::wptree info() const override\r
-       {\r
-               boost::property_tree::wptree info;\r
-               info.add(L"type", L"last-frame-producer");\r
-               return info;\r
-       }\r
-};\r
-\r
 struct empty_frame_producer : public frame_producer\r
 {\r
        virtual safe_ptr<draw_frame> receive(int){return draw_frame::empty();}\r
@@ -207,8 +175,6 @@ safe_ptr<draw_frame> receive_and_follow(safe_ptr<frame_producer>& producer, int
                        following->set_leading_producer(producer);\r
                        producer = std::move(following);\r
                }\r
-               else\r
-                       producer = make_safe<last_frame_producer>(producer);\r
 \r
                return receive_and_follow(producer, hints);\r
        }\r
index ae07140bb93ff387685127b9eb5760f8fa280263..e9310068e7d83e925ea003dc1475e2020eacf080 100644 (file)
@@ -53,7 +53,6 @@ public:
        };\r
        typedef enum_class<flags_def> flags;\r
 \r
-       frame_producer(){}\r
        virtual ~frame_producer(){}     \r
 \r
        virtual std::wstring print() const = 0; // nothrow\r
index 64a242122a2cd524ee13670018455282f068b1fa..276f891bdfb3515c86e1c0de31f0dabc2bb708c4 100644 (file)
@@ -60,9 +60,9 @@ public:
                is_paused_ = false;\r
        }\r
 \r
-       void load(const safe_ptr<frame_producer>& producer, const boost::optional<int32_t>& auto_play_delta)\r
+       void load(safe_ptr<frame_producer> producer, const boost::optional<int32_t>& auto_play_delta)\r
        {               \r
-               background_              = producer;\r
+               background_              = std::move(producer);\r
                auto_play_delta_ = auto_play_delta;\r
 \r
                if(auto_play_delta_ && foreground_ == frame_producer::empty())\r
@@ -74,24 +74,22 @@ public:
                if(background_ != frame_producer::empty())\r
                {\r
                        background_->set_leading_producer(foreground_);\r
-                       \r
-                       foreground_                     = background_;\r
-                       background_                     = frame_producer::empty();\r
-                       frame_number_           = 0;\r
+                       foreground_             = background_;\r
+                       background_             = frame_producer::empty();\r
+                       frame_number_   = 0;\r
                        auto_play_delta_.reset();\r
                }\r
 \r
-               is_paused_                      = false;\r
+               resume();\r
        }\r
        \r
        void stop()\r
        {\r
-               foreground_                     = frame_producer::empty();\r
-               background_                     = background_;\r
-               frame_number_           = 0;\r
+               foreground_             = frame_producer::empty();\r
+               frame_number_   = 0;\r
                auto_play_delta_.reset();\r
 \r
-               is_paused_                      = true;\r
+               pause();\r
        }\r
                \r
        safe_ptr<draw_frame> receive(frame_producer::flags flags)\r
@@ -125,11 +123,6 @@ public:
                }\r
        }\r
        \r
-       bool empty() const\r
-       {\r
-               return background_ == core::frame_producer::empty() && foreground_ == core::frame_producer::empty();\r
-       }\r
-\r
        boost::property_tree::wptree info() const\r
        {\r
                boost::property_tree::wptree info;\r
@@ -159,13 +152,12 @@ void layer::swap(layer& other)
 {      \r
        impl_.swap(other.impl_);\r
 }\r
-void layer::load(const safe_ptr<frame_producer>& frame_producer, const boost::optional<int32_t>& auto_play_delta){return impl_->load(frame_producer, auto_play_delta);}        \r
+void layer::load(safe_ptr<frame_producer> frame_producer, const boost::optional<int32_t>& auto_play_delta){return impl_->load(std::move(frame_producer), auto_play_delta);}    \r
 void layer::play(){impl_->play();}\r
 void layer::pause(){impl_->pause();}\r
 void layer::stop(){impl_->stop();}\r
 safe_ptr<draw_frame> layer::receive(frame_producer::flags flags) {return impl_->receive(flags);}\r
 safe_ptr<frame_producer> layer::foreground() const { return impl_->foreground_;}\r
 safe_ptr<frame_producer> layer::background() const { return impl_->background_;}\r
-bool layer::empty() const {return impl_->empty();}\r
 boost::property_tree::wptree layer::info() const{return impl_->info();}\r
 }}
\ No newline at end of file
index a0c996a74685fc18890f7209883d7cd61f8eebd6..fc5d8b6fa9391051d333bc1558ab8d045eef420a 100644 (file)
@@ -45,13 +45,11 @@ public:
 \r
        void swap(layer& other); // nothrow \r
                \r
-       void load(const safe_ptr<struct frame_producer>& producer, const boost::optional<int32_t>& auto_play_delta = nullptr); // nothrow\r
+       void load(safe_ptr<struct frame_producer> producer, const boost::optional<int32_t>& auto_play_delta = nullptr); // nothrow\r
        void play(); // nothrow\r
        void pause(); // nothrow\r
        void stop(); // nothrow\r
                \r
-       bool empty() const;\r
-\r
        safe_ptr<struct frame_producer> foreground() const; // nothrow\r
        safe_ptr<struct frame_producer> background() const; // nothrow\r
 \r
index 40298d5cf43cad33e9926fb5f7c3fc7385ba482c..18b61fbc0e18851221e03562825850cadb523b78 100644 (file)
@@ -264,7 +264,7 @@ public:
 \r
        boost::unique_future<boost::property_tree::wptree> info(int index)\r
        {\r
-               return executor_.begin_invoke([=]() -> boost::property_tree::wptree\r
+               return executor_.begin_invoke([=]\r
                {\r
                        return layers_[index].info();\r
                }, high_priority);\r
index 1db16f3fde52fd2a4cd0ced8f90abcd5405b7d1b..10411744fed30f5edd98118db690c6c850d5c177 100644 (file)
@@ -340,8 +340,8 @@ safe_ptr<core::frame_producer> create_producer(const safe_ptr<core::frame_factor
        if(format_desc.format == core::video_format::invalid)\r
                format_desc = frame_factory->get_video_format_desc();\r
                        \r
-       return create_producer_print_proxy(\r
-                  create_producer_destroy_proxy(\r
+       return create_producer_destroy_proxy(\r
+                  create_producer_print_proxy(\r
                        make_safe<decklink_producer_proxy>(frame_factory, format_desc, device_index, filter_str, length)));\r
 }\r
 \r
index cb44117390e552d39957828cd5d65e1b32c6a80a..4c58d4acdad1c09a40e6a34fa3edc4dc86d0d7cd 100644 (file)
@@ -322,7 +322,9 @@ safe_ptr<core::frame_producer> create_producer(const safe_ptr<core::frame_factor
        boost::replace_all(filter_str, L"DEINTERLACE", L"YADIF=0:-1");\r
        boost::replace_all(filter_str, L"DEINTERLACE_BOB", L"YADIF=1:-1");\r
        \r
-       return create_producer_destroy_proxy(make_safe<ffmpeg_producer>(frame_factory, filename, filter_str, loop, start, length));\r
+       return create_producer_destroy_proxy(\r
+                  create_producer_print_proxy(\r
+                       make_safe<ffmpeg_producer>(frame_factory, filename, filter_str, loop, start, length)));\r
 }\r
 \r
 }}
\ No newline at end of file