]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
2.0.0.2: Mayor flash producer readability and maintainability improvements.
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/exceptions.h"\r
4 #include "../exception/win32_exception.h"\r
5 \r
6 #include <boost/thread.hpp>\r
7 \r
8 #include <tbb/atomic.h>\r
9 #include <tbb/concurrent_queue.h>\r
10 \r
11 #include <functional>\r
12 \r
13 namespace caspar {\r
14 \r
15 class executor\r
16 {\r
17 public:\r
18 \r
19         enum priority\r
20         {\r
21                 low_priority = 0,\r
22                 normal_priority,\r
23                 high_priority\r
24         };\r
25 \r
26         explicit executor(const std::function<void()>& f = nullptr)\r
27         {\r
28                 size_ = 0;\r
29                 is_running_ = false;\r
30                 f_ = f != nullptr ? f : [this]{run();};\r
31         }\r
32 \r
33         virtual ~executor()\r
34         {\r
35                 stop();\r
36         }\r
37 \r
38         void start() // noexcept\r
39         {\r
40                 if(is_running_.fetch_and_store(true))\r
41                         return;\r
42                 thread_ = boost::thread(f_);\r
43         }\r
44 \r
45         bool is_running() const // noexcept\r
46         {\r
47                 return is_running_;\r
48         }\r
49         \r
50         void stop() // noexcept\r
51         {\r
52                 is_running_ = false;    \r
53                 begin_invoke([]{}); // wake if sleeping\r
54                 assert(boost::this_thread::get_id() != thread_.get_id());\r
55                 thread_.join();\r
56         }\r
57 \r
58         void execute() // noexcept\r
59         {\r
60                 boost::unique_lock<boost::mutex> lock(mut_);\r
61                 while(size_ < 1)                \r
62                         cond_.wait(lock);\r
63                 \r
64                 try_execute();\r
65         }\r
66 \r
67         bool try_execute() // noexcept\r
68         {\r
69                 std::function<void()> func;\r
70                 if(execution_queue_[high_priority].try_pop(func) || execution_queue_[normal_priority].try_pop(func) || execution_queue_[low_priority].try_pop(func))\r
71                 {\r
72                         func();\r
73                         --size_;\r
74                 }\r
75 \r
76                 return func != nullptr;\r
77         }\r
78                         \r
79         template<typename Func>\r
80         auto begin_invoke(Func&& func, priority p = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
81         {       \r
82                 typedef decltype(func()) result_type; \r
83                                 \r
84                 auto task = std::make_shared<boost::packaged_task<result_type>>(std::forward<Func>(func)); // boost::packaged_task cannot be moved into lambda, need to used shared_ptr.\r
85                 auto future = task->get_future();\r
86                 \r
87                 task->set_wait_callback(std::function<void(decltype(*task)& task)>([=](decltype(*task)& task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
88                 {\r
89                         try\r
90                         {\r
91                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
92                                         task();\r
93                         }\r
94                         catch(boost::task_already_started&){}\r
95                 }));\r
96                 execution_queue_[p].push([=]\r
97                 {\r
98                         try{(*task)();}\r
99                         catch(boost::task_already_started&){}\r
100                         catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
101                 });\r
102                 ++size_;\r
103                 cond_.notify_one();\r
104 \r
105                 return std::move(future);               \r
106         }\r
107         \r
108         template<typename Func>\r
109         auto invoke(Func&& func, priority p = normal_priority) -> decltype(func())\r
110         {\r
111                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
112                         return func();\r
113                 \r
114                 return begin_invoke(std::forward<Func>(func), p).get();\r
115         }\r
116                 \r
117 private:\r
118 \r
119         virtual void run() // noexcept\r
120         {\r
121                 win32_exception::install_handler();\r
122                 while(is_running_)\r
123                         execute();\r
124         }\r
125 \r
126         tbb::atomic<size_t> size_;\r
127         boost::condition_variable cond_;\r
128         boost::mutex mut_;\r
129 \r
130         std::function<void()> f_;\r
131         boost::thread thread_;\r
132         tbb::atomic<bool> is_running_;\r
133         std::array<tbb::concurrent_bounded_queue<std::function<void()>>, 3> execution_queue_;\r
134 };\r
135 \r
136 }