X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=common%2Fexecutor.h;h=805c1bc9dbe6d080fbd7a418d0cd29dd34fb65a6;hb=e54e9fb019a185d9516b99d33d4c8aa8e44ccd7e;hp=c2876caadfcaca43ffba9b0e1b745a8a6bebab88;hpb=3d90acb484ffd68d2a5cb7ea6b800d1fbba3eb2b;p=casparcg diff --git a/common/executor.h b/common/executor.h index c2876caad..805c1bc9d 100644 --- a/common/executor.h +++ b/common/executor.h @@ -21,8 +21,8 @@ #pragma once +#include "os/general_protection_fault.h" #include "except.h" -#include "enum_class.h" #include "log.h" #include "blocking_bounded_queue_adapter.h" #include "blocking_priority_queue.h" @@ -39,19 +39,15 @@ namespace caspar { -struct task_priority_def +enum class task_priority { - enum type - { - lowest_priority = 0, - lower_priority, - low_priority, - normal_priority, - high_priority, - higher_priority - }; + lowest_priority = 0, + lower_priority, + low_priority, + normal_priority, + high_priority, + higher_priority }; -typedef enum_class task_priority; class executor final { @@ -64,7 +60,8 @@ class executor final tbb::atomic is_running_; boost::thread thread_; function_queue_t execution_queue_; - + tbb::atomic currently_in_task_; + public: executor(const std::wstring& name) : name_(name) @@ -78,23 +75,32 @@ public: }) { is_running_ = true; + currently_in_task_ = false; thread_ = boost::thread([this]{run();}); } ~executor() { + CASPAR_LOG(debug) << L"Shutting down " << name_; + try { - internal_begin_invoke([=] - { - is_running_ = false; - }).wait(); + if (is_running_) + internal_begin_invoke([=] + { + is_running_ = false; + }).wait(); } catch(...) { CASPAR_LOG_CURRENT_EXCEPTION(); } + join(); + } + + void join() + { thread_.join(); } @@ -165,7 +171,7 @@ public: { return execution_queue_.size(); } - + bool is_running() const { return is_running_; @@ -175,6 +181,16 @@ public: { return boost::this_thread::get_id() == thread_.get_id(); } + + bool is_currently_in_task() const + { + return currently_in_task_; + } + + std::wstring name() const + { + return name_; + } private: @@ -223,7 +239,7 @@ private: if (!execution_queue_.try_push(priority, function)) { - CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller."; + CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller."; execution_queue_.push(priority, function); } @@ -234,27 +250,40 @@ private: function(); } - return future.get(); + try + { + return future.get(); + } + catch (const caspar_exception& e) + { + if (!is_current()) // Add context information from this thread before rethrowing. + e << context_info(get_context() + *boost::get_error_info(e)); + + throw; + } }); } void run() // noexcept { - win32_exception::install_handler(); + ensure_gpf_handler_installed_for_thread(u8(name_).c_str()); while(is_running_) { try { std::function func; execution_queue_.pop(func); + currently_in_task_ = true; func(); } catch(...) { CASPAR_LOG_CURRENT_EXCEPTION(); } + + currently_in_task_ = false; } } }; -} \ No newline at end of file +}