]> git.sesse.net Git - casparcg/blob - common/executor.h
[executor] changed default to unbounded like in 2.0.7 and fixed a deadlock when capac...
[casparcg] / common / executor.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #pragma once
23
24 #include "os/general_protection_fault.h"
25 #include "except.h"
26 #include "log.h"
27 #include "blocking_bounded_queue_adapter.h"
28 #include "blocking_priority_queue.h"
29 #include "future.h"
30
31 #include <tbb/atomic.h>
32 #include <tbb/concurrent_priority_queue.h>
33
34 #include <boost/thread.hpp>
35 #include <boost/optional.hpp>
36
37 #include <functional>
38 #include <future>
39
40 namespace caspar {
41 enum class task_priority
42 {
43         lowest_priority = 0,
44         lower_priority,
45         low_priority,
46         normal_priority,
47         high_priority,
48         higher_priority
49 };
50
51 class executor final
52 {
53         executor(const executor&);
54         executor& operator=(const executor&);
55
56         typedef blocking_priority_queue<std::function<void()>, task_priority>   function_queue_t;
57
58         const std::wstring                                                                                      name_;
59         tbb::atomic<bool>                                                                                       is_running_;
60         boost::thread                                                                                           thread_;
61         function_queue_t                                                                                        execution_queue_;
62         tbb::atomic<bool>                                                                                       currently_in_task_;
63
64 public:
65         executor(const std::wstring& name)
66                 : name_(name)
67                 , execution_queue_(std::numeric_limits<int>::max(), std::vector<task_priority> {
68                         task_priority::lowest_priority,
69                         task_priority::lower_priority,
70                         task_priority::low_priority,
71                         task_priority::normal_priority,
72                         task_priority::high_priority,
73                         task_priority::higher_priority
74                 })
75         {
76                 is_running_ = true;
77                 currently_in_task_ = false;
78                 thread_ = boost::thread([this]{run();});
79         }
80
81         ~executor()
82         {
83                 CASPAR_LOG(debug) << L"Shutting down " << name_;
84
85                 try
86                 {
87                         if (is_running_)
88                                 internal_begin_invoke([=]
89                                 {
90                                         is_running_ = false;
91                                 }).wait();
92                 }
93                 catch(...)
94                 {
95                         CASPAR_LOG_CURRENT_EXCEPTION();
96                 }
97
98                 join();
99         }
100
101         void join()
102         {
103                 thread_.join();
104         }
105
106         template<typename Func>
107         auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
108         {
109                 if(!is_running_)
110                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
111
112                 return internal_begin_invoke(std::forward<Func>(func), priority);
113         }
114
115         template<typename Func>
116         auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
117         {
118                 if(is_current())  // Avoids potential deadlock.
119                         return func();
120
121                 return begin_invoke(std::forward<Func>(func), prioriy).get();
122         }
123
124         void yield(task_priority minimum_priority)
125         {
126                 if(!is_current())
127                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
128
129                 std::function<void ()> func;
130
131                 while (execution_queue_.try_pop(func, minimum_priority))
132                         func();
133         }
134
135         void set_capacity(function_queue_t::size_type capacity)
136         {
137                 execution_queue_.set_capacity(capacity);
138         }
139
140         function_queue_t::size_type capacity() const
141         {
142                 return execution_queue_.capacity();
143         }
144
145         bool is_full() const
146         {
147                 return execution_queue_.space_available() == 0;
148         }
149
150         void clear()
151         {
152                 std::function<void ()> func;
153                 while(execution_queue_.try_pop(func));
154         }
155
156         void stop()
157         {
158                 invoke([this]
159                 {
160                         is_running_ = false;
161                 });
162         }
163
164         void wait()
165         {
166                 invoke([]{}, task_priority::lowest_priority);
167         }
168
169         function_queue_t::size_type size() const
170         {
171                 return execution_queue_.size();
172         }
173
174         bool is_running() const
175         {
176                 return is_running_;
177         }
178
179         bool is_current() const
180         {
181                 return boost::this_thread::get_id() == thread_.get_id();
182         }
183
184         bool is_currently_in_task() const
185         {
186                 return currently_in_task_;
187         }
188
189         std::wstring name() const
190         {
191                 return name_;
192         }
193
194 private:
195
196         std::wstring print() const
197         {
198                 return L"executor[" + name_ + L"]";
199         }
200
201         template<typename Func>
202         auto internal_begin_invoke(
203                 Func&& func,
204                 task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
205         {
206                 typedef typename std::remove_reference<Func>::type      function_type;
207                 typedef decltype(func())                                                        result_type;
208                 typedef std::packaged_task<result_type()>                       task_type;
209
210                 std::shared_ptr<task_type> task;
211
212                 // Use pointers since the boost thread library doesn't fully support move semantics.
213
214                 auto raw_func2 = new function_type(std::forward<Func>(func));
215                 try
216                 {
217                         task.reset(new task_type([raw_func2]() -> result_type
218                         {
219                                 std::unique_ptr<function_type> func2(raw_func2);
220                                 return (*func2)();
221                         }));
222                 }
223                 catch(...)
224                 {
225                         delete raw_func2;
226                         throw;
227                 }
228
229                 auto future = task->get_future().share();
230                 auto function = [task]
231                 {
232                         try
233                         {
234                                 (*task)();
235                         }
236                         catch(std::future_error&){}
237                 };
238
239                 if (!execution_queue_.try_push(priority, function))
240                 {
241                         if (is_current())
242                                 CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" Overflow. Avoiding deadlock."));
243
244                         CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller.";
245                         execution_queue_.push(priority, function);
246                 }
247
248                 return std::async(std::launch::deferred, [=]() mutable -> result_type
249                 {
250                         if (!is_ready(future) && is_current()) // Avoids potential deadlock.
251                         {
252                                 function();
253                         }
254
255                         try
256                         {
257                                 return future.get();
258                         }
259                         catch (const caspar_exception& e)
260                         {
261                                 if (!is_current()) // Add context information from this thread before rethrowing.
262                                 {
263                                         auto ctx_info = boost::get_error_info<context_info_t>(e);
264
265                                         if (ctx_info)
266                                                 e << context_info(get_context() + *ctx_info);
267                                         else
268                                                 e << context_info(get_context());
269                                 }
270
271                                 throw;
272                         }
273                 });
274         }
275
276         void run() // noexcept
277         {
278                 ensure_gpf_handler_installed_for_thread(u8(name_).c_str());
279                 while(is_running_)
280                 {
281                         try
282                         {
283                                 std::function<void ()> func;
284                                 execution_queue_.pop(func);
285                                 currently_in_task_ = true;
286                                 func();
287                         }
288                         catch(...)
289                         {
290                                 CASPAR_LOG_CURRENT_EXCEPTION();
291                         }
292
293                         currently_in_task_ = false;
294                 }
295         }
296 };
297 }