#include <future>
namespace caspar {
-
enum class task_priority
{
lowest_priority = 0,
};
class executor final
-{
+{
executor(const executor&);
executor& operator=(const executor&);
-
+
typedef blocking_priority_queue<std::function<void()>, task_priority> function_queue_t;
-
+
const std::wstring name_;
tbb::atomic<bool> is_running_;
- boost::thread thread_;
+ boost::thread thread_;
function_queue_t execution_queue_;
-
-public:
+ tbb::atomic<bool> currently_in_task_;
+
+public:
executor(const std::wstring& name)
: name_(name)
- , execution_queue_(512, std::vector<task_priority> {
+ , execution_queue_(std::numeric_limits<int>::max(), std::vector<task_priority> {
task_priority::lowest_priority,
task_priority::lower_priority,
task_priority::low_priority,
task_priority::normal_priority,
task_priority::high_priority,
- task_priority::higher_priority
+ task_priority::higher_priority
})
{
is_running_ = true;
+ currently_in_task_ = false;
thread_ = boost::thread([this]{run();});
}
-
+
~executor()
{
+ CASPAR_LOG(debug) << L"Shutting down " << name_;
+
try
{
- internal_begin_invoke([=]
- {
- is_running_ = false;
- }).wait();
+ if (is_running_)
+ internal_begin_invoke([=]
+ {
+ is_running_ = false;
+ }).wait();
}
catch(...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
-
+
join();
}
template<typename Func>
auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
- {
+ {
if(!is_running_)
CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
-
- return internal_begin_invoke(std::forward<Func>(func), priority);
+
+ return internal_begin_invoke(std::forward<Func>(func), priority);
}
-
+
template<typename Func>
auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
{
if(is_current()) // Avoids potential deadlock.
return func();
-
+
return begin_invoke(std::forward<Func>(func), prioriy).get();
}
{
return execution_queue_.space_available() == 0;
}
-
+
void clear()
- {
+ {
std::function<void ()> func;
while(execution_queue_.try_pop(func));
}
-
+
void stop()
{
invoke([this]
{
invoke([]{}, task_priority::lowest_priority);
}
-
- function_queue_t::size_type size() const
+
+ function_queue_t::size_type size() const
{
- return execution_queue_.size();
+ return execution_queue_.size();
}
-
+
bool is_running() const
{
- return is_running_;
- }
+ return is_running_;
+ }
bool is_current() const
{
return boost::this_thread::get_id() == thread_.get_id();
}
+ bool is_currently_in_task() const
+ {
+ return currently_in_task_;
+ }
+
std::wstring name() const
{
return name_;
}
-
-private:
+
+private:
std::wstring print() const
{
return L"executor[" + name_ + L"]";
}
-
+
template<typename Func>
auto internal_begin_invoke(
Func&& func,
task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
- {
+ {
typedef typename std::remove_reference<Func>::type function_type;
typedef decltype(func()) result_type;
typedef std::packaged_task<result_type()> task_type;
-
+
std::shared_ptr<task_type> task;
// Use pointers since the boost thread library doesn't fully support move semantics.
delete raw_func2;
throw;
}
-
+
auto future = task->get_future().share();
auto function = [task]
{
if (!execution_queue_.try_push(priority, function))
{
- CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
+ if (is_current())
+ CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" Overflow. Avoiding deadlock."));
+
+ CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller.";
execution_queue_.push(priority, function);
}
function();
}
- return future.get();
+ try
+ {
+ return future.get();
+ }
+ catch (const caspar_exception& e)
+ {
+ if (!is_current()) // Add context information from this thread before rethrowing.
+ {
+ auto ctx_info = boost::get_error_info<context_info_t>(e);
+
+ if (ctx_info)
+ e << context_info(get_context() + *ctx_info);
+ else
+ e << context_info(get_context());
+ }
+
+ throw;
+ }
});
}
{
std::function<void ()> func;
execution_queue_.pop(func);
+ currently_in_task_ = true;
func();
}
catch(...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
+
+ currently_in_task_ = false;
}
- }
+ }
};
-
}