]> git.sesse.net Git - casparcg/blobdiff - common/concurrency/executor.h
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / common / concurrency / executor.h
index a2427d3ceb3a951d25b0d3c99fdd70378678a421..ad86d4a379358f11083f56df523ba66b7f703e45 100644 (file)
@@ -20,7 +20,8 @@
 #pragma once\r
 \r
 #include "../exception/win32_exception.h"\r
-#include "../utility/assert.h"\r
+#include "../utility/string.h"\r
+#include "../utility/move_on_copy.h"\r
 #include "../log/log.h"\r
 \r
 #include <tbb/atomic.h>\r
@@ -61,13 +62,21 @@ inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)
 \r
 }\r
 \r
-enum priority\r
+enum task_priority\r
 {\r
        high_priority,\r
        normal_priority,\r
        priority_count\r
 };\r
 \r
+enum thread_priority\r
+{\r
+       high_priority_class,\r
+       above_normal_priority_class,\r
+       normal_priority_class,\r
+       below_normal_priority_class\r
+};\r
+\r
 class executor : boost::noncopyable\r
 {\r
        const std::string name_;\r
@@ -76,7 +85,27 @@ class executor : boost::noncopyable
        \r
        typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;\r
        function_queue execution_queue_[priority_count];\r
-       \r
+               \r
+       template<typename Func>\r
+       auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept\r
+       {       \r
+               typedef boost::packaged_task<decltype(func())> task_type;\r
+                               \r
+               auto task = task_type(std::forward<Func>(func));\r
+               \r
+               task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
+               {\r
+                       try\r
+                       {\r
+                               if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
+                                       my_task();\r
+                       }\r
+                       catch(boost::task_already_started&){}\r
+               }));\r
+                               \r
+               return std::move(task);\r
+       }\r
+\r
 public:\r
                \r
        explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept\r
@@ -88,18 +117,28 @@ public:
        virtual ~executor() // noexcept\r
        {\r
                stop();\r
-               \r
-               std::function<void()> func;\r
-               while(execution_queue_[normal_priority].try_pop(func)){} // Wake all waiting push threads.\r
-\r
-               if(boost::this_thread::get_id() != thread_.get_id())\r
-                       thread_.join();\r
+               join();\r
        }\r
 \r
        void set_capacity(size_t capacity) // noexcept\r
        {\r
                execution_queue_[normal_priority].set_capacity(capacity);\r
        }\r
+\r
+       void set_priority_class(thread_priority p)\r
+       {\r
+               begin_invoke([=]\r
+               {\r
+                       if(p == high_priority_class)\r
+                               SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS);\r
+                       if(p == above_normal_priority_class)\r
+                               SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS);\r
+                       else if(p == normal_priority_class)\r
+                               SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS);\r
+                       else if(p == below_normal_priority_class)\r
+                               SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS);\r
+               });\r
+       }\r
                                \r
        void stop() // noexcept\r
        {\r
@@ -111,40 +150,24 @@ public:
        {\r
                invoke([]{});\r
        }\r
+\r
+       void join()\r
+       {\r
+               if(boost::this_thread::get_id() != thread_.get_id())\r
+                       thread_.join();\r
+       }\r
                                \r
        template<typename Func>\r
-       auto begin_invoke(Func&& func, priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
+       auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
        {       \r
-               typedef boost::packaged_task<decltype(func())> task_type;\r
-                               \r
-               auto task = task_type(std::forward<Func>(func));\r
-               auto future = task.get_future();\r
-               \r
-               if(!is_running_)\r
-                       return std::move(future);       \r
-\r
-               task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
-               {\r
-                       try\r
-                       {\r
-                               if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
-                                       my_task();\r
-                       }\r
-                       catch(boost::task_already_started&){}\r
-               }));\r
-                               \r
                // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.\r
-               struct task_adaptor_t\r
-               {\r
-                       task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
-                       task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
-                       void operator()() const { task(); }\r
-                       mutable task_type task;\r
-               } task_adaptor(std::move(task));\r
+               auto task_adaptor = make_move_on_copy(create_task(func));\r
+\r
+               auto future = task_adaptor.value.get_future();\r
 \r
                execution_queue_[priority].push([=]\r
                {\r
-                       try{task_adaptor();}\r
+                       try{task_adaptor.value();}\r
                        catch(boost::task_already_started&){}\r
                        catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
                });\r
@@ -154,9 +177,30 @@ public:
                                        \r
                return std::move(future);               \r
        }\r
-       \r
+\r
        template<typename Func>\r
-       auto invoke(Func&& func, priority prioriy = normal_priority) -> decltype(func()) // noexcept\r
+       auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
+       {\r
+               // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.\r
+               auto task_adaptor = make_move_on_copy(create_task(func));\r
+               \r
+               auto future = task_adaptor.value.get_future();\r
+\r
+               if(priority == normal_priority || execution_queue_[normal_priority].try_push(nullptr))\r
+               {                       \r
+                       execution_queue_[priority].try_push([=]\r
+                       {\r
+                               try{task_adaptor.value();}\r
+                               catch(boost::task_already_started&){}\r
+                               catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
+                       });\r
+               }\r
+               \r
+               return std::move(future);                       \r
+       }\r
+\r
+       template<typename Func>\r
+       auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept\r
        {\r
                if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
                        return func();\r
@@ -164,6 +208,15 @@ public:
                return begin_invoke(std::forward<Func>(func), prioriy).get();\r
        }\r
 \r
+       template<typename Func>\r
+       auto try_invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept\r
+       {\r
+               if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
+                       return func();\r
+               \r
+               return try_begin_invoke(std::forward<Func>(func), prioriy).get();\r
+       }\r
+\r
        void yield() // noexcept\r
        {\r
                if(boost::this_thread::get_id() != thread_.get_id())  // Only yield when calling from execution thread.\r
@@ -200,8 +253,16 @@ private:
                win32_exception::install_handler();             \r
                detail::SetThreadName(GetCurrentThreadId(), name_.c_str());\r
                while(is_running_)\r
-                       execute();\r
-               is_running_ = false;\r
+               {\r
+                       try\r
+                       {\r
+                               execute();\r
+                       }\r
+                       catch(...)\r
+                       {\r
+                               CASPAR_LOG_CURRENT_EXCEPTION();\r
+                       }\r
+               }\r
        }       \r
 };\r
 \r