]> git.sesse.net Git - casparcg/commitdiff
2.1.0: -executor: Refactored. -server: Uninitialize modules AFTER everything has...
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sat, 18 Feb 2012 19:01:45 +0000 (19:01 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sat, 18 Feb 2012 19:01:45 +0000 (19:01 +0000)
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.1.0@2450 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d

accelerator/ogl/image/image_mixer.cpp
common/concurrency/executor.h
core/video_format.h
shell/main.cpp
shell/server.cpp

index 08ce960601e52e582044b860647a922f33cc2847..82672c57bab795b3f652dd0cef46c5335be8a4fc 100644 (file)
@@ -103,7 +103,7 @@ public:
                        auto buffer = spl::make_shared<const std::vector<uint8_t, tbb::cache_aligned_allocator<uint8_t>>>(format_desc.size, 0);\r
                        return async(launch::deferred, [=]\r
                        {\r
-                               return core::const_array(buffer->data(), static_cast<std::size_t>(format_desc.size), true, buffer);\r
+                               return core::const_array(buffer->data(), format_desc.size, true, buffer);\r
                        });\r
                }               \r
 \r
index 7f68ed6c0e6dce692550f519f450f74b81deed3c..c4190ecc036277918857045129dd1e24bc492f45 100644 (file)
@@ -26,6 +26,7 @@
 #include "../log.h"\r
 \r
 #include <tbb/atomic.h>\r
+#include <tbb/concurrent_priority_queue.h>\r
 #include <tbb/concurrent_queue.h>\r
 \r
 #include <boost/thread.hpp>\r
 #include <functional>\r
 \r
 namespace caspar {\r
-\r
-namespace detail {\r
-       \r
-template<typename T>\r
-struct move_on_copy\r
-{\r
-       move_on_copy(const move_on_copy<T>& other) : value(std::move(other.value)){}\r
-       move_on_copy(T&& value) : value(std::move(value)){}\r
-       mutable T value;\r
-};\r
-\r
-template<typename T>\r
-move_on_copy<T> make_move_on_copy(T&& value)\r
-{\r
-       return move_on_copy<T>(std::move(value));\r
-}\r
-\r
-}\r
-       \r
+               \r
 struct task_priority_def\r
 {\r
        enum type\r
        {\r
-               high_priority,\r
+               lower_priority = 1,\r
+               low_priority,\r
                normal_priority,\r
-               priority_count\r
+               high_priority,\r
+               higher_priority\r
        };\r
 };\r
 typedef enum_class<task_priority_def> task_priority;\r
 \r
 class executor\r
-{\r
+{      \r
+       struct priority_function\r
+       {\r
+               int                                             priority;\r
+               std::function<void()>   func;\r
+\r
+               priority_function()\r
+               {\r
+               }\r
+\r
+               template<typename F>\r
+               priority_function(int priority, F&& func)\r
+                       : priority(priority)\r
+                       , func(std::forward<F>(func))\r
+               {\r
+               }\r
+\r
+               void operator()()\r
+               {\r
+                       func();\r
+               }\r
+\r
+               bool operator<(const priority_function& other) const\r
+               {\r
+                       return priority < other.priority;\r
+               }\r
+       };\r
+\r
        executor(const executor&);\r
        executor& operator=(const executor&);\r
        \r
-       tbb::atomic<bool>       is_running_;\r
-       boost::thread           thread_;\r
-       \r
-       typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;\r
-       function_queue execution_queue_[task_priority::priority_count];\r
-               \r
-       template<typename Func>\r
-       auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept\r
-       {       \r
-               typedef boost::packaged_task<decltype(func())> task_type;\r
-                               \r
-               auto task = task_type(std::forward<Func>(func));\r
-               \r
-               task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
-               {\r
-                       try\r
-                       {\r
-                               if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
-                                       my_task();\r
-                       }\r
-                       catch(boost::task_already_started&){}\r
-               }));\r
-                               \r
-               return std::move(task);\r
-       }\r
+       typedef tbb::concurrent_priority_queue<priority_function>       function_queue_t;\r
 \r
+       tbb::atomic<bool>                                                                                       is_running_;\r
+       boost::thread                                                                                           thread_;        \r
+       function_queue_t                                                                                        execution_queue_;\r
+       tbb::concurrent_bounded_queue<int>                                                      semaphore_;\r
+               \r
 public:                \r
        executor(const std::wstring& name) // noexcept\r
        {\r
@@ -107,58 +100,62 @@ public:
                stop();\r
                thread_.join();\r
        }\r
-\r
-       void set_capacity(size_t capacity) // noexcept\r
-       {\r
-               execution_queue_[task_priority::normal_priority].set_capacity(capacity);\r
-       }\r
-       \r
-       void clear()\r
-       {               \r
-               std::function<void()> func;\r
-               while(execution_queue_[task_priority::normal_priority].try_pop(func));\r
-               while(execution_queue_[task_priority::high_priority].try_pop(func));\r
-       }\r
-                               \r
-       void stop() // noexcept\r
-       {\r
-               invoke([this]{is_running_ = false;});\r
-       }\r
-\r
-       void wait() // noexcept\r
-       {\r
-               invoke([]{});\r
-       }\r
-                               \r
+                                               \r
        template<typename Func>\r
        auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
        {       \r
                if(!is_running_)\r
                        BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));\r
