]> git.sesse.net Git - casparcg/blob - common/executor.h
805c1bc9dbe6d080fbd7a418d0cd29dd34fb65a6
[casparcg] / common / executor.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #pragma once
23
24 #include "os/general_protection_fault.h"
25 #include "except.h"
26 #include "log.h"
27 #include "blocking_bounded_queue_adapter.h"
28 #include "blocking_priority_queue.h"
29 #include "future.h"
30
31 #include <tbb/atomic.h>
32 #include <tbb/concurrent_priority_queue.h>
33
34 #include <boost/thread.hpp>
35 #include <boost/optional.hpp>
36
37 #include <functional>
38 #include <future>
39
40 namespace caspar {
41                 
// Priority levels a task can be queued with on an executor.
// The enumerators are numbered consecutively from 0 (lowest_priority)
// up to 5 (higher_priority); normal_priority is the default used by the
// executor's invoke/begin_invoke entry points.
enum class task_priority
{
	lowest_priority	= 0,
	lower_priority	= 1,
	low_priority	= 2,
	normal_priority	= 3,
	high_priority	= 4,
	higher_priority	= 5
};
51
52 class executor final
53 {       
54         executor(const executor&);
55         executor& operator=(const executor&);
56         
57         typedef blocking_priority_queue<std::function<void()>, task_priority>   function_queue_t;
58         
59         const std::wstring                                                                                      name_;
60         tbb::atomic<bool>                                                                                       is_running_;
61         boost::thread                                                                                           thread_;        
62         function_queue_t                                                                                        execution_queue_;
63         tbb::atomic<bool>                                                                                       currently_in_task_;
64
65 public:         
66         executor(const std::wstring& name)
67                 : name_(name)
68                 , execution_queue_(512, std::vector<task_priority> {
69                         task_priority::lowest_priority,
70                         task_priority::lower_priority,
71                         task_priority::low_priority,
72                         task_priority::normal_priority,
73                         task_priority::high_priority,
74                         task_priority::higher_priority 
75                 })
76         {
77                 is_running_ = true;
78                 currently_in_task_ = false;
79                 thread_ = boost::thread([this]{run();});
80         }
81         
82         ~executor()
83         {
84                 CASPAR_LOG(debug) << L"Shutting down " << name_;
85
86                 try
87                 {
88                         if (is_running_)
89                                 internal_begin_invoke([=]
90                                 {
91                                         is_running_ = false;
92                                 }).wait();
93                 }
94                 catch(...)
95                 {
96                         CASPAR_LOG_CURRENT_EXCEPTION();
97                 }
98                 
99                 join();
100         }
101
102         void join()
103         {
104                 thread_.join();
105         }
106
107         template<typename Func>
108         auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
109         {       
110                 if(!is_running_)
111                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
112                                 
113                 return internal_begin_invoke(std::forward<Func>(func), priority);       
114         }
115         
116         template<typename Func>
117         auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
118         {
119                 if(is_current())  // Avoids potential deadlock.
120                         return func();
121                 
122                 return begin_invoke(std::forward<Func>(func), prioriy).get();
123         }
124
125         void yield(task_priority minimum_priority)
126         {
127                 if(!is_current())
128                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
129
130                 std::function<void ()> func;
131
132                 while (execution_queue_.try_pop(func, minimum_priority))
133                         func();
134         }
135
136         void set_capacity(function_queue_t::size_type capacity)
137         {
138                 execution_queue_.set_capacity(capacity);
139         }
140
141         function_queue_t::size_type capacity() const
142         {
143                 return execution_queue_.capacity();
144         }
145
146         bool is_full() const
147         {
148                 return execution_queue_.space_available() == 0;
149         }
150         
151         void clear()
152         {               
153                 std::function<void ()> func;
154                 while(execution_queue_.try_pop(func));
155         }
156                                 
157         void stop()
158         {
159                 invoke([this]
160                 {
161                         is_running_ = false;
162                 });
163         }
164
165         void wait()
166         {
167                 invoke([]{}, task_priority::lowest_priority);
168         }
169                 
170         function_queue_t::size_type size() const 
171         {
172                 return execution_queue_.size(); 
173         }
174
175         bool is_running() const
176         {
177                 return is_running_; 
178         }       
179
180         bool is_current() const
181         {
182                 return boost::this_thread::get_id() == thread_.get_id();
183         }
184
185         bool is_currently_in_task() const
186         {
187                 return currently_in_task_;
188         }
189
190         std::wstring name() const
191         {
192                 return name_;
193         }
194                 
195 private:        
196
197         std::wstring print() const
198         {
199                 return L"executor[" + name_ + L"]";
200         }
201         
202         template<typename Func>
203         auto internal_begin_invoke(
204                 Func&& func,
205                 task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
206         {                                       
207                 typedef typename std::remove_reference<Func>::type      function_type;
208                 typedef decltype(func())                                                        result_type;
209                 typedef std::packaged_task<result_type()>                       task_type;
210                                                                 
211                 std::shared_ptr<task_type> task;
212
213                 // Use pointers since the boost thread library doesn't fully support move semantics.
214
215                 auto raw_func2 = new function_type(std::forward<Func>(func));
216                 try
217                 {
218                         task.reset(new task_type([raw_func2]() -> result_type
219                         {
220                                 std::unique_ptr<function_type> func2(raw_func2);
221                                 return (*func2)();
222                         }));
223                 }
224                 catch(...)
225                 {
226                         delete raw_func2;
227                         throw;
228                 }
229                                 
230                 auto future = task->get_future().share();
231                 auto function = [task]
232                 {
233                         try
234                         {
235                                 (*task)();
236                         }
237                         catch(std::future_error&){}
238                 };
239
240                 if (!execution_queue_.try_push(priority, function))
241                 {
242                         CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller.";
243                         execution_queue_.push(priority, function);
244                 }
245
246                 return std::async(std::launch::deferred, [=]() mutable -> result_type
247                 {
248                         if (!is_ready(future) && is_current()) // Avoids potential deadlock.
249                         {
250                                 function();
251                         }
252
253                         try
254                         {
255                                 return future.get();
256                         }
257                         catch (const caspar_exception& e)
258                         {
259                                 if (!is_current()) // Add context information from this thread before rethrowing.
260                                         e << context_info(get_context() + *boost::get_error_info<context_info_t>(e));
261
262                                 throw;
263                         }
264                 });
265         }
266
267         void run() // noexcept
268         {
269                 ensure_gpf_handler_installed_for_thread(u8(name_).c_str());
270                 while(is_running_)
271                 {
272                         try
273                         {
274                                 std::function<void ()> func;
275                                 execution_queue_.pop(func);
276                                 currently_in_task_ = true;
277                                 func();
278                         }
279                         catch(...)
280                         {
281                                 CASPAR_LOG_CURRENT_EXCEPTION();
282                         }
283
284                         currently_in_task_ = false;
285                 }
286         }       
287 };
288
289 }