X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=common%2Fexecutor.h;h=d6949615ac360c8b94671568a46eed750178f3db;hb=dd0fc2703b330461b26e0a2935d72a5fcf3970c0;hp=d812c98e1f806b83164e389fc0eff419ab56daf4;hpb=33e70c3ae0ca930c637f2b961df60f38a1ce187c;p=casparcg diff --git a/common/executor.h b/common/executor.h index d812c98e1..d6949615a 100644 --- a/common/executor.h +++ b/common/executor.h @@ -38,7 +38,6 @@ #include namespace caspar { - enum class task_priority { lowest_priority = 0, @@ -50,47 +49,52 @@ enum class task_priority }; class executor final -{ +{ executor(const executor&); executor& operator=(const executor&); - + typedef blocking_priority_queue, task_priority> function_queue_t; - - const std::wstring name_; - tbb::atomic is_running_; - boost::thread thread_; - function_queue_t execution_queue_; - -public: + + const std::wstring name_; + tbb::atomic is_running_; + boost::thread thread_; + function_queue_t execution_queue_; + tbb::atomic currently_in_task_; + +public: executor(const std::wstring& name) : name_(name) - , execution_queue_(512, std::vector { + , execution_queue_(std::numeric_limits::max(), std::vector { task_priority::lowest_priority, task_priority::lower_priority, task_priority::low_priority, task_priority::normal_priority, task_priority::high_priority, - task_priority::higher_priority + task_priority::higher_priority }) { is_running_ = true; + currently_in_task_ = false; thread_ = boost::thread([this]{run();}); } - + ~executor() { + CASPAR_LOG(debug) << L"Shutting down " << name_; + try { - internal_begin_invoke([=] - { - is_running_ = false; - }).wait(); + if (is_running_) + internal_begin_invoke([=] + { + is_running_ = false; + }).wait(); } catch(...) { CASPAR_LOG_CURRENT_EXCEPTION(); } - + join(); } @@ -101,19 +105,19 @@ public: template auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future // noexcept - { + { if(!is_running_) CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_)); - - return internal_begin_invoke(std::forward(func), priority); + + return internal_begin_invoke(std::forward(func), priority); } - + template auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept { if(is_current()) // Avoids potential deadlock. return func(); - + return begin_invoke(std::forward(func), prioriy).get(); } @@ -142,13 +146,13 @@ public: { return execution_queue_.space_available() == 0; } - + void clear() - { + { std::function func; while(execution_queue_.try_pop(func)); } - + void stop() { invoke([this] @@ -161,38 +165,48 @@ public: { invoke([]{}, task_priority::lowest_priority); } - - function_queue_t::size_type size() const + + function_queue_t::size_type size() const { - return execution_queue_.size(); + return execution_queue_.size(); } - + bool is_running() const { - return is_running_; - } + return is_running_; + } bool is_current() const { return boost::this_thread::get_id() == thread_.get_id(); } - -private: + + bool is_currently_in_task() const + { + return currently_in_task_; + } + + std::wstring name() const + { + return name_; + } + +private: std::wstring print() const { return L"executor[" + name_ + L"]"; } - + template auto internal_begin_invoke( Func&& func, task_priority priority = task_priority::normal_priority) -> std::future // noexcept - { + { typedef typename std::remove_reference::type function_type; typedef decltype(func()) result_type; typedef std::packaged_task task_type; - + std::shared_ptr task; // Use pointers since the boost thread library doesn't fully support move semantics. @@ -211,7 +225,7 @@ private: delete raw_func2; throw; } - + auto future = task->get_future().share(); auto function = [task] { @@ -224,7 +238,10 @@ private: if (!execution_queue_.try_push(priority, function)) { - CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller."; + if (is_current()) + CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" Overflow. Avoiding deadlock.")); + + CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller."; execution_queue_.push(priority, function); } @@ -235,27 +252,61 @@ private: function(); } - return future.get(); + try + { + return future.get(); + } + catch (const caspar_exception& e) + { + if (!is_current()) // Add context information from this thread before rethrowing. + { + auto ctx_info = boost::get_error_info(e); + + if (ctx_info) + e << context_info(get_context() + *ctx_info); + else + e << context_info(get_context()); + } + + throw; + } }); } void run() // noexcept { ensure_gpf_handler_installed_for_thread(u8(name_).c_str()); - while(is_running_) + while (is_running_) { try { std::function func; execution_queue_.pop(func); + currently_in_task_ = true; func(); } - catch(...) + catch (...) { CASPAR_LOG_CURRENT_EXCEPTION(); } + + currently_in_task_ = false; } - } -}; + // Execute rest + try + { + std::function func; + + while (execution_queue_.try_pop(func)) + { + func(); + } + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } + } +}; }