]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/exceptions.h"\r
4 #include "../exception/win32_exception.h"\r
5 #include "../log/log.h"\r
6 \r
7 #include <boost/thread.hpp>\r
8 \r
9 #include <tbb/atomic.h>\r
10 #include <tbb/concurrent_queue.h>\r
11 \r
12 #include <functional>\r
13 #include <array>\r
14 \r
15 namespace caspar {\r
16 \r
17 class executor\r
18 {\r
19 public:\r
20 \r
21         enum priority\r
22         {\r
23                 low_priority = 0,\r
24                 normal_priority,\r
25                 high_priority\r
26         };\r
27 \r
28         explicit executor(const std::function<void()>& f = nullptr)\r
29         {\r
30                 size_ = 0;\r
31                 is_running_ = false;\r
32                 f_ = f != nullptr ? f : [this]{run();};\r
33         }\r
34 \r
35         virtual ~executor()\r
36         {\r
37                 stop();\r
38         }\r
39 \r
40         void start() // noexcept\r
41         {\r
42                 if(is_running_.fetch_and_store(true))\r
43                         return;\r
44                 thread_ = boost::thread(f_);\r
45         }\r
46 \r
47         bool is_running() const // noexcept\r
48         {\r
49                 return is_running_;\r
50         }\r
51         \r
52         void stop(bool wait = true) // noexcept\r
53         {\r
54                 is_running_ = false;    \r
55                 begin_invoke([]{}); // wake if sleeping\r
56                 if(wait && boost::this_thread::get_id() != thread_.get_id())\r
57                         thread_.join();\r
58         }\r
59 \r
60         void execute() // noexcept\r
61         {\r
62                 boost::unique_lock<boost::mutex> lock(mut_);\r
63                 while(size_ < 1)                \r
64                         cond_.wait(lock);\r
65                 \r
66                 try_execute();\r
67         }\r
68 \r
69         bool try_execute() // noexcept\r
70         {\r
71                 std::function<void()> func;\r
72                 if(execution_queue_[high_priority].try_pop(func) || execution_queue_[normal_priority].try_pop(func) || execution_queue_[low_priority].try_pop(func))\r
73                 {\r
74                         func();\r
75                         --size_;\r
76                 }\r
77 \r
78                 return func != nullptr;\r
79         }\r
80                         \r
81         template<typename Func>\r
82         auto begin_invoke(Func&& func, priority p = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
83         {       \r
84                 typedef decltype(func()) result_type; \r
85                                 \r
86                 auto task = std::make_shared<boost::packaged_task<result_type>>(std::forward<Func>(func)); // boost::packaged_task cannot be moved into lambda, need to used shared_ptr.\r
87                 auto future = task->get_future();\r
88                 \r
89                 task->set_wait_callback(std::function<void(decltype(*task)& task)>([=](decltype(*task)& task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
90                 {\r
91                         try\r
92                         {\r
93                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
94                                         task();\r
95                         }\r
96                         catch(boost::task_already_started&){}\r
97                 }));\r
98                 execution_queue_[p].push([=]\r
99                 {\r
100                         try{(*task)();}\r
101                         catch(boost::task_already_started&){}\r
102                         catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
103                 });\r
104                 ++size_;\r
105                 cond_.notify_one();\r
106 \r
107                 return std::move(future);               \r
108         }\r
109 \r
110         size_t size() const\r
111         {\r
112                 return execution_queue_.size();\r
113         }\r
114 \r
115         bool empty() const\r
116         {\r
117                 return execution_queue_.empty();\r
118         }\r
119         \r
120         template<typename Func>\r
121         auto invoke(Func&& func, priority p = normal_priority) -> decltype(func())\r
122         {\r
123                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
124                         return func();\r
125                 \r
126                 return begin_invoke(std::forward<Func>(func), p).get();\r
127         }\r
128                 \r
129 private:\r
130 \r
131         virtual void run() // noexcept\r
132         {\r
133                 win32_exception::install_handler();\r
134                 while(is_running_)\r
135                         execute();\r
136         }\r
137 \r
138         tbb::atomic<size_t> size_;\r
139         boost::condition_variable cond_;\r
140         boost::mutex mut_;\r
141 \r
142         std::function<void()> f_;\r
143         boost::thread thread_;\r
144         tbb::atomic<bool> is_running_;\r
145         std::array<tbb::concurrent_bounded_queue<std::function<void()>>, 3> execution_queue_;\r
146 };\r
147 \r
148 }