]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
2.0.0.2:
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/exceptions.h"\r
4 #include "../exception/win32_exception.h"\r
5 \r
6 #include <boost/thread.hpp>\r
7 \r
8 #include <tbb/atomic.h>\r
9 #include <tbb/concurrent_queue.h>\r
10 \r
11 #include <functional>\r
12 \r
13 namespace caspar { namespace common {\r
14 \r
15 class executor\r
16 {\r
17 public:\r
18         explicit executor(const std::function<void()>& run_func = nullptr)\r
19         {\r
20                 is_running_ = false;\r
21                 run_func_ = run_func != nullptr ? run_func : [=]{run();};\r
22         }\r
23 \r
24         virtual ~executor()\r
25         {\r
26                 stop();\r
27         }\r
28 \r
29         void start()\r
30         {\r
31                 if(is_running_.fetch_and_store(true))\r
32                         return;\r
33                 thread_ = boost::thread(run_func_);\r
34         }\r
35 \r
36         bool is_running() const\r
37         {\r
38                 return is_running_;\r
39         }\r
40 \r
41         void stop()\r
42         {\r
43                 if(is_running_.fetch_and_store(false))\r
44                 {\r
45                         execution_queue_.clear();\r
46                         execution_queue_.push([](){});                  \r
47                 }\r
48                 thread_.join();\r
49         }\r
50 \r
51         void execute()\r
52         {\r
53                 std::function<void()> func;\r
54                 execution_queue_.pop(func);     \r
55                 func();\r
56         }\r
57 \r
58         bool try_execute()\r
59         {\r
60                 std::function<void()> func;\r
61                 if(execution_queue_.try_pop(func))\r
62                         func();\r
63 \r
64                 return func != nullptr;\r
65         }\r
66 \r
67         void clear()\r
68         {\r
69                 execution_queue_.clear();\r
70         }\r
71 \r
72         template<typename Func>\r
73         void enqueue(Func&& func)\r
74         {\r
75                 execution_queue_.push([=]{try{func();}catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}});\r
76         }\r
77 \r
78         template<typename Func>\r
79         auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())>\r
80         {       \r
81                 typedef decltype(func()) result_type; \r
82                                 \r
83                 auto task = std::make_shared<boost::packaged_task<result_type>>(std::forward<Func>(func));      \r
84                 auto future = task->get_future();\r
85                 \r
86                 if(boost::this_thread::get_id() != thread_.get_id())\r
87                         execution_queue_.push([=]{(*task)();});\r
88                 else\r
89                         (*task)();\r
90 \r
91                 return std::move(future);               \r
92         }\r
93         \r
94         template<typename Func>\r
95         auto invoke(Func&& func) -> decltype(func())\r
96         {\r
97                 return begin_invoke(std::forward<Func>(func)).get();\r
98         }\r
99         \r
100 private:\r
101 \r
102         virtual void run()\r
103         {\r
104                 win32_exception::install_handler();\r
105                 while(is_running_)\r
106                         execute();\r
107         }\r
108 \r
109         std::function<void()> run_func_;\r
110         boost::thread thread_;\r
111         tbb::atomic<bool> is_running_;\r
112         tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
113 };\r
114 \r
115 }}