]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
2.0.0.2: Fixed compatibility with CasparCG Client.
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/win32_exception.h"\r
4 #include "../log/log.h"\r
5 \r
6 #include <tbb/atomic.h>\r
7 #include <tbb/concurrent_queue.h>\r
8 \r
9 #include <boost/thread.hpp>\r
10 #include <boost/noncopyable.hpp>\r
11 \r
12 #include <functional>\r
13 #include <array>\r
14 \r
15 namespace caspar {\r
16 \r
17 class executor : boost::noncopyable\r
18 {\r
19 public:\r
20                 \r
21         explicit executor(const std::function<void()>& f = nullptr)\r
22         {\r
23                 is_running_ = false;\r
24                 f_ = f != nullptr ? f : [this]{run();};\r
25         }\r
26 \r
27         virtual ~executor()\r
28         {\r
29                 stop();\r
30                 if(boost::this_thread::get_id() != thread_.get_id())\r
31                         thread_.join();\r
32         }\r
33 \r
34         void set_capacity(size_t capacity)\r
35         {\r
36                 execution_queue_.set_capacity(capacity);\r
37         }\r
38 \r
39         void start() // noexcept\r
40         {\r
41                 if(is_running_.fetch_and_store(true))\r
42                         return;\r
43                 clear();\r
44                 thread_ = boost::thread(f_);\r
45         }\r
46                         \r
47         void stop() // noexcept\r
48         {\r
49                 is_running_ = false;    \r
50                 execution_queue_.push([]{});\r
51         }\r
52 \r
53         void wait()\r
54         {\r
55                 invoke([]{});\r
56         }\r
57 \r
58         void clear()\r
59         {\r
60                 std::function<void()> func;\r
61                 while(true)\r
62                 {\r
63                         try\r
64                         {\r
65                                 if(!execution_queue_.try_pop(func))\r
66                                         return;\r
67                         }\r
68                         catch(boost::broken_promise&){}\r
69                 }\r
70         }\r
71                         \r
72         template<typename Func>\r
73         auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept\r
74         {       \r
75                 typedef boost::packaged_task<decltype(func())> task_type;\r
76                                 \r
77                 auto task = task_type(std::forward<Func>(func));\r
78                 auto future = task.get_future();\r
79                 \r
80                 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
81                 {\r
82                         try\r
83                         {\r
84                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
85                                         my_task();\r
86                         }\r
87                         catch(boost::task_already_started&){}\r
88                 }));\r
89                                 \r
90                 struct task_adaptor_t\r
91                 {\r
92                         task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
93                         task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
94                         void operator()() const { task(); }\r
95                         mutable task_type task;\r
96                 } task_adaptor(std::move(task));\r
97 \r
98                 execution_queue_.push([=]\r
99                 {\r
100                         try{task_adaptor();}\r
101                         catch(boost::task_already_started&){}\r
102                         catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
103                 });\r
104 \r
105                 return std::move(future);               \r
106         }\r
107         \r
108         template<typename Func>\r
109         auto invoke(Func&& func) -> decltype(func())\r
110         {\r
111                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
112                         return func();\r
113                 \r
114                 return begin_invoke(std::forward<Func>(func)).get();\r
115         }\r
116         \r
117         tbb::concurrent_bounded_queue<std::function<void()>>::size_type capacity() const { return execution_queue_.capacity();  }\r
118         tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size();  }\r
119         bool empty() const              { return execution_queue_.empty();      }\r
120         bool is_running() const { return is_running_;                           }       \r
121                 \r
122 private:\r
123         \r
124         void execute() // noexcept\r
125         {\r
126                 std::function<void()> func;\r
127                 execution_queue_.pop(func);     \r
128                 func();\r
129         }\r
130 \r
131         void run() // noexcept\r
132         {\r
133                 win32_exception::install_handler();\r
134                 while(is_running_)\r
135                         execute();\r
136         }\r
137         \r
138         std::function<void()> f_;\r
139         boost::thread thread_;\r
140         tbb::atomic<bool> is_running_;\r
141         tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
142 };\r
143 \r
144 }