]> git.sesse.net Git - casparcg/blob - common/executor.h
Manually merged f669fb3d7a3be5d8e14777531489285dd0d836c4 from master
[casparcg] / common / executor.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #pragma once
23
24 #include "except.h"
25 #include "enum_class.h"
26 #include "log.h"
27 #include "blocking_bounded_queue_adapter.h"
28 #include "blocking_priority_queue.h"
29
30
31 #include <tbb/atomic.h>
32 #include <tbb/concurrent_priority_queue.h>
33
34 #include <boost/thread.hpp>
35 #include <boost/optional.hpp>
36 #include <boost/assign/list_of.hpp>
37
38 #include <functional>
39
40 namespace caspar {
41                 
42 struct task_priority_def
43 {
44         enum type
45         {
46                 lowest_priority = 0,
47                 lower_priority,
48                 low_priority,
49                 normal_priority,
50                 high_priority,
51                 higher_priority
52         };
53 };
54 typedef enum_class<task_priority_def> task_priority;
55
56 class executor sealed
57 {       
58         executor(const executor&);
59         executor& operator=(const executor&);
60         
61         typedef blocking_priority_queue<std::function<void()>, task_priority>   function_queue_t;
62         
63         const std::wstring                                                                                      name_;
64         tbb::atomic<bool>                                                                                       is_running_;
65         boost::thread                                                                                           thread_;        
66         function_queue_t                                                                                        execution_queue_;
67                 
68 public:         
69         executor(const std::wstring& name)
70                 : name_(name)
71                 , execution_queue_(512, boost::assign::list_of
72                                 (task_priority::lowest_priority)
73                                 (task_priority::lower_priority)
74                                 (task_priority::low_priority)
75                                 (task_priority::normal_priority)
76                                 (task_priority::high_priority)
77                                 (task_priority::higher_priority))
78         {
79                 is_running_ = true;
80                 thread_ = boost::thread([this]{run();});
81         }
82         
83         ~executor()
84         {
85                 try
86                 {
87                         internal_begin_invoke([=]
88                         {
89                                 is_running_ = false;
90                         }).wait();
91                 }
92                 catch(...)
93                 {
94                         CASPAR_LOG_CURRENT_EXCEPTION();
95                 }
96                 
97                 thread_.join();
98         }
99
100         template<typename Func>
101         auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
102         {       
103                 if(!is_running_)
104                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
105                                 
106                 return internal_begin_invoke(std::forward<Func>(func), priority);       
107         }
108         
109         template<typename Func>
110         auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
111         {
112                 if(is_current())  // Avoids potential deadlock.
113                         return func();
114                 
115                 return begin_invoke(std::forward<Func>(func), prioriy).get();
116         }
117
118         void yield(task_priority minimum_priority)
119         {
120                 if(!is_current())
121                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
122
123                 std::function<void ()> func;
124
125                 while (execution_queue_.try_pop(func, minimum_priority))
126                         func();
127         }
128
129         void set_capacity(function_queue_t::size_type capacity)
130         {
131                 execution_queue_.set_capacity(capacity);
132         }
133
134         function_queue_t::size_type capacity() const
135         {
136                 return execution_queue_.capacity();
137         }
138
139         bool is_full() const
140         {
141                 return execution_queue_.space_available() == 0;
142         }
143         
144         void clear()
145         {               
146                 std::function<void ()> func;
147                 while(execution_queue_.try_pop(func));
148         }
149                                 
150         void stop()
151         {
152                 invoke([this]
153                 {
154                         is_running_ = false;
155                 });
156         }
157
158         void wait()
159         {
160                 invoke([]{}, task_priority::lowest_priority);
161         }
162                 
163         function_queue_t::size_type size() const 
164         {
165                 return execution_queue_.size(); 
166         }
167                 
168         bool is_running() const
169         {
170                 return is_running_; 
171         }       
172
173         bool is_current() const
174         {
175                 return boost::this_thread::get_id() == thread_.get_id();
176         }
177                 
178 private:        
179
180         std::wstring print() const
181         {
182                 return L"executor[" + name_ + L"]";
183         }
184         
185         template<typename Func>
186         auto internal_begin_invoke(
187                 Func&& func,
188                 task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
189         {                                       
190                 typedef typename std::remove_reference<Func>::type      function_type;
191                 typedef decltype(func())                                                        result_type;
192                 typedef boost::packaged_task<result_type>                       task_type;
193                                                                 
194                 std::unique_ptr<task_type> task;
195
196                 // Use pointers since the boost thread library doesn't fully support move semantics.
197
198                 auto raw_func2 = new function_type(std::forward<Func>(func));
199                 try
200                 {
201                         task.reset(new task_type([raw_func2]() -> result_type
202                         {
203                                 std::unique_ptr<function_type> func2(raw_func2);
204                                 return (*func2)();
205                         }));
206                 }
207                 catch(...)
208                 {
209                         delete raw_func2;
210                         throw;
211                 }
212                 
213                 task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
214                 {
215                         try
216                         {
217                                 if(is_current())  // Avoids potential deadlock.
218                                         my_task();
219                         }
220                         catch(boost::task_already_started&){}
221                 }));
222                                 
223                 auto future = task->get_future();
224
225                 auto raw_task = task.release();
226                 auto function = [raw_task]
227                 {
228                         std::unique_ptr<task_type> task(raw_task);
229                         try
230                         {
231                                 (*task)();
232                         }
233                         catch(boost::task_already_started&){}
234                 };
235
236                 if (!execution_queue_.try_push(priority, function))
237                 {
238                         CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
239                         execution_queue_.push(priority, function);
240                 }
241
242                 return std::move(future);               
243         }
244
245         void run() // noexcept
246         {
247                 win32_exception::install_handler();             
248                 while(is_running_)
249                 {
250                         try
251                         {
252                                 std::function<void ()> func;
253                                 execution_queue_.pop(func);
254                                 func();
255                         }
256                         catch(...)
257                         {
258                                 CASPAR_LOG_CURRENT_EXCEPTION();
259                         }
260                 }
261         }       
262 };
263
264 }