#include "enum_class.h"
#include "log.h"
#include "blocking_bounded_queue_adapter.h"
+#include "blocking_priority_queue.h"
+
#include <tbb/atomic.h>
#include <tbb/concurrent_priority_queue.h>
#include <boost/thread.hpp>
#include <boost/optional.hpp>
+#include <boost/assign/list_of.hpp>
#include <functional>
class executor sealed
{
- struct priority_function
- {
- int priority;
- std::function<void()> func;
-
- priority_function()
- {
- }
-
- template<typename F>
- priority_function(int priority, F&& func)
- : priority(priority)
- , func(std::forward<F>(func))
- {
- }
-
- void operator()()
- {
- func();
- }
-
- bool operator<(const priority_function& other) const
- {
- return priority < other.priority;
- }
- };
-
executor(const executor&);
executor& operator=(const executor&);
- typedef blocking_bounded_queue_adapter<tbb::concurrent_priority_queue<priority_function>> function_queue_t;
+ typedef blocking_priority_queue<std::function<void()>, task_priority> function_queue_t;
const std::wstring name_;
tbb::atomic<bool> is_running_;
public:
executor(const std::wstring& name)
: name_(name)
- , execution_queue_(512)
+ , execution_queue_(512, boost::assign::list_of
+ (task_priority::lowest_priority)
+ (task_priority::lower_priority)
+ (task_priority::low_priority)
+ (task_priority::normal_priority)
+ (task_priority::high_priority)
+ (task_priority::higher_priority))
{
is_running_ = true;
thread_ = boost::thread([this]{run();});
internal_begin_invoke([=]
{
is_running_ = false;
- }, false).wait();
+ }).wait();
}
catch(...)
{
thread_.join();
}
-
- template<typename Func>
- auto try_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())>
- {
- if(!is_running_)
- BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
-
- // Will return uninitialized future if the try failed (get_state() will return future_state::uninitialized).
- return internal_begin_invoke(std::forward<Func>(func), true, priority);
- }
template<typename Func>
auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
if(!is_running_)
CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
- return internal_begin_invoke(std::forward<Func>(func), false, priority);
+ return internal_begin_invoke(std::forward<Func>(func), priority);
}
template<typename Func>
return begin_invoke(std::forward<Func>(func), prioriy).get();
}
- void yield()
+ void yield(task_priority minimum_priority)
{
if(!is_current())
CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.") << source_info(name_));
- priority_function func;
- if(execution_queue_.try_pop(func))
+ std::function<void ()> func;
+
+ while (execution_queue_.try_pop(func, minimum_priority))
func();
}
void clear()
{
- priority_function func;
+ std::function<void ()> func;
while(execution_queue_.try_pop(func));
}
template<typename Func>
auto internal_begin_invoke(
Func&& func,
- bool try_begin,
task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
{
typedef typename std::remove_reference<Func>::type function_type;
auto future = task->get_future();
auto raw_task = task.release();
- priority_function prio_func(priority.value(), [raw_task]
+ auto function = [raw_task]
{
std::unique_ptr<task_type> task(raw_task);
try
(*task)();
}
catch(boost::task_already_started&){}
- });
+ };
- if (!execution_queue_.try_push(prio_func))
+ if (!execution_queue_.try_push(priority, function))
{
- if (try_begin)
- {
- delete raw_task;
-
- return boost::unique_future<decltype(func())>();
- }
- else
- {
- CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
- execution_queue_.push(prio_func);
- }
+ CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
+ execution_queue_.push(priority, function);
}
return std::move(future);
{
try
{
- priority_function func;
+ std::function<void ()> func;
execution_queue_.pop(func);
func();
}