]> git.sesse.net Git - casparcg/blob - common/executor.h
5c296dec0dac97a12b94f4d4036c1b8ad968722c
[casparcg] / common / executor.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #pragma once
23
24 #include "except.h"
25 #include "enum_class.h"
26 #include "log.h"
27 #include "blocking_bounded_queue_adapter.h"
28
29 #include <tbb/atomic.h>
30 #include <tbb/concurrent_priority_queue.h>
31
32 #include <boost/thread.hpp>
33 #include <boost/optional.hpp>
34
35 #include <functional>
36
37 namespace caspar {
38                 
// Scope holder for the priority levels a task can be queued with.
// The nested plain enum lists the levels in ascending numeric order, starting at 0.
struct task_priority_def
{
	enum type
	{
		lowest_priority	= 0,
		lower_priority	= 1,
		low_priority	= 2,
		normal_priority	= 3,
		high_priority	= 4,
		higher_priority	= 5
	};
};
// Priority type taken by the executor's invoke functions: the task_priority_def
// enumerators wrapped in the project's enum_class<> template (presumably declared
// in "enum_class.h" -- confirm). The executor reads the numeric level via value().
51 typedef enum_class<task_priority_def> task_priority;
52
53 class executor sealed
54 {       
55         struct priority_function
56         {
57                 int                                             priority;
58                 std::function<void()>   func;
59
60                 priority_function()
61                 {
62                 }
63
64                 template<typename F>
65                 priority_function(int priority, F&& func)
66                         : priority(priority)
67                         , func(std::forward<F>(func))
68                 {
69                 }
70
71                 void operator()()
72                 {
73                         func();
74                 }
75
76                 bool operator<(const priority_function& other) const
77                 {
78                         return priority < other.priority;
79                 }
80         };
81
82         executor(const executor&);
83         executor& operator=(const executor&);
84         
85         typedef blocking_bounded_queue_adapter<tbb::concurrent_priority_queue<priority_function>>       function_queue_t;
86         
87         const std::wstring                                                                                      name_;
88         tbb::atomic<bool>                                                                                       is_running_;
89         boost::thread                                                                                           thread_;        
90         function_queue_t                                                                                        execution_queue_;
91                 
92 public:         
93         executor(const std::wstring& name)
94                 : name_(name)
95                 , execution_queue_(512)
96         {
97                 is_running_ = true;
98                 thread_ = boost::thread([this]{run();});
99         }
100         
101         ~executor()
102         {
103                 try
104                 {
105                         internal_begin_invoke([=]
106                         {
107                                 is_running_ = false;
108                         }, false).wait();
109                 }
110                 catch(...)
111                 {
112                         CASPAR_LOG_CURRENT_EXCEPTION();
113                 }
114                 
115                 thread_.join();
116         }
117         
118         template<typename Func>
119         auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())>
120         {       
121                 if(!is_running_)
122                         BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
123
124                 // Will return uninitialized future if the try failed (get_state() will return future_state::uninitialized).
125                 return internal_begin_invoke(std::forward<Func>(func), true, priority);
126         }
127
128         template<typename Func>
129         auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
130         {       
131                 if(!is_running_)
132                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
133                                 
134                 return internal_begin_invoke(std::forward<Func>(func), false, priority);        
135         }
136         
137         template<typename Func>
138         auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
139         {
140                 if(is_current())  // Avoids potential deadlock.
141                         return func();
142                 
143                 return begin_invoke(std::forward<Func>(func), prioriy).get();
144         }
145
146         void yield()
147         {
148                 if(!is_current())
149                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
150
151                 priority_function func;
152                 if(execution_queue_.try_pop(func))
153                         func();
154         }
155
156         void set_capacity(function_queue_t::size_type capacity)
157         {
158                 execution_queue_.set_capacity(capacity);
159         }
160
161         function_queue_t::size_type capacity() const
162         {
163                 return execution_queue_.capacity();
164         }
165         
166         void clear()
167         {               
168                 priority_function func;
169                 while(execution_queue_.try_pop(func));
170         }
171                                 
172         void stop()
173         {
174                 invoke([this]
175                 {
176                         is_running_ = false;
177                 });
178         }
179
180         void wait()
181         {
182                 invoke([]{}, task_priority::lowest_priority);
183         }
184                 
185         function_queue_t::size_type size() const 
186         {
187                 return execution_queue_.size(); 
188         }
189                 
190         bool is_running() const
191         {
192                 return is_running_; 
193         }       
194
195         bool is_current() const
196         {
197                 return boost::this_thread::get_id() == thread_.get_id();
198         }
199                 
200 private:        
201
202         std::wstring print() const
203         {
204                 return L"executor[" + name_ + L"]";
205         }
206         
207         template<typename Func>
208         auto internal_begin_invoke(
209                 Func&& func,
210                 bool try_begin,
211                 task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
212         {                                       
213                 typedef typename std::remove_reference<Func>::type      function_type;
214                 typedef decltype(func())                                                        result_type;
215                 typedef boost::packaged_task<result_type>                       task_type;
216                                                                 
217                 std::unique_ptr<task_type> task;
218
219                 // Use pointers since the boost thread library doesn't fully support move semantics.
220
221                 auto raw_func2 = new function_type(std::forward<Func>(func));
222                 try
223                 {
224                         task.reset(new task_type([raw_func2]() -> result_type
225                         {
226                                 std::unique_ptr<function_type> func2(raw_func2);
227                                 return (*func2)();
228                         }));
229                 }
230                 catch(...)
231                 {
232                         delete raw_func2;
233                         throw;
234                 }
235                 
236                 task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
237                 {
238                         try
239                         {
240                                 if(is_current())  // Avoids potential deadlock.
241                                         my_task();
242                         }
243                         catch(boost::task_already_started&){}
244                 }));
245                                 
246                 auto future = task->get_future();
247
248                 auto raw_task = task.release();
249                 priority_function prio_func(priority.value(), [raw_task]
250                 {
251                         std::unique_ptr<task_type> task(raw_task);
252                         try
253                         {
254                                 (*task)();
255                         }
256                         catch(boost::task_already_started&){}
257                 });
258
259                 if (!execution_queue_.try_push(prio_func))
260                 {
261                         if (try_begin)
262                         {
263                                 delete raw_task;
264
265                                 return boost::unique_future<decltype(func())>();
266                         }
267                         else
268                         {
269                                 CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
270                                 execution_queue_.push(prio_func);
271                         }
272                 }
273
274                 return std::move(future);               
275         }
276
277         void run() // noexcept
278         {
279                 win32_exception::install_handler();             
280                 while(is_running_)
281                 {
282                         try
283                         {
284                                 priority_function func;
285                                 execution_queue_.pop(func);
286                                 func();
287                         }
288                         catch(...)
289                         {
290                                 CASPAR_LOG_CURRENT_EXCEPTION();
291                         }
292                 }
293         }       
294 };
295
296 }