\r
const std::wstring name_;\r
tbb::atomic<bool> is_running_;\r
- boost::thread thread_; \r
function_queue_t execution_queue_;\r
tbb::concurrent_bounded_queue<int> semaphore_;\r
+ boost::thread thread_; \r
\r
public: \r
executor(const std::wstring& name)\r
\r
virtual ~executor()\r
{\r
- stop();\r
+ try\r
+ {\r
+ internal_begin_invoke([this]\r
+ {\r
+ is_running_ = false;\r
+ }).wait();\r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ is_running_ = false;\r
+ }\r
thread_.join();\r
}\r
- \r
+ \r
template<typename Func>\r
auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
{ \r
if(execution_queue_.size() > 128)\r
BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor overflow.") << source_info(name_));\r
-\r
+ \r
if(!is_running_)\r
BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));\r
- \r
- typedef typename std::remove_reference<Func>::type function_type;\r
- typedef decltype(func()) result_type;\r
- typedef boost::packaged_task<result_type> task_type;\r
- \r
- std::unique_ptr<task_type> task;\r
-\r
- // Use pointers since the boost thread library doesn't fully support move semantics.\r
-\r
- auto raw_func2 = new function_type(std::forward<Func>(func));\r
- try\r
- {\r
- task.reset(new task_type([raw_func2]() -> result_type\r
- {\r
- std::unique_ptr<function_type> func2(raw_func2);\r
- return (*func2)();\r
- }));\r
- }\r
- catch(...)\r
- {\r
- delete raw_func2;\r
- throw;\r
- }\r
- \r
- task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
- {\r
- try\r
- {\r
- if(is_current()) // Avoids potential deadlock.\r
- my_task();\r
- }\r
- catch(boost::task_already_started&){}\r
- }));\r
- \r
- auto future = task->get_future();\r
-\r
- auto raw_task = task.release();\r
- priority_function prio_func(priority.value(), [raw_task]\r
- {\r
- std::unique_ptr<task_type> task(raw_task);\r
- try\r
- {\r
- (*task)();\r
- }\r
- catch(boost::task_already_started&){}\r
- });\r
\r
- execution_queue_.push(prio_func);\r
- semaphore_.push(0);\r
- \r
- return std::move(future); \r
+ return internal_begin_invoke(std::forward<Func>(func), priority);\r
}\r
- \r
+\r
template<typename Func>\r
auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept\r
{\r
}\r
\r
private: \r
+ \r
+ template<typename Func>\r
+ auto internal_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
+ { \r
+ typedef typename std::remove_reference<Func>::type function_type;\r
+ typedef decltype(func()) result_type;\r
+ typedef boost::packaged_task<result_type> task_type;\r
+ \r
+ std::unique_ptr<task_type> task;\r
+\r
+ // Use pointers since the boost thread library doesn't fully support move semantics.\r
+\r
+ auto raw_func2 = new function_type(std::forward<Func>(func));\r
+ try\r
+ {\r
+ task.reset(new task_type([raw_func2]() -> result_type\r
+ {\r
+ std::unique_ptr<function_type> func2(raw_func2);\r
+ return (*func2)();\r
+ }));\r
+ }\r
+ catch(...)\r
+ {\r
+ delete raw_func2;\r
+ throw;\r
+ }\r
+ \r
+ task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
+ {\r
+ try\r
+ {\r
+ if(is_current()) // Avoids potential deadlock.\r
+ my_task();\r
+ }\r
+ catch(boost::task_already_started&){}\r
+ }));\r
+ \r
+ auto future = task->get_future();\r
+\r
+ auto raw_task = task.release();\r
+ priority_function prio_func(priority.value(), [raw_task]\r
+ {\r
+ std::unique_ptr<task_type> task(raw_task);\r
+ try\r
+ {\r
+ (*task)();\r
+ }\r
+ catch(boost::task_already_started&){}\r
+ });\r
+\r
+ execution_queue_.push(prio_func);\r
+ semaphore_.push(0);\r
+ \r
+ return std::move(future); \r
+ }\r
\r
void run() // noexcept\r
{\r