]> git.sesse.net Git - casparcg/commitdiff
2.0.0.2: - write_frame: Optimized header includes.
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Fri, 27 May 2011 19:53:39 +0000 (19:53 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Fri, 27 May 2011 19:53:39 +0000 (19:53 +0000)
         - executor: Added try_ methods.
         - frame_consumer_device: Uses software clock when no hardware clock is available for synchronization.

git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.0.0.2@816 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d

common/concurrency/executor.h
core/channel.cpp
core/consumer/frame_consumer.h
core/consumer/frame_consumer_device.cpp
core/mixer/write_frame.cpp
core/mixer/write_frame.h
core/producer/frame_producer_device.h
modules/ogl/consumer/ogl_consumer.cpp

index c133980ffb7b5fb9331fb001d0674e87d80635ac..f96962b9a00e9e363494fe9f36d22208e9784036 100644 (file)
@@ -68,6 +68,23 @@ enum priority
        priority_count\r
 };\r
 \r
+namespace internal\r
+{\r
+       template<typename T>\r
+       struct move_on_copy\r
+       {\r
+               move_on_copy(const move_on_copy<T>& other) : value(std::move(other.value)){}\r
+               move_on_copy(T&& value) : value(std::move(value)){}\r
+               mutable T value;\r
+       };\r
+\r
+       template<typename T>\r
+       move_on_copy<T> make_move_on_copy(T&& value)\r
+       {\r
+               return move_on_copy<T>(std::move(value));\r
+       }\r
+}\r
+\r
 class executor : boost::noncopyable\r
 {\r
        const std::string name_;\r
@@ -76,7 +93,27 @@ class executor : boost::noncopyable
        \r
        typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;\r
        function_queue execution_queue_[priority_count];\r
-       \r
+               \r
+       template<typename Func>\r
+       auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept\r
+       {       \r
+               typedef boost::packaged_task<decltype(func())> task_type;\r
+                               \r
+               auto task = task_type(std::forward<Func>(func));\r
+               \r
+               task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
+               {\r
+                       try\r
+                       {\r
+                               if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
+                                       my_task();\r
+                       }\r
+                       catch(boost::task_already_started&){}\r
+               }));\r
+                               \r
+               return std::move(task);\r
+       }\r
+\r
 public:\r
                \r
        explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept\r
@@ -88,12 +125,7 @@ public:
        virtual ~executor() // noexcept\r
        {\r
                stop();\r
-               \r
-               std::function<void()> func;\r
-               while(execution_queue_[normal_priority].try_pop(func)){} // Wake all waiting push threads.\r
-\r
-               if(boost::this_thread::get_id() != thread_.get_id())\r
-                       thread_.join();\r
+               join();\r
        }\r
 \r
        void set_capacity(size_t capacity) // noexcept\r
@@ -111,40 +143,24 @@ public:
        {\r
                invoke([]{});\r
        }\r
+\r
+       void join()\r
+       {\r
+               if(boost::this_thread::get_id() != thread_.get_id())\r
+                       thread_.join();\r
+       }\r
                                \r
        template<typename Func>\r
        auto begin_invoke(Func&& func, priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
        {       \r
-               typedef boost::packaged_task<decltype(func())> task_type;\r
-                               \r
-               auto task = task_type(std::forward<Func>(func));\r
-               auto future = task.get_future();\r
-               \r
-               if(!is_running_)\r
-                       return std::move(future);       \r
-\r
-               task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
-               {\r
-                       try\r
-                       {\r
-                               if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
-                                       my_task();\r
-                       }\r
-                       catch(boost::task_already_started&){}\r
-               }));\r
-                               \r
                // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.\r
-               struct task_adaptor_t\r
-               {\r
-                       task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
-                       task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
-                       void operator()() const { task(); }\r
-                       mutable task_type task;\r
-               } task_adaptor(std::move(task));\r
+               auto task_adaptor = internal::make_move_on_copy(create_task(func));\r
+\r
+               auto future = task_adaptor.value.get_future();\r
 \r
                execution_queue_[priority].push([=]\r
                {\r
-                       try{task_adaptor();}\r
+                       try{task_adaptor.value();}\r
                        catch(boost::task_already_started&){}\r
                        catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
                });\r
@@ -154,7 +170,28 @@ public:
                                        \r
                return std::move(future);               \r
        }\r