+                               \r
+               typedef typename std::remove_reference<Func>::type      function_type;\r
+               typedef decltype(func())                                                        result_type;\r
+               typedef boost::packaged_task<result_type>                       task_type;\r
+                                                               \r
+               std::unique_ptr<task_type> task;\r
 \r
-               // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.\r
-               auto task_adaptor = detail::make_move_on_copy(create_task(func));\r
-\r
-               auto future = task_adaptor.value.get_future();\r
+               // Use pointers since the boost thread library doesn't fully support move semantics.\r
 \r
-               execution_queue_[priority.value()].push([=]\r
+               auto raw_func2 = new function_type(std::forward<Func>(func));\r
+               try\r
                {\r
-                       try\r
+                       task.reset(new task_type([raw_func2]() -> result_type\r
                        {\r
-                               task_adaptor.value();\r
-                       }\r
-                       catch(boost::task_already_started&)\r
+                               std::unique_ptr<function_type> func2(raw_func2);\r
+                               return (*func2)();\r
+                       }));\r
+               }\r
+               catch(...)\r
+               {\r
+                       delete raw_func2;\r
+                       throw;\r
+               }\r
+               \r
+               task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
+               {\r
+                       try\r
                        {\r
+                               if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
+                                       my_task();\r
                        }\r
-                       catch(...)\r
+                       catch(boost::task_already_started&){}\r
+               }));\r
+                               \r
+               auto future = task->get_future();\r
+\r
+               auto raw_task = task.release();\r
+               priority_function prio_func(priority.value(), [raw_task]\r
+               {\r
+                       std::unique_ptr<task_type> task(raw_task);\r
+                       try\r
                        {\r
-                               CASPAR_LOG_CURRENT_EXCEPTION();\r
+                               (*task)();\r
                        }\r
+                       catch(boost::task_already_started&){}\r
                });\r
 \r
-               if(priority != task_priority::normal_priority)\r
-                       execution_queue_[task_priority::normal_priority].push(nullptr);\r
-                                       \r
+               execution_queue_.push(prio_func);\r
+               semaphore_.push(0);\r
+                                                       \r
                return std::move(future);               \r
        }\r
        \r
@@ -176,44 +173,52 @@ public:
                if(boost::this_thread::get_id() != thread_.get_id())\r
                        BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context."));\r
 \r
