X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=common%2Fconcurrency%2Fexecutor.h;h=0687889a179a951b0d2e14d82a9162941a4d03dd;hb=a486c25d5e6ce0ebe08e9a2d793a447ff3cb797a;hp=e2d4d0243bf0c5251f5844525f5d52876a3bb869;hpb=f66aba8de59e39e169e633deccd691565ac9e166;p=casparcg diff --git a/common/concurrency/executor.h b/common/concurrency/executor.h index e2d4d0243..0687889a1 100644 --- a/common/concurrency/executor.h +++ b/common/concurrency/executor.h @@ -20,7 +20,8 @@ #pragma once #include "../exception/win32_exception.h" -#include "../utility/assert.h" +#include "../utility/string.h" +#include "../utility/move_on_copy.h" #include "../log/log.h" #include @@ -61,38 +62,21 @@ inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) } -enum priority +enum task_priority { high_priority, normal_priority, priority_count }; -enum priority_class +enum thread_priority { high_priority_class, above_normal_priority_class, normal_priority_class, - below_normal_priority_clas + below_normal_priority_class }; -namespace internal -{ - template - struct move_on_copy - { - move_on_copy(const move_on_copy& other) : value(std::move(other.value)){} - move_on_copy(T&& value) : value(std::move(value)){} - mutable T value; - }; - - template - move_on_copy make_move_on_copy(T&& value) - { - return move_on_copy(std::move(value)); - } -} - class executor : boost::noncopyable { const std::string name_; @@ -133,6 +117,7 @@ public: virtual ~executor() // noexcept { stop(); + execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread. join(); } @@ -141,7 +126,7 @@ public: execution_queue_[normal_priority].set_capacity(capacity); } - void set_priority_class(priority_class p) + void set_priority_class(thread_priority p) { begin_invoke([=] { @@ -151,7 +136,7 @@ public: SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS); else if(p == normal_priority_class) SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS); - else if(p == below_normal_priority_clas) + else if(p == below_normal_priority_class) SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS); }); } @@ -159,7 +144,6 @@ public: void stop() // noexcept { is_running_ = false; - execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread. } void wait() // noexcept @@ -174,10 +158,10 @@ public: } template - auto begin_invoke(Func&& func, priority priority = normal_priority) -> boost::unique_future // noexcept + auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future // noexcept { // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics. - auto task_adaptor = internal::make_move_on_copy(create_task(func)); + auto task_adaptor = make_move_on_copy(create_task(func)); auto future = task_adaptor.value.get_future(); @@ -195,10 +179,10 @@ public: } template - auto try_begin_invoke(Func&& func, priority priority = normal_priority) -> boost::unique_future // noexcept + auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future // noexcept { // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics. - auto task_adaptor = internal::make_move_on_copy(create_task(func)); + auto task_adaptor = make_move_on_copy(create_task(func)); auto future = task_adaptor.value.get_future(); @@ -216,7 +200,7 @@ public: } template - auto invoke(Func&& func, priority prioriy = normal_priority) -> decltype(func()) // noexcept + auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept { if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. return func(); @@ -225,7 +209,7 @@ public: } template - auto try_invoke(Func&& func, priority prioriy = normal_priority) -> decltype(func()) // noexcept + auto try_invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept { if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. return func(); @@ -269,7 +253,16 @@ private: win32_exception::install_handler(); detail::SetThreadName(GetCurrentThreadId(), name_.c_str()); while(is_running_) - execute(); + { + try + { + execute(); + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } + } } };