]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
2.0.0.2:
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/exceptions.h"\r
4 #include "../exception/win32_exception.h"\r
5 \r
6 #include <boost/thread.hpp>\r
7 \r
8 #include <tbb/atomic.h>\r
9 #include <tbb/concurrent_queue.h>\r
10 \r
11 #include <functional>\r
12 \r
13 namespace caspar { namespace common {\r
14 \r
15 class executor\r
16 {\r
17 public:\r
18         explicit executor(const std::function<void()>& run_func = nullptr)\r
19         {\r
20                 is_running_ = false;\r
21                 run_func_ = run_func != nullptr ? run_func : [=]{run();};\r
22         }\r
23 \r
24         virtual ~executor()\r
25         {\r
26                 stop();\r
27         }\r
28 \r
29         void start()\r
30         {\r
31                 if(is_running_.fetch_and_store(true))\r
32                         return;\r
33                 thread_ = boost::thread(run_func_);\r
34         }\r
35 \r
36         bool is_running() const\r
37         {\r
38                 return is_running_;\r
39         }\r
40         \r
41         void stop()\r
42         {\r
43                 if(is_running_.fetch_and_store(false))\r
44                 {\r
45                         execution_queue_.clear();\r
46                         execution_queue_.push([](){});                  \r
47                 }\r
48                 thread_.join();\r
49         }\r
50 \r
51         void execute()\r
52         {\r
53                 std::function<void()> func;\r
54                 execution_queue_.pop(func);     \r
55                 func();\r
56         }\r
57 \r
58         bool try_execute()\r
59         {\r
60                 std::function<void()> func;\r
61                 if(execution_queue_.try_pop(func))\r
62                         func();\r
63 \r
64                 return func != nullptr;\r
65         }\r
66 \r
67         void clear()\r
68         {\r
69                 execution_queue_.clear();\r
70         }\r
71 \r
72         template<typename Func>\r
73         void enqueue(Func&& func)\r
74         {\r
75                 execution_queue_.push([=]{try{func();}catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}});\r
76         }\r
77         \r
78         template<typename Func>\r
79         auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())>\r
80         {       \r
81                 typedef decltype(func()) result_type; \r
82                                 \r
83                 auto task = std::make_shared<boost::packaged_task<result_type>>(std::forward<Func>(func)); // boost::packaged_task cannot be moved, need to used shared_ptr.\r
84                 auto future = task->get_future();\r
85                 \r
86                 task->set_wait_callback(std::function<void(decltype(*task)& task)>([=](decltype(*task)& task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
87                 {\r
88                         try\r
89                         {\r
90                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
91                                         task();\r
92                         }\r
93                         catch(boost::task_already_started&){}\r
94                 }));\r
95                 execution_queue_.push([=]\r
96                 {\r
97                         try\r
98                         {\r
99                                 (*task)();    \r
100                         }\r
101                         catch(boost::task_already_started&){}\r
102                 });\r
103 \r
104                 return std::move(future);               \r
105         }\r
106         \r
107         template<typename Func>\r
108         auto invoke(Func&& func) -> decltype(func())\r
109         {\r
110                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
111                         return func();\r
112                 \r
113                 return begin_invoke(std::forward<Func>(func)).get();\r
114         }\r
115                 \r
116 private:\r
117 \r
118         virtual void run()\r
119         {\r
120                 win32_exception::install_handler();\r
121                 while(is_running_)\r
122                         execute();\r
123         }\r
124 \r
125         std::function<void()> run_func_;\r
126         boost::thread thread_;\r
127         tbb::atomic<bool> is_running_;\r
128         tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
129 };\r
130 \r
131 }}