]> git.sesse.net Git - casparcg/blob - common/executor.h
b58339e1f29b704a28c3568eb9b76b10db514a75
[casparcg] / common / executor.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #pragma once
23
24 #include "except.h"
25 #include "enum_class.h"
26 #include "log.h"
27
28 #include <tbb/atomic.h>
29 #include <tbb/concurrent_priority_queue.h>
30 #include <tbb/concurrent_queue.h>
31
32 #include <boost/thread.hpp>
33
34 #include <functional>
35
36 namespace caspar {
37                 
// Scoping struct for the task priority levels. The enumerators are numbered
// in ascending order from lowest_priority (0) up to higher_priority (5).
struct task_priority_def
{
    enum type
    {
        lowest_priority = 0,
        lower_priority  = 1,
        low_priority    = 2,
        normal_priority = 3,
        high_priority   = 4,
        higher_priority = 5
    };
};
// Priority type taken by the executor's invoke functions: task_priority_def
// wrapped in the project's enum_class template.
typedef enum_class<task_priority_def> task_priority;
51
52 class executor sealed
53 {       
54         struct priority_function
55         {
56                 int                                             priority;
57                 std::function<void()>   func;
58
59                 priority_function()
60                 {
61                 }
62
63                 template<typename F>
64                 priority_function(int priority, F&& func)
65                         : priority(priority)
66                         , func(std::forward<F>(func))
67                 {
68                 }
69
70                 void operator()()
71                 {
72                         func();
73                 }
74
75                 bool operator<(const priority_function& other) const
76                 {
77                         return priority < other.priority;
78                 }
79         };
80
81         executor(const executor&);
82         executor& operator=(const executor&);
83         
84         typedef tbb::concurrent_priority_queue<priority_function>       function_queue_t;
85         
86         const std::wstring                                                                                      name_;
87         tbb::atomic<bool>                                                                                       is_running_;
88         boost::thread                                                                                           thread_;        
89         function_queue_t                                                                                        execution_queue_;
90         tbb::concurrent_bounded_queue<int>                                                      semaphore_;
91                 
92 public:         
93         executor(const std::wstring& name)
94                 : name_(name)
95         {
96                 is_running_ = true;
97                 set_capacity(512);
98                 thread_ = boost::thread([this]{run();});
99         }
100         
101         ~executor()
102         {
103                 try
104                 {
105                         internal_begin_invoke([=]
106                         {
107                                 is_running_ = false;
108                         }).wait();
109                 }
110                 catch(...)
111                 {
112                         CASPAR_LOG_CURRENT_EXCEPTION();
113                 }
114                 
115                 thread_.join();
116         }
117                                                 
118         template<typename Func>
119         auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
120         {       
121                 if(!is_running_)
122                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
123                                 
124                 return internal_begin_invoke(std::forward<Func>(func), priority);       
125         }
126         
127         template<typename Func>
128         auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
129         {
130                 if(is_current())  // Avoids potential deadlock.
131                         return func();
132                 
133                 return begin_invoke(std::forward<Func>(func), prioriy).get();
134         }
135
136         void yield()
137         {
138                 if(!is_current())
139                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
140
141                 int dummy;
142                 if(!semaphore_.try_pop(dummy))
143                         return;
144
145                 priority_function func;
146                 if(execution_queue_.try_pop(func))
147                         func();
148         }
149
150         void set_capacity(std::size_t capacity)
151         {
152                 semaphore_.set_capacity(capacity);
153         }
154
155         std::size_t capacity() const
156         {
157                 return semaphore_.capacity();
158         }
159         
160         void clear()
161         {               
162                 priority_function func;
163                 while(execution_queue_.try_pop(func));
164         }
165                                 
166         void stop()
167         {
168                 invoke([this]
169                 {
170                         is_running_ = false;
171                 });
172         }
173
174         void wait()
175         {
176                 invoke([]{}, task_priority::lowest_priority);
177         }
178                 
179         function_queue_t::size_type size() const 
180         {
181                 return execution_queue_.size(); 
182         }
183                 
184         bool is_running() const
185         {
186                 return is_running_; 
187         }       
188
189         bool is_current() const
190         {
191                 return boost::this_thread::get_id() == thread_.get_id();
192         }
193                 
194 private:        
195
196         std::wstring print() const
197         {
198                 return L"executor[" + name_ + L"]";
199         }
200         
201         template<typename Func>
202         auto internal_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
203         {                                       
204                 typedef typename std::remove_reference<Func>::type      function_type;
205                 typedef decltype(func())                                                        result_type;
206                 typedef boost::packaged_task<result_type>                       task_type;
207                                                                 
208                 std::unique_ptr<task_type> task;
209
210                 // Use pointers since the boost thread library doesn't fully support move semantics.
211
212                 auto raw_func2 = new function_type(std::forward<Func>(func));
213                 try
214                 {
215                         task.reset(new task_type([raw_func2]() -> result_type
216                         {
217                                 std::unique_ptr<function_type> func2(raw_func2);
218                                 return (*func2)();
219                         }));
220                 }
221                 catch(...)
222                 {
223                         delete raw_func2;
224                         throw;
225                 }
226                 
227                 task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
228                 {
229                         try
230                         {
231                                 if(is_current())  // Avoids potential deadlock.
232                                         my_task();
233                         }
234                         catch(boost::task_already_started&){}
235                 }));
236                                 
237                 auto future = task->get_future();
238
239                 auto raw_task = task.release();
240                 priority_function prio_func(priority.value(), [raw_task]
241                 {
242                         std::unique_ptr<task_type> task(raw_task);
243                         try
244                         {
245                                 (*task)();
246                         }
247                         catch(boost::task_already_started&){}
248                 });
249
250                 execution_queue_.push(prio_func);
251
252                 if(!semaphore_.try_push(0))
253                 {
254                         CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
255                         semaphore_.push(0);
256                 }                                       
257                 return std::move(future);               
258         }
259
260         void run() // noexcept
261         {
262                 win32_exception::install_handler();             
263                 while(is_running_)
264                 {
265                         try
266                         {
267                                 int dummy;
268                                 semaphore_.pop(dummy);
269
270                                 priority_function func;
271                                 if(execution_queue_.try_pop(func))
272                                         func();
273                         }
274                         catch(...)
275                         {
276                                 CASPAR_LOG_CURRENT_EXCEPTION();
277                         }
278                 }
279         }       
280 };
281
282 }