tbb::atomic<bool> is_running_;
boost::thread thread_;
function_queue_t execution_queue_;
-
+ tbb::atomic<bool> currently_in_task_;
+
public:
executor(const std::wstring& name)
: name_(name)
})
{
is_running_ = true;
+ currently_in_task_ = false;
thread_ = boost::thread([this]{run();});
}
~executor()
{
+ CASPAR_LOG(debug) << L"Shutting down " << name_;
+
try
{
- internal_begin_invoke([=]
- {
- is_running_ = false;
- }).wait();
+ if (is_running_)
+ internal_begin_invoke([=]
+ {
+ is_running_ = false;
+ }).wait();
}
catch(...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
+ join();
+ }
+
+ void join()
+ {
thread_.join();
}
{
return execution_queue_.size();
}
-
+
bool is_running() const
{
return is_running_;
{
return boost::this_thread::get_id() == thread_.get_id();
}
+
+ bool is_currently_in_task() const
+ {
+ return currently_in_task_;
+ }
+
+ std::wstring name() const
+ {
+ return name_;
+ }
private:
if (!execution_queue_.try_push(priority, function))
{
- CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
+ CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller.";
execution_queue_.push(priority, function);
}
function();
}
- return future.get();
+ try
+ {
+ return future.get();
+ }
+ catch (const caspar_exception& e)
+ {
+ if (!is_current()) // Add context information from this thread before rethrowing.
+ {
+ auto ctx_info = boost::get_error_info<context_info_t>(e);
+
+ if (ctx_info)
+ e << context_info(get_context() + *ctx_info);
+ else
+ e << context_info(get_context());
+ }
+
+ throw;
+ }
});
}
{
std::function<void ()> func;
execution_queue_.pop(func);
+ currently_in_task_ = true;
func();
}
catch(...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
+
+ currently_in_task_ = false;
}
}
};