]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/win32_exception.h"\r
4 #include "../log/log.h"\r
5 \r
6 #include <tbb/atomic.h>\r
7 #include <tbb/concurrent_queue.h>\r
8 \r
9 #include <boost/thread.hpp>\r
10 #include <boost/noncopyable.hpp>\r
11 \r
12 #include <functional>\r
13 #include <array>\r
14 \r
15 namespace caspar {\r
16 \r
17 class executor : boost::noncopyable\r
18 {\r
19 public:\r
20         \r
21         explicit executor(const std::function<void()>& f = nullptr)\r
22         {\r
23                 is_running_ = false;\r
24                 f_ = f != nullptr ? f : [this]{run();};\r
25         }\r
26 \r
27         ~executor()\r
28         {\r
29                 stop();\r
30         }\r
31 \r
32         void start() // noexcept\r
33         {\r
34                 if(is_running_.fetch_and_store(true))\r
35                         return;\r
36                 thread_ = boost::thread(f_);\r
37         }\r
38                 \r
39         void stop(bool wait = true) // noexcept\r
40         {\r
41                 is_running_ = false;    \r
42                 execution_queue_.push([]{});\r
43                 if(wait && boost::this_thread::get_id() != thread_.get_id())\r
44                         thread_.join();\r
45         }\r
46                         \r
47         template<typename Func>\r
48         auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept\r
49         {       \r
50                 typedef boost::packaged_task<decltype(func())> task_type;\r
51                                 \r
52                 auto task = task_type(std::forward<Func>(func));\r
53                 auto future = task.get_future();\r
54                 \r
55                 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
56                 {\r
57                         try\r
58                         {\r
59                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
60                                         my_task();\r
61                         }\r
62                         catch(boost::task_already_started&){}\r
63                 }));\r
64                                 \r
65                 struct task_adaptor_t\r
66                 {\r
67                         task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
68                         task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
69                         void operator()() const { task(); }\r
70                         mutable task_type task;\r
71                 } task_adaptor(std::move(task));\r
72 \r
73                 execution_queue_.try_push([=]\r
74                 {\r
75                         try{task_adaptor();}\r
76                         catch(boost::task_already_started&){}\r
77                         catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
78                 });\r
79 \r
80                 return std::move(future);               \r
81         }\r
82         \r
83         template<typename Func>\r
84         auto invoke(Func&& func) -> decltype(func())\r
85         {\r
86                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
87                         return func();\r
88                 \r
89                 return begin_invoke(std::forward<Func>(func)).get();\r
90         }\r
91 \r
92         tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size();  }\r
93         bool empty() const              { return execution_queue_.empty();      }\r
94         bool is_running() const { return is_running_;                           }       \r
95                 \r
96 private:\r
97         \r
98         void execute() // noexcept\r
99         {\r
100                 std::function<void()> func;\r
101                 execution_queue_.pop(func);     \r
102                 func();\r
103         }\r
104 \r
105         void run() // noexcept\r
106         {\r
107                 win32_exception::install_handler();\r
108                 while(is_running_)\r
109                         execute();\r
110         }\r
111         \r
112         std::function<void()> f_;\r
113         boost::thread thread_;\r
114         tbb::atomic<bool> is_running_;\r
115         tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
116 };\r
117 \r
118 }