-       \r
+\r
+       template<typename Func>\r
+       auto try_begin_invoke(Func&& func, priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
+       {\r
+               // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.\r
+               auto task_adaptor = internal::make_move_on_copy(create_task(func));\r
+               \r
+               auto future = task_adaptor.value.get_future();\r
+\r
+               if(priority == normal_priority || execution_queue_[normal_priority].try_push(nullptr))\r
+               {                       \r
+                       execution_queue_[priority].try_push([=]\r
+                       {\r
+                               try{task_adaptor.value();}\r
+                               catch(boost::task_already_started&){}\r
+                               catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
+                       });\r
+               }\r
+               \r
+               return std::move(future);                       \r
+       }\r
+\r
        template<typename Func>\r
        auto invoke(Func&& func, priority prioriy = normal_priority) -> decltype(func()) // noexcept\r
        {\r
@@ -164,6 +201,15 @@ public:
                return begin_invoke(std::forward<Func>(func), prioriy).get();\r
        }\r
 \r
+       template<typename Func>\r
+       auto try_invoke(Func&& func, priority prioriy = normal_priority) -> decltype(func()) // noexcept\r
+       {\r
+               if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
+                       return func();\r
+               \r
+               return try_begin_invoke(std::forward<Func>(func), prioriy).get();\r
+       }\r
+\r
        void yield() // noexcept\r
        {\r
                if(boost::this_thread::get_id() != thread_.get_id())  // Only yield when calling from execution thread.\r
index b95f65407dcfdd207ced847d25c9c55b12e11629..4b9ff5175d90071f1ae87e59d8b34f66f2bdb871 100644 (file)
@@ -48,8 +48,8 @@ public:
        implementation(int index, const video_format_desc& format_desc)  \r
                : index_(index)\r
                , format_desc_(format_desc)\r
-               , mixer_(new frame_mixer_device(format_desc, [=](const safe_ptr<const read_frame>& frame){consumer_->send(frame);}))\r
                , consumer_(new frame_consumer_device(format_desc))\r
+               , mixer_(new frame_mixer_device(format_desc, [=](const safe_ptr<const read_frame>& frame){consumer_->send(frame);}))\r
                , producer_(new frame_producer_device(format_desc_, [=](const std::map<int, safe_ptr<basic_frame>>& frames){mixer_->send(frames);}))    \r
        {\r
                CASPAR_LOG(info) << print() << " Successfully Initialized.";\r
index cc972b3fc06f4fe9e8173938e6d1dc5be674ed5d..48f414b33640ed785577961a8ede10b278ef35ca 100644 (file)
@@ -41,6 +41,7 @@ struct frame_consumer : boost::noncopyable
        virtual bool key_only() const{ return false;}\r
        virtual void initialize(const video_format_desc& format_desc) = 0;\r
        virtual std::wstring print() const = 0;\r
+       virtual bool has_synchronization_clock() const {return true;}\r
 \r
        static const safe_ptr<frame_consumer>& empty()\r
        {\r
index 2832c514acea85a60758dd2cf19585a49e7b287f..1035c7fa1be2dccb5849204f93fbadeee75d3c5f 100644 (file)
@@ -32,6 +32,7 @@
 #include <common/concurrency/executor.h>\r
 #include <common/diagnostics/graph.h>\r
 #include <common/utility/assert.h>\r
+#include <common/utility/timer.h>\r
 #include <common/memory/memshfl.h>\r
 \r
 #include <boost/range/algorithm_ext/erase.hpp>\r
@@ -44,6 +45,8 @@ namespace caspar { namespace core {
        \r
 struct frame_consumer_device::implementation\r
 {      \r
+       high_prec_timer timer_;\r
+\r
        boost::circular_buffer<std::pair<safe_ptr<const read_frame>,safe_ptr<const read_frame>>> buffer_;\r
 \r
        std::map<int, std::shared_ptr<frame_consumer>> consumers_; // Valid iterators after erase\r
@@ -102,12 +105,15 @@ public:
        {               \r
                executor_.begin_invoke([=]\r
                {\r
+                       if(!std::any_of(consumers_.begin(), consumers_.end(), [](const decltype(*consumers_.begin())& p){return p.second->has_synchronization_clock();}))\r
+                               timer_.tick(1.0/format_desc_.fps);\r
+\r
                        diag_->set_value("input-buffer", static_cast<float>(executor_.size())/static_cast<float>(executor_.capacity()));\r
                        frame_timer_.restart();\r
                        \r
                        auto key_frame = read_frame::empty();\r
 \r
-                       if(boost::range::find_if(consumers_, [](const decltype(*consumers_.begin())& p){return p.second->key_only();}) != consumers_.end())\r
+                       if(std::any_of(consumers_.begin(), consumers_.end(), [](const decltype(*consumers_.begin())& p){return p.second->key_only();}))\r
                        {\r
                                // Currently do key_only transform on cpu. Unsure if the extra 400MB/s (1080p50) overhead is worth it to do it on gpu.\r
                                auto key_data = ogl_device::create_host_buffer(frame->image_data().size(), host_buffer::write_only);                            \r
@@ -120,7 +126,6 @@ public:
 \r
                        if(!buffer_.full())\r
                                return;\r
-\r
        \r
                        auto it = consumers_.begin();\r
                        while(it != consumers_.end())\r
index bf905b97cfaf850e2a22ebebd02556086b093792..db5258fbc9f07d29e39bb349d86fe7a0dce63846 100644 (file)
@@ -22,6 +22,8 @@
 #include "write_frame.h"\r
 \r
 #include "gpu/ogl_device.h"\r
+#include "gpu/host_buffer.h"\r
+#include "gpu/device_buffer.h"\r
 \r
 #include <core/producer/frame/pixel_format.h>\r
 \r
index 44134d45a660120903ef6f4b7ed92decb8ce41ef..35640605e29c11341d6bc8d8b620f36767b7b357 100644 (file)
@@ -24,9 +24,6 @@
 #include <core/producer/frame/frame_visitor.h>\r
 #include <core/producer/frame/pixel_format.h>\r
 \r
-#include "gpu/host_buffer.h"\r
-#include "gpu/device_buffer.h"\r
-\r
 #include <boost/noncopyable.hpp>\r
 #include <boost/range/iterator_range.hpp>\r
 \r
 #include <vector>\r
 \r
 namespace caspar { namespace core {\r
+\r
+class host_buffer;\r
+class device_buffer;\r
        \r
 class write_frame : public core::basic_frame, boost::noncopyable\r
 {\r
 public:        \r
        explicit write_frame(int tag, const core::pixel_format_desc& desc, const std::vector<safe_ptr<host_buffer>>& buffers, const std::vector<safe_ptr<device_buffer>>& textures);\r
                        \r
-       // core::write_frame\r
        virtual boost::iterator_range<uint8_t*> image_data(size_t plane_index = 0);     \r
        virtual std::vector<int16_t>& audio_data();\r
        \r
index 6ec495e362a1ee739b9e230b06ea94817281a1d2..239b1fd62c6c04c0ee7cd70c2fb2a3fa32ed99cb 100644 (file)
@@ -31,17 +31,6 @@ namespace caspar { namespace core {
 \r
 struct video_format_desc;\r
 \r
-////////////////////////////////////////////////////////////////////////////////////////////////////\r
-/// \class     frame_producer_device\r
-///\r
-/// \brief\r
-///            \r
-///                |**********| <-   empty frame   <- |***********| <-   frame format  <- |**********|\r
-///   PROTOCOL ->  | PRODUCER |                       |   MIXER          |                       | CONSUMER |  -> DISPLAY DEVICE\r
-///                |**********| -> rendered frames -> |***********| -> formatted frame -> |**********|\r
-///   \r
-////////////////////////////////////////////////////////////////////////////////////////////////////\r
-\r
 class frame_producer_device : boost::noncopyable\r
 {\r
 public:\r
index 7c3bb3288b53757ff6e118607006eefdf0a455a3..4afe3cca78147b247a9d601d227ec72df9cd678e 100644 (file)
@@ -270,10 +270,7 @@ public:
                \r
        void send(const safe_ptr<const core::read_frame>& frame)\r
        {\r
-               if(executor_.size() >= executor_.capacity()-1)\r
-                       return;\r
-\r
-               executor_.begin_invoke([=]\r
+               executor_.try_begin_invoke([=]\r
                {\r
                        perf_timer_.restart();\r
                        sf::Event e;\r
@@ -333,6 +330,11 @@ public:
        {\r
                return key_only_;\r
        }\r
+\r
+       virtual bool has_synchronization_clock() const \r
+       {\r
+               return false;\r
+       }\r
 };     \r
 \r
 safe_ptr<core::frame_consumer> create_ogl_consumer(const std::vector<std::wstring>& params)\r