]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/exceptions.h"\r
4 #include "../exception/win32_exception.h"\r
5 \r
6 #include <boost/thread.hpp>\r
7 \r
8 #include <tbb/atomic.h>\r
9 #include <tbb/concurrent_queue.h>\r
10 \r
11 #include <functional>\r
12 \r
13 namespace caspar {\r
14 \r
15 class executor\r
16 {\r
17 public:\r
18 \r
19         enum priority\r
20         {\r
21                 low_priority = 0,\r
22                 normal_priority,\r
23                 high_priority\r
24         };\r
25 \r
26         explicit executor(const std::function<void()>& f = nullptr)\r
27         {\r
28                 size_ = 0;\r
29                 is_running_ = false;\r
30                 f_ = f != nullptr ? f : [this]{run();};\r
31         }\r
32 \r
33         virtual ~executor()\r
34         {\r
35                 stop();\r
36         }\r
37 \r
38         void start() // noexcept\r
39         {\r
40                 if(is_running_.fetch_and_store(true))\r
41                         return;\r
42                 thread_ = boost::thread(f_);\r
43         }\r
44 \r
45         bool is_running() const // noexcept\r
46         {\r
47                 return is_running_;\r
48         }\r
49         \r
50         void stop() // noexcept\r
51         {\r
52                 is_running_ = false;            \r
53                 thread_.join();\r
54         }\r
55 \r
56         void execute() // noexcept\r
57         {\r
58                 boost::unique_lock<boost::mutex> lock(mut_);\r
59                 while(size_ < 1)                \r
60                         cond_.wait(lock);\r
61                 \r
62                 try_execute();\r
63         }\r
64 \r
65         bool try_execute() // noexcept\r
66         {\r
67                 std::function<void()> func;\r
68                 if(execution_queue_[high_priority].try_pop(func) || execution_queue_[normal_priority].try_pop(func) || execution_queue_[low_priority].try_pop(func))\r
69                 {\r
70                         func();\r
71                         --size_;\r
72                 }\r
73 \r
74                 return func != nullptr;\r
75         }\r
76                         \r
77         template<typename Func>\r
78         auto begin_invoke(Func&& func, priority p = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
79         {       \r
80                 typedef decltype(func()) result_type; \r
81                                 \r
82                 auto task = std::make_shared<boost::packaged_task<result_type>>(std::forward<Func>(func)); // boost::packaged_task cannot be moved into lambda, need to used shared_ptr.\r
83                 auto future = task->get_future();\r
84                 \r
85                 task->set_wait_callback(std::function<void(decltype(*task)& task)>([=](decltype(*task)& task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
86                 {\r
87                         try\r
88                         {\r
89                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
90                                         task();\r
91                         }\r
92                         catch(boost::task_already_started&){}\r
93                 }));\r
94                 execution_queue_[p].push([=]\r
95                 {\r
96                         try{(*task)();}\r
97                         catch(boost::task_already_started&){}\r
98                         catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
99                 });\r
100                 ++size_;\r
101                 cond_.notify_one();\r
102 \r
103                 return std::move(future);               \r
104         }\r
105         \r
106         template<typename Func>\r
107         auto invoke(Func&& func, priority p = normal_priority) -> decltype(func())\r
108         {\r
109                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
110                         return func();\r
111                 \r
112                 return begin_invoke(std::forward<Func>(func), p).get();\r
113         }\r
114                 \r
115 private:\r
116 \r
117         virtual void run() // noexcept\r
118         {\r
119                 win32_exception::install_handler();\r
120                 while(is_running_)\r
121                         execute();\r
122         }\r
123 \r
124         tbb::atomic<size_t> size_;\r
125         boost::condition_variable cond_;\r
126         boost::mutex mut_;\r
127 \r
128         std::function<void()> f_;\r
129         boost::thread thread_;\r
130         tbb::atomic<bool> is_running_;\r
131         std::array<tbb::concurrent_bounded_queue<std::function<void()>>, 3> execution_queue_;\r
132 };\r
133 \r
134 }