#include "except.h"
#include "enum_class.h"
#include "log.h"
+#include "blocking_bounded_queue_adapter.h"
#include <tbb/atomic.h>
#include <tbb/concurrent_priority_queue.h>
-#include <tbb/concurrent_queue.h>
#include <boost/thread.hpp>
+#include <boost/optional.hpp>
#include <functional>
executor(const executor&);
executor& operator=(const executor&);
- typedef tbb::concurrent_priority_queue<priority_function> function_queue_t;
+ typedef blocking_bounded_queue_adapter<tbb::concurrent_priority_queue<priority_function>> function_queue_t;
const std::wstring name_;
tbb::atomic<bool> is_running_;
boost::thread thread_;
function_queue_t execution_queue_;
- tbb::concurrent_bounded_queue<int> semaphore_;
public:
executor(const std::wstring& name)
: name_(name)
+ , execution_queue_(512)
{
is_running_ = true;
- set_capacity(512);
thread_ = boost::thread([this]{run();});
}
internal_begin_invoke([=]
{
is_running_ = false;
- }).wait();
+ }, false).wait();
}
catch(...)
{
thread_.join();
}
-
+
+ template<typename Func>
+ auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())>
+ {
+ if(!is_running_)
+ BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
+
+ // Will return uninitialized future if the try failed (get_state() will return future_state::uninitialized).
+ return internal_begin_invoke(std::forward<Func>(func), true, priority);
+ }
+
template<typename Func>
auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
{
if(!is_running_)
CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
- return internal_begin_invoke(std::forward<Func>(func), priority);
+ return internal_begin_invoke(std::forward<Func>(func), false, priority);
}
template<typename Func>
if(!is_current())
CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.") << source_info(name_));
- int dummy;
- if(!semaphore_.try_pop(dummy))
- return;
-
priority_function func;
if(execution_queue_.try_pop(func))
func();
}
- void set_capacity(std::size_t capacity)
+ void set_capacity(function_queue_t::size_type capacity)
{
- semaphore_.set_capacity(capacity);
+ execution_queue_.set_capacity(capacity);
}
- std::size_t capacity() const
+ function_queue_t::size_type capacity() const
{
- return semaphore_.capacity();
+ return execution_queue_.capacity();
}
void clear()
}
template<typename Func>
- auto internal_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
+ auto internal_begin_invoke(
+ Func&& func,
+ bool try_begin,
+ task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
{
typedef typename std::remove_reference<Func>::type function_type;
typedef decltype(func()) result_type;
catch(boost::task_already_started&){}
});
- execution_queue_.push(prio_func);
-
- if(!semaphore_.try_push(0))
+ if (!execution_queue_.try_push(prio_func))
{
- CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
- semaphore_.push(0);
- }
+ if (try_begin)
+ {
+ delete raw_task;
+
+ return boost::unique_future<decltype(func())>();
+ }
+ else
+ {
+ CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
+ execution_queue_.push(prio_func);
+ }
+ }
+
return std::move(future);
}
{
try
{
- int dummy;
- semaphore_.pop(dummy);
-
priority_function func;
- if(execution_queue_.try_pop(func))
- func();
+ execution_queue_.pop(func);
+ func();
}
catch(...)
{