From: Helge Norberg Date: Thu, 29 Sep 2016 15:55:48 +0000 (+0200) Subject: [executor] changed default to unbounded like in 2.0.7 and fixed a deadlock when capac... X-Git-Tag: 2.1.0_Beta1~51 X-Git-Url: https://git.sesse.net/?p=casparcg;a=commitdiff_plain;h=cd1a44a41dd64c05de067ba728c285f001b66bf3 [executor] changed default to unbounded like in 2.0.7 and fixed a deadlock when capacity is reached. --- diff --git a/common/executor.h b/common/executor.h index e67a0afa7..d8a34d137 100644 --- a/common/executor.h +++ b/common/executor.h @@ -38,7 +38,6 @@ #include namespace caspar { - enum class task_priority { lowest_priority = 0, @@ -50,35 +49,35 @@ enum class task_priority }; class executor final -{ +{ executor(const executor&); executor& operator=(const executor&); - + typedef blocking_priority_queue, task_priority> function_queue_t; - + const std::wstring name_; tbb::atomic is_running_; - boost::thread thread_; + boost::thread thread_; function_queue_t execution_queue_; tbb::atomic currently_in_task_; -public: +public: executor(const std::wstring& name) : name_(name) - , execution_queue_(512, std::vector { + , execution_queue_(std::numeric_limits::max(), std::vector { task_priority::lowest_priority, task_priority::lower_priority, task_priority::low_priority, task_priority::normal_priority, task_priority::high_priority, - task_priority::higher_priority + task_priority::higher_priority }) { is_running_ = true; currently_in_task_ = false; thread_ = boost::thread([this]{run();}); } - + ~executor() { CASPAR_LOG(debug) << L"Shutting down " << name_; @@ -95,7 +94,7 @@ public: { CASPAR_LOG_CURRENT_EXCEPTION(); } - + join(); } @@ -106,19 +105,19 @@ public: template auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future // noexcept - { + { if(!is_running_) CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_)); - - return internal_begin_invoke(std::forward(func), priority); + + return internal_begin_invoke(std::forward(func), priority); } - + template auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept { if(is_current()) // Avoids potential deadlock. return func(); - + return begin_invoke(std::forward(func), prioriy).get(); } @@ -147,13 +146,13 @@ public: { return execution_queue_.space_available() == 0; } - + void clear() - { + { std::function func; while(execution_queue_.try_pop(func)); } - + void stop() { invoke([this] @@ -166,16 +165,16 @@ public: { invoke([]{}, task_priority::lowest_priority); } - - function_queue_t::size_type size() const + + function_queue_t::size_type size() const { - return execution_queue_.size(); + return execution_queue_.size(); } bool is_running() const { - return is_running_; - } + return is_running_; + } bool is_current() const { @@ -191,23 +190,23 @@ public: { return name_; } - -private: + +private: std::wstring print() const { return L"executor[" + name_ + L"]"; } - + template auto internal_begin_invoke( Func&& func, task_priority priority = task_priority::normal_priority) -> std::future // noexcept - { + { typedef typename std::remove_reference::type function_type; typedef decltype(func()) result_type; typedef std::packaged_task task_type; - + std::shared_ptr task; // Use pointers since the boost thread library doesn't fully support move semantics. @@ -226,7 +225,7 @@ private: delete raw_func2; throw; } - + auto future = task->get_future().share(); auto function = [task] { @@ -239,6 +238,9 @@ private: if (!execution_queue_.try_push(priority, function)) { + if (is_current()) + CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" Overflow. Avoiding deadlock.")); + CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller."; execution_queue_.push(priority, function); } @@ -290,7 +292,6 @@ private: currently_in_task_ = false; } - } + } }; - }