]> git.sesse.net Git - casparcg/blob - common/executor.h
* Upgraded to Visual Studio 2013
[casparcg] / common / executor.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #pragma once
23
24 #include "except.h"
25 #include "enum_class.h"
26 #include "log.h"
27 #include "blocking_bounded_queue_adapter.h"
28 #include "blocking_priority_queue.h"
29 #include "future.h"
30
31 #include <tbb/atomic.h>
32 #include <tbb/concurrent_priority_queue.h>
33
34 #include <boost/thread.hpp>
35 #include <boost/optional.hpp>
36 #include <boost/assign/list_of.hpp>
37
38 #include <functional>
39 #include <future>
40
41 namespace caspar {
42                 
/// Scope holder for the scheduling priorities understood by executor.
///
/// The enumerators are declared from least to most urgent and their numeric
/// values are contiguous, starting at zero.
struct task_priority_def
{
    enum type
    {
        lowest_priority  = 0,
        lower_priority   = 1,
        low_priority     = 2,
        normal_priority  = 3, ///< Default used by executor when no priority is given.
        high_priority    = 4,
        higher_priority  = 5
    };
};
55 typedef enum_class<task_priority_def> task_priority;
56
57 class executor /* final */
58 {       
59         executor(const executor&);
60         executor& operator=(const executor&);
61         
62         typedef blocking_priority_queue<std::function<void()>, task_priority>   function_queue_t;
63         
64         const std::wstring                                                                                      name_;
65         tbb::atomic<bool>                                                                                       is_running_;
66         boost::thread                                                                                           thread_;        
67         function_queue_t                                                                                        execution_queue_;
68                 
69 public:         
70         executor(const std::wstring& name)
71                 : name_(name)
72                 , execution_queue_(512, boost::assign::list_of
73                                 (task_priority::lowest_priority)
74                                 (task_priority::lower_priority)
75                                 (task_priority::low_priority)
76                                 (task_priority::normal_priority)
77                                 (task_priority::high_priority)
78                                 (task_priority::higher_priority))
79         {
80                 is_running_ = true;
81                 thread_ = boost::thread([this]{run();});
82         }
83         
84         ~executor()
85         {
86                 try
87                 {
88                         internal_begin_invoke([=]
89                         {
90                                 is_running_ = false;
91                         }).wait();
92                 }
93                 catch(...)
94                 {
95                         CASPAR_LOG_CURRENT_EXCEPTION();
96                 }
97                 
98                 thread_.join();
99         }
100
101         template<typename Func>
102         auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
103         {       
104                 if(!is_running_)
105                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
106                                 
107                 return internal_begin_invoke(std::forward<Func>(func), priority);       
108         }
109         
110         template<typename Func>
111         auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
112         {
113                 if(is_current())  // Avoids potential deadlock.
114                         return func();
115                 
116                 return begin_invoke(std::forward<Func>(func), prioriy).get();
117         }
118
119         void yield(task_priority minimum_priority)
120         {
121                 if(!is_current())
122                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
123
124                 std::function<void ()> func;
125
126                 while (execution_queue_.try_pop(func, minimum_priority))
127                         func();
128         }
129
130         void set_capacity(function_queue_t::size_type capacity)
131         {
132                 execution_queue_.set_capacity(capacity);
133         }
134
135         function_queue_t::size_type capacity() const
136         {
137                 return execution_queue_.capacity();
138         }
139
140         bool is_full() const
141         {
142                 return execution_queue_.space_available() == 0;
143         }
144         
145         void clear()
146         {               
147                 std::function<void ()> func;
148                 while(execution_queue_.try_pop(func));
149         }
150                                 
151         void stop()
152         {
153                 invoke([this]
154                 {
155                         is_running_ = false;
156                 });
157         }
158
159         void wait()
160         {
161                 invoke([]{}, task_priority::lowest_priority);
162         }
163                 
164         function_queue_t::size_type size() const 
165         {
166                 return execution_queue_.size(); 
167         }
168                 
169         bool is_running() const
170         {
171                 return is_running_; 
172         }       
173
174         bool is_current() const
175         {
176                 return boost::this_thread::get_id() == thread_.get_id();
177         }
178                 
179 private:        
180
181         std::wstring print() const
182         {
183                 return L"executor[" + name_ + L"]";
184         }
185         
186         template<typename Func>
187         auto internal_begin_invoke(
188                 Func&& func,
189                 task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
190         {                                       
191                 typedef typename std::remove_reference<Func>::type      function_type;
192                 typedef decltype(func())                                                        result_type;
193                 typedef std::packaged_task<result_type()>                       task_type;
194                                                                 
195                 std::shared_ptr<task_type> task;
196
197                 // Use pointers since the boost thread library doesn't fully support move semantics.
198
199                 auto raw_func2 = new function_type(std::forward<Func>(func));
200                 try
201                 {
202                         task.reset(new task_type([raw_func2]() -> result_type
203                         {
204                                 std::unique_ptr<function_type> func2(raw_func2);
205                                 return (*func2)();
206                         }));
207                 }
208                 catch(...)
209                 {
210                         delete raw_func2;
211                         throw;
212                 }
213                                 
214                 auto future = task->get_future().share();
215                 auto function = [task]
216                 {
217                         try
218                         {
219                                 (*task)();
220                         }
221                         catch(std::future_error&){}
222                 };
223
224                 if (!execution_queue_.try_push(priority, function))
225                 {
226                         CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
227                         execution_queue_.push(priority, function);
228                 }
229
230                 return std::async(std::launch::deferred, [=]() mutable -> result_type
231                 {
232                         if (!is_ready(future) && is_current()) // Avoids potential deadlock.
233                         {
234                                 function();
235                         }
236
237                         return future.get();
238                 });
239         }
240
241         void run() // noexcept
242         {
243                 win32_exception::install_handler();             
244                 while(is_running_)
245                 {
246                         try
247                         {
248                                 std::function<void ()> func;
249                                 execution_queue_.pop(func);
250                                 func();
251                         }
252                         catch(...)
253                         {
254                                 CASPAR_LOG_CURRENT_EXCEPTION();
255                         }
256                 }
257         }       
258 };
259
260 }