]> git.sesse.net Git - casparcg/blob - common/executor.h
- Removed need of non-deterministic sleeps during server shutdown.
[casparcg] / common / executor.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #pragma once
23
24 #include "os/general_protection_fault.h"
25 #include "except.h"
26 #include "log.h"
27 #include "blocking_bounded_queue_adapter.h"
28 #include "blocking_priority_queue.h"
29 #include "future.h"
30
31 #include <tbb/atomic.h>
32 #include <tbb/concurrent_priority_queue.h>
33
34 #include <boost/thread.hpp>
35 #include <boost/optional.hpp>
36
37 #include <functional>
38 #include <future>
39
40 namespace caspar {
41                 
/**
 * Priority levels a task can be queued with on an executor.
 *
 * Enumerators are listed in ascending order and have consecutive values
 * starting at zero, so they can be compared with the relational operators.
 * NOTE(review): how a level affects scheduling is decided by the queue
 * implementation, which is not visible in this header.
 */
enum class task_priority
{
    lowest_priority = 0,
    lower_priority  = 1,
    low_priority    = 2,
    normal_priority = 3, // Default used by executor::begin_invoke() and executor::invoke().
    high_priority   = 4,
    higher_priority = 5
};
51
52 class executor final
53 {       
54         executor(const executor&);
55         executor& operator=(const executor&);
56         
57         typedef blocking_priority_queue<std::function<void()>, task_priority>   function_queue_t;
58         
59         const std::wstring                                                                                      name_;
60         tbb::atomic<bool>                                                                                       is_running_;
61         boost::thread                                                                                           thread_;        
62         function_queue_t                                                                                        execution_queue_;
63                 
64 public:         
65         executor(const std::wstring& name)
66                 : name_(name)
67                 , execution_queue_(512, std::vector<task_priority> {
68                         task_priority::lowest_priority,
69                         task_priority::lower_priority,
70                         task_priority::low_priority,
71                         task_priority::normal_priority,
72                         task_priority::high_priority,
73                         task_priority::higher_priority 
74                 })
75         {
76                 is_running_ = true;
77                 thread_ = boost::thread([this]{run();});
78         }
79         
80         ~executor()
81         {
82                 CASPAR_LOG(trace) << L"Shutting down " << name_;
83
84                 try
85                 {
86                         if (is_running_)
87                                 internal_begin_invoke([=]
88                                 {
89                                         is_running_ = false;
90                                 }).wait();
91                 }
92                 catch(...)
93                 {
94                         CASPAR_LOG_CURRENT_EXCEPTION();
95                 }
96                 
97                 join();
98         }
99
100         void join()
101         {
102                 thread_.join();
103         }
104
105         template<typename Func>
106         auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
107         {       
108                 if(!is_running_)
109                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
110                                 
111                 return internal_begin_invoke(std::forward<Func>(func), priority);       
112         }
113         
114         template<typename Func>
115         auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
116         {
117                 if(is_current())  // Avoids potential deadlock.
118                         return func();
119                 
120                 return begin_invoke(std::forward<Func>(func), prioriy).get();
121         }
122
123         void yield(task_priority minimum_priority)
124         {
125                 if(!is_current())
126                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
127
128                 std::function<void ()> func;
129
130                 while (execution_queue_.try_pop(func, minimum_priority))
131                         func();
132         }
133
134         void set_capacity(function_queue_t::size_type capacity)
135         {
136                 execution_queue_.set_capacity(capacity);
137         }
138
139         function_queue_t::size_type capacity() const
140         {
141                 return execution_queue_.capacity();
142         }
143
144         bool is_full() const
145         {
146                 return execution_queue_.space_available() == 0;
147         }
148         
149         void clear()
150         {               
151                 std::function<void ()> func;
152                 while(execution_queue_.try_pop(func));
153         }
154                                 
155         void stop()
156         {
157                 invoke([this]
158                 {
159                         is_running_ = false;
160                 });
161         }
162
163         void wait()
164         {
165                 invoke([]{}, task_priority::lowest_priority);
166         }
167                 
168         function_queue_t::size_type size() const 
169         {
170                 return execution_queue_.size(); 
171         }
172                 
173         bool is_running() const
174         {
175                 return is_running_; 
176         }       
177
178         bool is_current() const
179         {
180                 return boost::this_thread::get_id() == thread_.get_id();
181         }
182
183         std::wstring name() const
184         {
185                 return name_;
186         }
187                 
188 private:        
189
190         std::wstring print() const
191         {
192                 return L"executor[" + name_ + L"]";
193         }
194         
195         template<typename Func>
196         auto internal_begin_invoke(
197                 Func&& func,
198                 task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
199         {                                       
200                 typedef typename std::remove_reference<Func>::type      function_type;
201                 typedef decltype(func())                                                        result_type;
202                 typedef std::packaged_task<result_type()>                       task_type;
203                                                                 
204                 std::shared_ptr<task_type> task;
205
206                 // Use pointers since the boost thread library doesn't fully support move semantics.
207
208                 auto raw_func2 = new function_type(std::forward<Func>(func));
209                 try
210                 {
211                         task.reset(new task_type([raw_func2]() -> result_type
212                         {
213                                 std::unique_ptr<function_type> func2(raw_func2);
214                                 return (*func2)();
215                         }));
216                 }
217                 catch(...)
218                 {
219                         delete raw_func2;
220                         throw;
221                 }
222                                 
223                 auto future = task->get_future().share();
224                 auto function = [task]
225                 {
226                         try
227                         {
228                                 (*task)();
229                         }
230                         catch(std::future_error&){}
231                 };
232
233                 if (!execution_queue_.try_push(priority, function))
234                 {
235                         CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
236                         execution_queue_.push(priority, function);
237                 }
238
239                 return std::async(std::launch::deferred, [=]() mutable -> result_type
240                 {
241                         if (!is_ready(future) && is_current()) // Avoids potential deadlock.
242                         {
243                                 function();
244                         }
245
246                         return future.get();
247                 });
248         }
249
250         void run() // noexcept
251         {
252                 ensure_gpf_handler_installed_for_thread(u8(name_).c_str());
253                 while(is_running_)
254                 {
255                         try
256                         {
257                                 std::function<void ()> func;
258                                 execution_queue_.pop(func);
259                                 func();
260                         }
261                         catch(...)
262                         {
263                                 CASPAR_LOG_CURRENT_EXCEPTION();
264                         }
265                 }
266         }       
267 };
268
269 }