#pragma once
+#include "os/general_protection_fault.h"
#include "except.h"
-#include "enum_class.h"
#include "log.h"
#include "blocking_bounded_queue_adapter.h"
#include "blocking_priority_queue.h"
-
+#include "future.h"
#include <tbb/atomic.h>
#include <tbb/concurrent_priority_queue.h>
#include <boost/thread.hpp>
#include <boost/optional.hpp>
-#include <boost/assign/list_of.hpp>
#include <functional>
+#include <future>
namespace caspar {
-struct task_priority_def
+enum class task_priority
{
- enum type
- {
- lowest_priority = 0,
- lower_priority,
- low_priority,
- normal_priority,
- high_priority,
- higher_priority
- };
+ lowest_priority = 0,
+ lower_priority,
+ low_priority,
+ normal_priority,
+ high_priority,
+ higher_priority
};
-typedef enum_class<task_priority_def> task_priority;
-class executor sealed
+class executor final
{
executor(const executor&);
executor& operator=(const executor&);
tbb::atomic<bool> is_running_;
boost::thread thread_;
function_queue_t execution_queue_;
-
+ tbb::atomic<bool> currently_in_task_;
+
public:
executor(const std::wstring& name)
: name_(name)
- , execution_queue_(512, boost::assign::list_of
- (task_priority::lowest_priority)
- (task_priority::lower_priority)
- (task_priority::low_priority)
- (task_priority::normal_priority)
- (task_priority::high_priority)
- (task_priority::higher_priority))
+ , execution_queue_(512, std::vector<task_priority> {
+ task_priority::lowest_priority,
+ task_priority::lower_priority,
+ task_priority::low_priority,
+ task_priority::normal_priority,
+ task_priority::high_priority,
+ task_priority::higher_priority
+ })
{
is_running_ = true;
+ currently_in_task_ = false;
thread_ = boost::thread([this]{run();});
}
~executor()
{
+ CASPAR_LOG(debug) << L"Shutting down " << name_;
+
try
{
- internal_begin_invoke([=]
- {
- is_running_ = false;
- }).wait();
+ if (is_running_)
+ internal_begin_invoke([=]
+ {
+ is_running_ = false;
+ }).wait();
}
catch(...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
+ join();
+ }
+
+ void join()
+ {
thread_.join();
}
template<typename Func>
- auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
+ auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
{
if(!is_running_)
CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
{
return execution_queue_.capacity();
}
+
+ bool is_full() const
+ {
+ return execution_queue_.space_available() == 0;
+ }
void clear()
{
{
return execution_queue_.size();
}
-
+
bool is_running() const
{
return is_running_;
{
return boost::this_thread::get_id() == thread_.get_id();
}
+
+ bool is_currently_in_task() const
+ {
+ return currently_in_task_;
+ }
+
+ std::wstring name() const
+ {
+ return name_;
+ }
private:
template<typename Func>
auto internal_begin_invoke(
Func&& func,
- task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
+ task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
{
typedef typename std::remove_reference<Func>::type function_type;
typedef decltype(func()) result_type;
- typedef boost::packaged_task<result_type> task_type;
+ typedef std::packaged_task<result_type()> task_type;
- std::unique_ptr<task_type> task;
+ std::shared_ptr<task_type> task;
// Use pointers since the boost thread library doesn't fully support move semantics.
delete raw_func2;
throw;
}
-
- task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
- {
- try
- {
- if(is_current()) // Avoids potential deadlock.
- my_task();
- }
- catch(boost::task_already_started&){}
- }));
- auto future = task->get_future();
-
- auto raw_task = task.release();
- auto function = [raw_task]
+ auto future = task->get_future().share();
+ auto function = [task]
{
- std::unique_ptr<task_type> task(raw_task);
try
{
(*task)();
}
- catch(boost::task_already_started&){}
+ catch(std::future_error&){}
};
if (!execution_queue_.try_push(priority, function))
{
- CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
+ CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller.";
execution_queue_.push(priority, function);
}
- return std::move(future);
+ return std::async(std::launch::deferred, [=]() mutable -> result_type
+ {
+ if (!is_ready(future) && is_current()) // Avoids potential deadlock.
+ {
+ function();
+ }
+
+ try
+ {
+ return future.get();
+ }
+ catch (const caspar_exception& e)
+ {
+ if (!is_current()) // Add context information from this thread before rethrowing.
+ e << context_info(get_context() + *boost::get_error_info<context_info_t>(e));
+
+ throw;
+ }
+ });
}
void run() // noexcept
{
- win32_exception::install_handler();
+ ensure_gpf_handler_installed_for_thread(u8(name_).c_str());
while(is_running_)
{
try
{
std::function<void ()> func;
execution_queue_.pop(func);
+ currently_in_task_ = true;
func();
}
catch(...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
+
+ currently_in_task_ = false;
}
}
};
-}
\ No newline at end of file
+}