3 #include "../exception/exceptions.h"
\r
4 #include "../exception/win32_exception.h"
\r
5 #include "../log/log.h"
\r
7 #include <boost/thread.hpp>
\r
9 #include <tbb/atomic.h>
\r
10 #include <tbb/concurrent_queue.h>
\r
12 #include <functional>
\r
// Constructs the executor in the not-running state.
// f: optional callable to be run by the worker thread (start() passes f_ to boost::thread);
//    defaults to nullptr, in which case run() is used instead.
28 explicit executor(const std::function<void()>& f = nullptr)
\r
// The executor does not run until start() is called.
31 is_running_ = false;
\r
// Fall back to a lambda that calls this object's run() when no callable was supplied.
// The lambda captures `this`, so it is only valid while this object is alive.
32 f_ = f != nullptr ? f : [this]{run();};
\r
// Marks the executor as running and launches the worker thread on f_.
40 void start() // noexcept
\r
// fetch_and_store(true) sets the flag and yields its previous value, so this condition is
// true when the executor was already running. The guarded statement is not visible in this
// excerpt — presumably an early return; TODO confirm.
42 if(is_running_.fetch_and_store(true))
\r
// Create the worker thread executing f_ and keep its handle in thread_.
44 thread_ = boost::thread(f_);
\r
// Query for the running state. The body is not visible in this excerpt — presumably it
// returns is_running_; TODO confirm.
47 bool is_running() const // noexcept
\r
// Requests the executor to stop.
// wait: when true (the default), the final check below is performed for callers that are
//       not the worker thread.
52 void stop(bool wait = true) // noexcept
\r
// Clear the running flag first so the flag is already false when the wake-up task is queued.
54 is_running_ = false;
\r
// Queue an empty task; begin_invoke() also notifies cond_.
55 begin_invoke([]{}); // wake if sleeping
\r
// Only applies when the caller asked to wait and is not the worker thread itself (thread ids
// are compared). The guarded statement is not visible in this excerpt — presumably a join on
// thread_; TODO confirm.
56 if(wait && boost::this_thread::get_id() != thread_.get_id())
\r
// Only the lock acquisition of this function is visible in this excerpt.
60 void execute() // noexcept
\r
// Holds mut_ for the lifetime of `lock`. NOTE(review): presumably used to wait on cond_
// until work is available — the remaining statements are not shown; confirm.
62 boost::unique_lock<boost::mutex> lock(mut_);
\r
// Non-blocking attempt to take one task from the priority queues.
// Returns whether `func` holds a callable after the attempt.
69 bool try_execute() // noexcept
\r
// Starts out empty; filled in by a successful try_pop below.
71 std::function<void()> func;
\r
// Short-circuit || polls the queues in the order high, normal, low and stops at the first
// successful try_pop. The guarded statements are not visible in this excerpt — presumably
// they invoke func; TODO confirm.
72 if(execution_queue_[high_priority].try_pop(func) || execution_queue_[normal_priority].try_pop(func) || execution_queue_[low_priority].try_pop(func))
\r
// func is still empty when every try_pop failed.
78 return func != nullptr;
\r
// Queues `func` for execution and returns a future for its result.
// func: callable invoked with no arguments; its return type determines the future's type.
// p:    index of the queue the task is pushed to; defaults to normal_priority.
// Returns a boost::unique_future<decltype(func())> obtained from the packaged task.
// Several statements of this function (try blocks, task invocations) are not visible in this
// excerpt; comments below describe only the lines shown.
81 template<typename Func>
\r
82 auto begin_invoke(Func&& func, priority p = normal_priority) -> boost::unique_future<decltype(func())> // noexcept
\r
84 typedef decltype(func()) result_type;
\r
// The task is shared between the wait callback and the queued lambda, both of which copy the pointer.
86 auto task = std::make_shared<boost::packaged_task<result_type>>(std::forward<Func>(func)); // boost::packaged_task cannot be moved into a lambda, so it is held through a shared_ptr.
\r
// Must be taken from the task before the task is handed to the queue.
87 auto future = task->get_future();
\r
// Register a callback on the packaged task; [=] captures by copy.
89 task->set_wait_callback(std::function<void(decltype(*task)& task)>([=](decltype(*task)& task) // The std::function wrapper is required in order to add ::result_type to functor class.
\r
// True when the waiter is the worker thread itself. The guarded statement is not visible
// here — presumably it runs the task inline; TODO confirm.
93 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
\r
// A task that has already been started is deliberately ignored.
96 catch(boost::task_already_started&){}
\r
// Enqueue a lambda (capturing by copy) on the queue selected by p.
98 execution_queue_[p].push([=]
\r
// Same deliberate ignore inside the queued lambda.
101 catch(boost::task_already_started&){}
\r
// Any other exception is logged rather than propagated out of the queued lambda.
102 catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}
\r
// Wake one thread waiting on cond_ now that a task has been queued.
105 cond_.notify_one();
\r
// Hand the future to the caller by move.
107 return std::move(future);
\r
// Returns execution_queue_.size().
110 size_t size() const
\r
// NOTE(review): execution_queue_ is declared as a std::array of three queues, so this is
// std::array::size() — the fixed element count (3) — not the number of queued tasks. If a
// pending-task count is intended, the size() of each queue should be summed; confirm intent.
112 return execution_queue_.size();
\r
// The enclosing function's signature is not visible in this excerpt.
// NOTE(review): execution_queue_ is declared as a std::array of three queues, so this is
// std::array::empty(), which is false for any non-zero-length array regardless of whether
// the queues hold tasks. If "no pending tasks" is intended, each queue's empty() should be
// checked; confirm intent.
117 return execution_queue_.empty();
\r
// Runs `func` and returns its result to the caller synchronously.
// func: callable invoked with no arguments.
// p:    priority forwarded to begin_invoke(); defaults to normal_priority.
120 template<typename Func>
\r
121 auto invoke(Func&& func, priority p = normal_priority) -> decltype(func())
\r
// True when the caller is the worker thread itself; blocking on the queue from that thread
// would wait on work only it can run. The guarded statement is not visible here —
// presumably it calls func directly; TODO confirm.
123 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
\r
// Otherwise queue the call and block on the returned future until the result is available.
126 return begin_invoke(std::forward<Func>(func), p).get();
\r
// Virtual entry point; the constructor's fallback callable invokes it when no callable was
// supplied. Only the first statement is visible in this excerpt.
131 virtual void run() // noexcept
\r
// Installs the win32_exception handler before anything else shown here runs.
133 win32_exception::install_handler();
\r
// NOTE(review): size_ is not referenced by any line visible in this excerpt — check whether it is still used.
138 tbb::atomic<size_t> size_;
\r
// Notified by begin_invoke() after a task has been queued.
139 boost::condition_variable cond_;
\r
// Callable run by the worker thread; set in the constructor.
142 std::function<void()> f_;
\r
// Worker thread handle, assigned in start().
143 boost::thread thread_;
\r
// Running flag: set via fetch_and_store in start(), cleared in stop().
144 tbb::atomic<bool> is_running_;
\r
// Three task queues, indexed by priority (high_priority, normal_priority, low_priority).
145 std::array<tbb::concurrent_bounded_queue<std::function<void()>>, 3> execution_queue_;
\r