#include <future>
namespace caspar {
-
enum class task_priority
{
lowest_priority = 0,
};
class executor final
-{
+{
executor(const executor&);
executor& operator=(const executor&);
-
+
typedef blocking_priority_queue<std::function<void()>, task_priority> function_queue_t;
-
- const std::wstring name_;
- tbb::atomic<bool> is_running_;
- boost::thread thread_;
- function_queue_t execution_queue_;
- tbb::atomic<bool> currently_in_task_;
-
-public:
+
+ const std::wstring name_;
+ tbb::atomic<bool> is_running_;
+ boost::thread thread_;
+ function_queue_t execution_queue_;
+ tbb::atomic<bool> currently_in_task_;
+
+public:
executor(const std::wstring& name)
: name_(name)
- , execution_queue_(512, std::vector<task_priority> {
+ , execution_queue_(std::numeric_limits<int>::max(), std::vector<task_priority> {
task_priority::lowest_priority,
task_priority::lower_priority,
task_priority::low_priority,
task_priority::normal_priority,
task_priority::high_priority,
- task_priority::higher_priority
+ task_priority::higher_priority
})
{
is_running_ = true;
currently_in_task_ = false;
thread_ = boost::thread([this]{run();});
}
-
+
~executor()
{
CASPAR_LOG(debug) << L"Shutting down " << name_;
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
-
+
join();
}
template<typename Func>
auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
- {
+ {
if(!is_running_)
CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
-
- return internal_begin_invoke(std::forward<Func>(func), priority);
+
+ return internal_begin_invoke(std::forward<Func>(func), priority);
}
-
+
template<typename Func>
auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
{
if(is_current()) // Avoids potential deadlock.
return func();
-
+
return begin_invoke(std::forward<Func>(func), prioriy).get();
}
{
return execution_queue_.space_available() == 0;
}
-
+
void clear()
- {
+ {
std::function<void ()> func;
while(execution_queue_.try_pop(func));
}
-
+
void stop()
{
invoke([this]
{
invoke([]{}, task_priority::lowest_priority);
}
-
- function_queue_t::size_type size() const
+
+ function_queue_t::size_type size() const
{
- return execution_queue_.size();
+ return execution_queue_.size();
}
bool is_running() const
{
- return is_running_;
- }
+ return is_running_;
+ }
bool is_current() const
{
{
return name_;
}
-
-private:
+
+private:
std::wstring print() const
{
return L"executor[" + name_ + L"]";
}
-
+
template<typename Func>
auto internal_begin_invoke(
Func&& func,
task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
- {
+ {
typedef typename std::remove_reference<Func>::type function_type;
typedef decltype(func()) result_type;
typedef std::packaged_task<result_type()> task_type;
-
+
std::shared_ptr<task_type> task;
// Use pointers since the boost thread library doesn't fully support move semantics.
delete raw_func2;
throw;
}
-
+
auto future = task->get_future().share();
auto function = [task]
{
if (!execution_queue_.try_push(priority, function))
{
+ if (is_current())
+ CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" Overflow. Avoiding deadlock."));
+
CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller.";
execution_queue_.push(priority, function);
}
catch (const caspar_exception& e)
{
if (!is_current()) // Add context information from this thread before rethrowing.
- e << context_info(get_context() + *boost::get_error_info<context_info_t>(e));
+ {
+ auto ctx_info = boost::get_error_info<context_info_t>(e);
+
+ if (ctx_info)
+ e << context_info(get_context() + *ctx_info);
+ else
+ e << context_info(get_context());
+ }
throw;
}
void run() // noexcept
{
ensure_gpf_handler_installed_for_thread(u8(name_).c_str());
- while(is_running_)
+ while (is_running_)
{
try
{
currently_in_task_ = true;
func();
}
- catch(...)
+ catch (...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
currently_in_task_ = false;
}
- }
-};
+ // Execute rest
+ try
+ {
+ std::function<void()> func;
+
+ while (execution_queue_.try_pop(func))
+ {
+ func();
+ }
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ }
+ }
+};
}