3 #include "../exception/win32_exception.h"
\r
4 #include "../log/log.h"
\r
6 #include <tbb/atomic.h>
\r
7 #include <tbb/concurrent_queue.h>
\r
9 #include <boost/thread.hpp>
\r
10 #include <boost/noncopyable.hpp>
\r
12 #include <functional>
\r
17 class executor : boost::noncopyable
\r
27 explicit executor(const std::function<void()>& f = nullptr)
\r
29 is_running_ = false;
\r
30 f_ = f != nullptr ? f : [this]{run();};
\r
38 void start() // noexcept
\r
40 if(is_running_.fetch_and_store(true))
\r
42 thread_ = boost::thread(f_);
\r
45 void stop(stop_policy policy = wait) // noexcept
\r
47 is_running_ = false;
\r
48 execution_queue_.push([]{});
\r
49 if(policy == wait && boost::this_thread::get_id() != thread_.get_id())
\r
55 std::function<void()> func;
\r
56 while(execution_queue_.try_pop(func)){}
\r
59 template<typename Func>
\r
60 auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept
\r
62 typedef boost::packaged_task<decltype(func())> task_type;
\r
64 auto task = task_type(std::forward<Func>(func));
\r
65 auto future = task.get_future();
\r
67 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
\r
71 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
\r
74 catch(boost::task_already_started&){}
\r
77 struct task_adaptor_t
\r
79 task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}
\r
80 task_adaptor_t(task_type&& task) : task(std::move(task)){}
\r
81 void operator()() const { task(); }
\r
82 mutable task_type task;
\r
83 } task_adaptor(std::move(task));
\r
85 execution_queue_.try_push([=]
\r
87 try{task_adaptor();}
\r
88 catch(boost::task_already_started&){}
\r
89 catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}
\r
92 return std::move(future);
\r
95 template<typename Func>
\r
96 auto invoke(Func&& func) -> decltype(func())
\r
98 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
\r
101 return begin_invoke(std::forward<Func>(func)).get();
\r
104 tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size(); }
\r
105 bool empty() const { return execution_queue_.empty(); }
\r
106 bool is_running() const { return is_running_; }
\r
110 void execute() // noexcept
\r
112 std::function<void()> func;
\r
113 execution_queue_.pop(func);
\r
117 void run() // noexcept
\r
119 win32_exception::install_handler();
\r
124 std::function<void()> f_;
\r
125 boost::thread thread_;
\r
126 tbb::atomic<bool> is_running_;
\r
127 tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;
\r