]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
2.0.0.2:
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/win32_exception.h"\r
4 #include "../log/log.h"\r
5 \r
6 #include <tbb/atomic.h>\r
7 #include <tbb/concurrent_queue.h>\r
8 \r
9 #include <boost/thread.hpp>\r
10 #include <boost/noncopyable.hpp>\r
11 \r
12 #include <functional>\r
13 #include <array>\r
14 \r
15 namespace caspar {\r
16 \r
17 class executor : boost::noncopyable\r
18 {\r
19 public:\r
20 \r
21         enum stop_policy\r
22         {\r
23                 wait,\r
24                 no_wait\r
25         };\r
26         \r
27         explicit executor(const std::function<void()>& f = nullptr)\r
28         {\r
29                 is_running_ = false;\r
30                 f_ = f != nullptr ? f : [this]{run();};\r
31         }\r
32 \r
33         ~executor()\r
34         {\r
35                 stop();\r
36         }\r
37 \r
38         void start() // noexcept\r
39         {\r
40                 if(is_running_.fetch_and_store(true))\r
41                         return;\r
42                 thread_ = boost::thread(f_);\r
43         }\r
44                         \r
45         void stop(stop_policy policy = wait) // noexcept\r
46         {\r
47                 is_running_ = false;    \r
48                 execution_queue_.push([]{});\r
49                 if(policy == wait && boost::this_thread::get_id() != thread_.get_id())\r
50                         thread_.join();\r
51         }\r
52 \r
53         void clear()\r
54         {\r
55                 std::function<void()> func;\r
56                 while(execution_queue_.try_pop(func)){}\r
57         }\r
58                         \r
59         template<typename Func>\r
60         auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept\r
61         {       \r
62                 typedef boost::packaged_task<decltype(func())> task_type;\r
63                                 \r
64                 auto task = task_type(std::forward<Func>(func));\r
65                 auto future = task.get_future();\r
66                 \r
67                 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
68                 {\r
69                         try\r
70                         {\r
71                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
72                                         my_task();\r
73                         }\r
74                         catch(boost::task_already_started&){}\r
75                 }));\r
76                                 \r
77                 struct task_adaptor_t\r
78                 {\r
79                         task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
80                         task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
81                         void operator()() const { task(); }\r
82                         mutable task_type task;\r
83                 } task_adaptor(std::move(task));\r
84 \r
85                 execution_queue_.try_push([=]\r
86                 {\r
87                         try{task_adaptor();}\r
88                         catch(boost::task_already_started&){}\r
89                         catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
90                 });\r
91 \r
92                 return std::move(future);               \r
93         }\r
94         \r
95         template<typename Func>\r
96         auto invoke(Func&& func) -> decltype(func())\r
97         {\r
98                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
99                         return func();\r
100                 \r
101                 return begin_invoke(std::forward<Func>(func)).get();\r
102         }\r
103 \r
104         tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size();  }\r
105         bool empty() const              { return execution_queue_.empty();      }\r
106         bool is_running() const { return is_running_;                           }       \r
107                 \r
108 private:\r
109         \r
110         void execute() // noexcept\r
111         {\r
112                 std::function<void()> func;\r
113                 execution_queue_.pop(func);     \r
114                 func();\r
115         }\r
116 \r
117         void run() // noexcept\r
118         {\r
119                 win32_exception::install_handler();\r
120                 while(is_running_)\r
121                         execute();\r
122         }\r
123         \r
124         std::function<void()> f_;\r
125         boost::thread thread_;\r
126         tbb::atomic<bool> is_running_;\r
127         tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
128 };\r
129 \r
130 }