#pragma once
+#include "os/general_protection_fault.h"
#include "except.h"
-#include "enum_class.h"
#include "log.h"
#include "blocking_bounded_queue_adapter.h"
#include "blocking_priority_queue.h"
#include <boost/thread.hpp>
#include <boost/optional.hpp>
-#include <boost/assign/list_of.hpp>
#include <functional>
#include <future>
namespace caspar {
-struct task_priority_def
+enum class task_priority
{
- enum type
- {
- lowest_priority = 0,
- lower_priority,
- low_priority,
- normal_priority,
- high_priority,
- higher_priority
- };
+ lowest_priority = 0,
+ lower_priority,
+ low_priority,
+ normal_priority,
+ high_priority,
+ higher_priority
};
-typedef enum_class<task_priority_def> task_priority;
-class executor /* final */
+class executor final
{
executor(const executor&);
executor& operator=(const executor&);
tbb::atomic<bool> is_running_;
boost::thread thread_;
function_queue_t execution_queue_;
-
+ tbb::atomic<bool> currently_in_task_;
+
public:
executor(const std::wstring& name)
: name_(name)
- , execution_queue_(512, boost::assign::list_of
- (task_priority::lowest_priority)
- (task_priority::lower_priority)
- (task_priority::low_priority)
- (task_priority::normal_priority)
- (task_priority::high_priority)
- (task_priority::higher_priority))
+ , execution_queue_(512, std::vector<task_priority> {
+ task_priority::lowest_priority,
+ task_priority::lower_priority,
+ task_priority::low_priority,
+ task_priority::normal_priority,
+ task_priority::high_priority,
+ task_priority::higher_priority
+ })
{
is_running_ = true;
+ currently_in_task_ = false;
thread_ = boost::thread([this]{run();});
}
~executor()
{
+ CASPAR_LOG(debug) << L"Shutting down " << name_;
+
try
{
- internal_begin_invoke([=]
- {
- is_running_ = false;
- }).wait();
+ if (is_running_)
+ internal_begin_invoke([=]
+ {
+ is_running_ = false;
+ }).wait();
}
catch(...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
+ join();
+ }
+
+ void join()
+ {
thread_.join();
}
{
return execution_queue_.size();
}
-
+
bool is_running() const
{
return is_running_;
{
return boost::this_thread::get_id() == thread_.get_id();
}
+
+ bool is_currently_in_task() const
+ {
+ return currently_in_task_;
+ }
+
+ std::wstring name() const
+ {
+ return name_;
+ }
private:
if (!execution_queue_.try_push(priority, function))
{
- CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
+ CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller.";
execution_queue_.push(priority, function);
}
function();
}
- return future.get();
+ try
+ {
+ return future.get();
+ }
+ catch (const caspar_exception& e)
+ {
+ if (!is_current()) // Add context information from this thread before rethrowing.
+ e << context_info(get_context() + *boost::get_error_info<context_info_t>(e));
+
+ throw;
+ }
});
}
void run() // noexcept
{
- win32_exception::install_handler();
+ ensure_gpf_handler_installed_for_thread(u8(name_).c_str());
while(is_running_)
{
try
{
std::function<void ()> func;
execution_queue_.pop(func);
+ currently_in_task_ = true;
func();
}
catch(...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
+
+ currently_in_task_ = false;
}
}
};
-}
\ No newline at end of file
+}