git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
2.0.0.2: Fixed proper destruction order of executor(threads) to avoid access violatio...
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/win32_exception.h"\r
4 #include "../log/log.h"\r
5 \r
6 #include <tbb/atomic.h>\r
7 #include <tbb/concurrent_queue.h>\r
8 \r
9 #include <boost/thread.hpp>\r
10 #include <boost/noncopyable.hpp>\r
11 \r
12 #include <functional>\r
13 #include <array>\r
14 \r
15 namespace caspar {\r
16 \r
17 class executor : boost::noncopyable\r
18 {\r
19 public:\r
20                 \r
21         explicit executor(const std::function<void()>& f = nullptr)\r
22         {\r
23                 is_running_ = false;\r
24                 f_ = f != nullptr ? f : [this]{run();};\r
25         }\r
26 \r
27         virtual ~executor()\r
28         {\r
29                 wait();\r
30                 stop();\r
31                 if(boost::this_thread::get_id() != thread_.get_id())\r
32                         thread_.join();\r
33         }\r
34 \r
35         void set_capacity(size_t capacity)\r
36         {\r
37                 execution_queue_.set_capacity(capacity);\r
38         }\r
39 \r
40         void start() // noexcept\r
41         {\r
42                 if(is_running_.fetch_and_store(true))\r
43                         return;\r
44                 thread_ = boost::thread(f_);\r
45                 try\r
46                 {\r
47                         execution_queue_.clear();\r
48                 }\r
49                 catch(boost::broken_promise&){}\r
50         }\r
51                         \r
52         void stop() // noexcept\r
53         {\r
54                 is_running_ = false;    \r
55                 execution_queue_.push([]{});\r
56         }\r
57 \r
58         void wait()\r
59         {\r
60                 invoke([]{});\r
61         }\r
62 \r
63         void clear()\r
64         {\r
65                 std::function<void()> func;\r
66                 while(execution_queue_.try_pop(func)){}\r
67         }\r
68                         \r
69         template<typename Func>\r
70         auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept\r
71         {       \r
72                 typedef boost::packaged_task<decltype(func())> task_type;\r
73                                 \r
74                 auto task = task_type(std::forward<Func>(func));\r
75                 auto future = task.get_future();\r
76                 \r
77                 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
78                 {\r
79                         try\r
80                         {\r
81                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
82                                         my_task();\r
83                         }\r
84                         catch(boost::task_already_started&){}\r
85                 }));\r
86                                 \r
87                 struct task_adaptor_t\r
88                 {\r
89                         task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
90                         task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
91                         void operator()() const { task(); }\r
92                         mutable task_type task;\r
93                 } task_adaptor(std::move(task));\r
94 \r
95                 execution_queue_.push([=]\r
96                 {\r
97                         try{task_adaptor();}\r
98                         catch(boost::task_already_started&){}\r
99                         catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
100                 });\r
101 \r
102                 return std::move(future);               \r
103         }\r
104         \r
105         template<typename Func>\r
106         auto invoke(Func&& func) -> decltype(func())\r
107         {\r
108                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
109                         return func();\r
110                 \r
111                 return begin_invoke(std::forward<Func>(func)).get();\r
112         }\r
113 \r
114         tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size();  }\r
115         bool empty() const              { return execution_queue_.empty();      }\r
116         bool is_running() const { return is_running_;                           }       \r
117                 \r
118 private:\r
119         \r
120         void execute() // noexcept\r
121         {\r
122                 std::function<void()> func;\r
123                 execution_queue_.pop(func);     \r
124                 func();\r
125         }\r
126 \r
127         void run() // noexcept\r
128         {\r
129                 win32_exception::install_handler();\r
130                 while(is_running_)\r
131                         execute();\r
132         }\r
133         \r
134         std::function<void()> f_;\r
135         boost::thread thread_;\r
136         tbb::atomic<bool> is_running_;\r
137         tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
138 };\r
139 \r
140 }