X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=common%2Fexecutor.h;h=d6949615ac360c8b94671568a46eed750178f3db;hb=db6ae747c77703ab44d630baa190a9aba7e08ca3;hp=7f781cc64540688e95dcd63554443d5cd8174278;hpb=93d6caf50ee7c3d54f3c94c2485a1983bc36bb89;p=casparcg diff --git a/common/executor.h b/common/executor.h index 7f781cc64..d6949615a 100644 --- a/common/executor.h +++ b/common/executor.h @@ -21,97 +21,103 @@ #pragma once +#include "os/general_protection_fault.h" #include "except.h" -#include "enum_class.h" #include "log.h" #include "blocking_bounded_queue_adapter.h" #include "blocking_priority_queue.h" - +#include "future.h" #include #include #include #include -#include #include +#include namespace caspar { - -struct task_priority_def +enum class task_priority { - enum type - { - lowest_priority = 0, - lower_priority, - low_priority, - normal_priority, - high_priority, - higher_priority - }; + lowest_priority = 0, + lower_priority, + low_priority, + normal_priority, + high_priority, + higher_priority }; -typedef enum_class task_priority; -class executor sealed -{ +class executor final +{ executor(const executor&); executor& operator=(const executor&); - + typedef blocking_priority_queue, task_priority> function_queue_t; - - const std::wstring name_; - tbb::atomic is_running_; - boost::thread thread_; - function_queue_t execution_queue_; - -public: + + const std::wstring name_; + tbb::atomic is_running_; + boost::thread thread_; + function_queue_t execution_queue_; + tbb::atomic currently_in_task_; + +public: executor(const std::wstring& name) : name_(name) - , execution_queue_(512, boost::assign::list_of - (task_priority::lowest_priority) - (task_priority::lower_priority) - (task_priority::low_priority) - (task_priority::normal_priority) - (task_priority::high_priority) - (task_priority::higher_priority)) + , execution_queue_(std::numeric_limits::max(), std::vector { + task_priority::lowest_priority, + task_priority::lower_priority, + task_priority::low_priority, + task_priority::normal_priority, + task_priority::high_priority, + task_priority::higher_priority + }) { is_running_ = true; + currently_in_task_ = false; thread_ = boost::thread([this]{run();}); } - + ~executor() { + CASPAR_LOG(debug) << L"Shutting down " << name_; + try { - internal_begin_invoke([=] - { - is_running_ = false; - }).wait(); + if (is_running_) + internal_begin_invoke([=] + { + is_running_ = false; + }).wait(); } catch(...) { CASPAR_LOG_CURRENT_EXCEPTION(); } - + + join(); + } + + void join() + { thread_.join(); } template - auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future // noexcept - { + auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future // noexcept + { if(!is_running_) CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_)); - - return internal_begin_invoke(std::forward(func), priority); + + return internal_begin_invoke(std::forward(func), priority); } - + template auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept { if(is_current()) // Avoids potential deadlock. return func(); - + return begin_invoke(std::forward(func), prioriy).get(); } @@ -140,13 +146,13 @@ public: { return execution_queue_.space_available() == 0; } - + void clear() - { + { std::function func; while(execution_queue_.try_pop(func)); } - + void stop() { invoke([this] @@ -159,39 +165,49 @@ public: { invoke([]{}, task_priority::lowest_priority); } - - function_queue_t::size_type size() const + + function_queue_t::size_type size() const { - return execution_queue_.size(); + return execution_queue_.size(); } - + bool is_running() const { - return is_running_; - } + return is_running_; + } bool is_current() const { return boost::this_thread::get_id() == thread_.get_id(); } - -private: + + bool is_currently_in_task() const + { + return currently_in_task_; + } + + std::wstring name() const + { + return name_; + } + +private: std::wstring print() const { return L"executor[" + name_ + L"]"; } - + template auto internal_begin_invoke( Func&& func, - task_priority priority = task_priority::normal_priority) -> boost::unique_future // noexcept - { + task_priority priority = task_priority::normal_priority) -> std::future // noexcept + { typedef typename std::remove_reference::type function_type; typedef decltype(func()) result_type; - typedef boost::packaged_task task_type; - - std::unique_ptr task; + typedef std::packaged_task task_type; + + std::shared_ptr task; // Use pointers since the boost thread library doesn't fully support move semantics. @@ -209,56 +225,88 @@ private: delete raw_func2; throw; } - - task->set_wait_callback(std::function([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class. - { - try - { - if(is_current()) // Avoids potential deadlock. - my_task(); - } - catch(boost::task_already_started&){} - })); - - auto future = task->get_future(); - auto raw_task = task.release(); - auto function = [raw_task] + auto future = task->get_future().share(); + auto function = [task] { - std::unique_ptr task(raw_task); try { (*task)(); } - catch(boost::task_already_started&){} + catch(std::future_error&){} }; if (!execution_queue_.try_push(priority, function)) { - CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller."; + if (is_current()) + CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" Overflow. Avoiding deadlock.")); + + CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller."; execution_queue_.push(priority, function); } - return std::move(future); + return std::async(std::launch::deferred, [=]() mutable -> result_type + { + if (!is_ready(future) && is_current()) // Avoids potential deadlock. + { + function(); + } + + try + { + return future.get(); + } + catch (const caspar_exception& e) + { + if (!is_current()) // Add context information from this thread before rethrowing. + { + auto ctx_info = boost::get_error_info(e); + + if (ctx_info) + e << context_info(get_context() + *ctx_info); + else + e << context_info(get_context()); + } + + throw; + } + }); } void run() // noexcept { - win32_exception::install_handler(); - while(is_running_) + ensure_gpf_handler_installed_for_thread(u8(name_).c_str()); + while (is_running_) { try { std::function func; execution_queue_.pop(func); + currently_in_task_ = true; func(); } - catch(...) + catch (...) { CASPAR_LOG_CURRENT_EXCEPTION(); } + + currently_in_task_ = false; } - } -}; -} \ No newline at end of file + // Execute rest + try + { + std::function func; + + while (execution_queue_.try_pop(func)) + { + func(); + } + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } + } +}; +}