]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/win32_exception.h"\r
4 #include "../log/log.h"\r
5 \r
6 #include <tbb/atomic.h>\r
7 #include <tbb/concurrent_queue.h>\r
8 \r
9 #include <boost/thread.hpp>\r
10 #include <boost/noncopyable.hpp>\r
11 \r
12 #include <functional>\r
13 #include <array>\r
14 \r
15 namespace caspar {\r
16 \r
17 class executor : boost::noncopyable\r
18 {\r
19 public:\r
20 \r
21         enum stop_policy\r
22         {\r
23                 wait,\r
24                 no_wait\r
25         };\r
26         \r
27         explicit executor(const std::function<void()>& f = nullptr)\r
28         {\r
29                 is_running_ = false;\r
30                 f_ = f != nullptr ? f : [this]{run();};\r
31         }\r
32 \r
33         virtual ~executor()\r
34         {\r
35                 stop();\r
36         }\r
37 \r
38         void set_capacity(size_t capacity)\r
39         {\r
40                 execution_queue_.set_capacity(capacity);\r
41         }\r
42 \r
43         void start() // noexcept\r
44         {\r
45                 if(is_running_.fetch_and_store(true))\r
46                         return;\r
47                 thread_ = boost::thread(f_);\r
48         }\r
49                         \r
50         void stop(stop_policy policy = wait) // noexcept\r
51         {\r
52                 is_running_ = false;    \r
53                 execution_queue_.push([]{});\r
54                 if(policy == wait && boost::this_thread::get_id() != thread_.get_id())\r
55                         thread_.join();\r
56         }\r
57 \r
58         void clear()\r
59         {\r
60                 std::function<void()> func;\r
61                 while(execution_queue_.try_pop(func)){}\r
62         }\r
63                         \r
64         template<typename Func>\r
65         auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept\r
66         {       \r
67                 typedef boost::packaged_task<decltype(func())> task_type;\r
68                                 \r
69                 auto task = task_type(std::forward<Func>(func));\r
70                 auto future = task.get_future();\r
71                 \r
72                 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
73                 {\r
74                         try\r
75                         {\r
76                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
77                                         my_task();\r
78                         }\r
79                         catch(boost::task_already_started&){}\r
80                 }));\r
81                                 \r
82                 struct task_adaptor_t\r
83                 {\r
84                         task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
85                         task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
86                         void operator()() const { task(); }\r
87                         mutable task_type task;\r
88                 } task_adaptor(std::move(task));\r
89 \r
90                 execution_queue_.push([=]\r
91                 {\r
92                         try{task_adaptor();}\r
93                         catch(boost::task_already_started&){}\r
94                         catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
95                 });\r
96 \r
97                 return std::move(future);               \r
98         }\r
99         \r
100         template<typename Func>\r
101         auto invoke(Func&& func) -> decltype(func())\r
102         {\r
103                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
104                         return func();\r
105                 \r
106                 return begin_invoke(std::forward<Func>(func)).get();\r
107         }\r
108 \r
109         tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size();  }\r
110         bool empty() const              { return execution_queue_.empty();      }\r
111         bool is_running() const { return is_running_;                           }       \r
112                 \r
113 private:\r
114         \r
115         void execute() // noexcept\r
116         {\r
117                 std::function<void()> func;\r
118                 execution_queue_.pop(func);     \r
119                 func();\r
120         }\r
121 \r
122         void run() // noexcept\r
123         {\r
124                 win32_exception::install_handler();\r
125                 while(is_running_)\r
126                         execute();\r
127         }\r
128         \r
129         std::function<void()> f_;\r
130         boost::thread thread_;\r
131         tbb::atomic<bool> is_running_;\r
132         tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
133 };\r
134 \r
135 }