#include "../log.h"\r
\r
#include <tbb/atomic.h>\r
+#include <tbb/concurrent_priority_queue.h>\r
#include <tbb/concurrent_queue.h>\r
\r
#include <boost/thread.hpp>\r
#include <functional>\r
\r
namespace caspar {\r
-\r
-namespace detail {\r
- \r
-template<typename T>\r
-struct move_on_copy\r
-{\r
- move_on_copy(const move_on_copy<T>& other) : value(std::move(other.value)){}\r
- move_on_copy(T&& value) : value(std::move(value)){}\r
- mutable T value;\r
-};\r
-\r
-template<typename T>\r
-move_on_copy<T> make_move_on_copy(T&& value)\r
-{\r
- return move_on_copy<T>(std::move(value));\r
-}\r
-\r
-}\r
- \r
+ \r
struct task_priority_def\r
{\r
enum type\r
{\r
- high_priority,\r
+ lower_priority = 1,\r
+ low_priority,\r
normal_priority,\r
- priority_count\r
+ high_priority,\r
+ higher_priority\r
};\r
};\r
typedef enum_class<task_priority_def> task_priority;\r
\r
class executor\r
-{\r
+{ \r
+ struct priority_function\r
+ {\r
+ int priority;\r
+ std::function<void()> func;\r
+\r
+ priority_function()\r
+ {\r
+ }\r
+\r
+ template<typename F>\r
+ priority_function(int priority, F&& func)\r
+ : priority(priority)\r
+ , func(std::forward<F>(func))\r
+ {\r
+ }\r
+\r
+ void operator()()\r
+ {\r
+ func();\r
+ }\r
+\r
+ bool operator<(const priority_function& other) const\r
+ {\r
+ return priority < other.priority;\r
+ }\r
+ };\r
+\r
executor(const executor&);\r
executor& operator=(const executor&);\r
\r
- tbb::atomic<bool> is_running_;\r
- boost::thread thread_;\r
- \r
- typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;\r
- function_queue execution_queue_[task_priority::priority_count];\r
- \r
- template<typename Func>\r
- auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept\r
- { \r
- typedef boost::packaged_task<decltype(func())> task_type;\r
- \r
- auto task = task_type(std::forward<Func>(func));\r
- \r
- task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
- {\r
- try\r
- {\r
- if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.\r
- my_task();\r
- }\r
- catch(boost::task_already_started&){}\r
- }));\r
- \r
- return std::move(task);\r
- }\r
+ typedef tbb::concurrent_priority_queue<priority_function> function_queue_t;\r
\r
+ tbb::atomic<bool> is_running_;\r
+ boost::thread thread_; \r
+ function_queue_t execution_queue_;\r
+ tbb::concurrent_bounded_queue<int> semaphore_;\r
+ \r
public: \r
executor(const std::wstring& name) // noexcept\r
{\r
stop();\r
thread_.join();\r
}\r
-\r
- void set_capacity(size_t capacity) // noexcept\r
- {\r
- execution_queue_[task_priority::normal_priority].set_capacity(capacity);\r
- }\r
- \r
- void clear()\r
- { \r
- std::function<void()> func;\r
- while(execution_queue_[task_priority::normal_priority].try_pop(func));\r
- while(execution_queue_[task_priority::high_priority].try_pop(func));\r
- }\r
- \r
- void stop() // noexcept\r
- {\r
- invoke([this]{is_running_ = false;});\r
- }\r
-\r
- void wait() // noexcept\r
- {\r
- invoke([]{});\r
- }\r
- \r
+ \r
template<typename Func>\r
auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
{ \r
if(!is_running_)\r
BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));\r
+ \r
+ typedef typename std::remove_reference<Func>::type function_type;\r
+ typedef decltype(func()) result_type;\r
+ typedef boost::packaged_task<result_type> task_type;\r
+ \r
+ std::unique_ptr<task_type> task;\r
\r
- // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.\r
- auto task_adaptor = detail::make_move_on_copy(create_task(func));\r
-\r
- auto future = task_adaptor.value.get_future();\r
+ // Use pointers since the boost thread library doesn't fully support move semantics.\r
\r
- execution_queue_[priority.value()].push([=]\r
+ auto raw_func2 = new function_type(std::forward<Func>(func));\r
+ try\r
{\r
- try\r
+ task.reset(new task_type([raw_func2]() -> result_type\r
{\r
- task_adaptor.value();\r
- }\r
- catch(boost::task_already_started&)\r
+ std::unique_ptr<function_type> func2(raw_func2);\r
+ return (*func2)();\r
+ }));\r
+ }\r
+ catch(...)\r
+ {\r
+ delete raw_func2;\r
+ throw;\r
+ }\r
+ \r
+ task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
+ {\r
+ try\r
{\r
+ if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.\r
+ my_task();\r
}\r
- catch(...)\r
+ catch(boost::task_already_started&){}\r
+ }));\r
+ \r
+ auto future = task->get_future();\r
+\r
+ auto raw_task = task.release();\r
+ priority_function prio_func(priority.value(), [raw_task]\r
+ {\r
+ std::unique_ptr<task_type> task(raw_task);\r
+ try\r
{\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
+ (*task)();\r
}\r
+ catch(boost::task_already_started&){}\r
});\r
\r
- if(priority != task_priority::normal_priority)\r
- execution_queue_[task_priority::normal_priority].push(nullptr);\r
- \r
+ execution_queue_.push(prio_func);\r
+ semaphore_.push(0);\r
+ \r
return std::move(future); \r
}\r
\r
if(boost::this_thread::get_id() != thread_.get_id())\r
BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context."));\r
\r
- std::function<void()> func;\r
- execution_queue_[task_priority::normal_priority].pop(func); \r
- \r
- std::function<void()> func2;\r
- while(execution_queue_[task_priority::high_priority].try_pop(func2))\r
- {\r
- if(func2)\r
- func2();\r
- } \r
+ int dummy;\r
+ semaphore_.pop(dummy);\r
\r
- if(func)\r
+ priority_function func;\r
+ if(execution_queue_.try_pop(func))\r
func();\r
}\r
\r
- void yield(task_priority priority) // noexcept\r
+ void set_capacity(std::size_t capacity) // noexcept\r
{\r
- if(boost::this_thread::get_id() != thread_.get_id())\r
- BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context."));\r
+ semaphore_.set_capacity(capacity);\r
+ }\r
\r
- if(priority == task_priority::high_priority)\r
+ std::size_t capacity() const\r
+ {\r
+ return semaphore_.capacity();\r
+ }\r
+ \r
+ void clear()\r
+ { \r
+ priority_function func;\r
+ while(execution_queue_.try_pop(func));\r
+ }\r
+ \r
+ void stop()\r
+ {\r
+ invoke([this]\r
{\r
- std::function<void()> func2;\r
- while(execution_queue_[task_priority::high_priority].try_pop(func2))\r
- {\r
- if(func2)\r
- func2();\r
- } \r
- }\r
- else\r
- yield();\r
+ is_running_ = false;\r
+ });\r
+ }\r
+\r
+ void wait()\r
+ {\r
+ invoke([]{});\r
}\r
\r
- function_queue::size_type size() const /*noexcept*/\r
+ function_queue_t::size_type size() const \r
{\r
- return execution_queue_[task_priority::normal_priority].size() + execution_queue_[task_priority::high_priority].size(); \r
+ return execution_queue_.size(); \r
}\r
\r
- bool is_running() const /*noexcept*/ { return is_running_; } \r
+ bool is_running() const\r
+ {\r
+ return is_running_; \r
+ } \r
\r
private: \r
\r