-               std::function<void()> func;\r
-               execution_queue_[task_priority::normal_priority].pop(func);     \r
-               \r
-               std::function<void()> func2;\r
-               while(execution_queue_[task_priority::high_priority].try_pop(func2))\r
-               {\r
-                       if(func2)\r
-                               func2();\r
-               }       \r
+               int dummy;\r
+               semaphore_.pop(dummy);\r
 \r
-               if(func)\r
+               priority_function func;\r
+               if(execution_queue_.try_pop(func))\r
                        func();\r
        }\r
 \r
-       void yield(task_priority priority) // noexcept\r
+       void set_capacity(std::size_t capacity) // noexcept\r
        {\r
-               if(boost::this_thread::get_id() != thread_.get_id())\r
-                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context."));\r
+               semaphore_.set_capacity(capacity);\r
+       }\r
 \r
-               if(priority == task_priority::high_priority)\r
+       std::size_t capacity() const\r
+       {\r
+               return semaphore_.capacity();\r
+       }\r
+       \r
+       void clear()\r
+       {               \r
+               priority_function func;\r
+               while(execution_queue_.try_pop(func));\r
+       }\r
+                               \r
+       void stop()\r
+       {\r
+               invoke([this]\r
                {\r
-                       std::function<void()> func2;\r
-                       while(execution_queue_[task_priority::high_priority].try_pop(func2))\r
-                       {\r
-                               if(func2)\r
-                                       func2();\r
-                       }       \r
-               }\r
-               else\r
-                       yield();\r
+                       is_running_ = false;\r
+               });\r
+       }\r
+\r
+       void wait()\r
+       {\r
+               invoke([]{});\r
        }\r
                \r
-       function_queue::size_type size() const /*noexcept*/\r
+       function_queue_t::size_type size() const \r
        {\r
-               return execution_queue_[task_priority::normal_priority].size() + execution_queue_[task_priority::high_priority].size(); \r
+               return execution_queue_.size(); \r
        }\r
                \r
-       bool is_running() const /*noexcept*/ { return is_running_; }    \r
+       bool is_running() const\r
+       {\r
+               return is_running_; \r
+       }       \r
                \r
 private:       \r
 \r
index 34cd6f6cfbf1e7e5cbc212a71456da3b3d921c99..6fa6f2025e220b35b72ecf3d5c4342723979774b 100644 (file)
@@ -81,7 +81,7 @@ struct video_format_desc sealed
        int                                     time_scale;\r
        int                                     duration;\r
        int                                     field_count;\r
-       int                                     size;           // frame size in bytes \r
+       std::size_t                     size;           // frame size in bytes \r
        std::wstring            name;           // name of output format\r
 \r
        int                                     audio_sample_rate;\r
index a797f34d2d1e738eb8e385ad44c7bfbb65001fcb..aabc6ab9b867c8adf24287c87154765315e25c68 100644 (file)
@@ -262,7 +262,6 @@ void run()
                wcmd += L"\r\n";\r
                amcp.Parse(wcmd.c_str(), static_cast<int>(wcmd.length()), console_client);\r
        }       \r
-       Sleep(1000);\r
        CASPAR_LOG(info) << "Successfully shutdown CasparCG Server.";\r
 }\r
 \r
index 98eac86427f1f7e4ad42d98348ba3696df5e0ede..9fbf722ea4778fba975f2a79894ec5cf87804dac 100644 (file)
@@ -104,11 +104,13 @@ struct server::impl : boost::noncopyable
 \r
        ~impl()\r
        {               \r
-               image::uninit();\r
-               ffmpeg::uninit();\r
-\r
                async_servers_.clear();\r
                channels_.clear();\r
+\r
+               Sleep(500); // HACK: Wait for asynchronous destruction of producers and consumers.\r
+\r
+               image::uninit();\r
+               ffmpeg::uninit();\r
        }\r
                                \r
        void setup_channels(const boost::property_tree::wptree& pt)\r