]> git.sesse.net Git - casparcg/blob - common/executor.h
* Merged streaming_consumer from 2.0
[casparcg] / common / executor.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #pragma once
23
24 #include "os/general_protection_fault.h"
25 #include "except.h"
26 #include "log.h"
27 #include "blocking_bounded_queue_adapter.h"
28 #include "blocking_priority_queue.h"
29 #include "future.h"
30
31 #include <tbb/atomic.h>
32 #include <tbb/concurrent_priority_queue.h>
33
34 #include <boost/thread.hpp>
35 #include <boost/optional.hpp>
36
37 #include <functional>
38 #include <future>
39
40 namespace caspar {
41                 
42 enum class task_priority
43 {
44         lowest_priority = 0,
45         lower_priority,
46         low_priority,
47         normal_priority,
48         high_priority,
49         higher_priority
50 };
51
52 class executor final
53 {       
54         executor(const executor&);
55         executor& operator=(const executor&);
56         
57         typedef blocking_priority_queue<std::function<void()>, task_priority>   function_queue_t;
58         
59         const std::wstring                                                                                      name_;
60         tbb::atomic<bool>                                                                                       is_running_;
61         boost::thread                                                                                           thread_;        
62         function_queue_t                                                                                        execution_queue_;
63                 
64 public:         
65         executor(const std::wstring& name)
66                 : name_(name)
67                 , execution_queue_(512, std::vector<task_priority> {
68                         task_priority::lowest_priority,
69                         task_priority::lower_priority,
70                         task_priority::low_priority,
71                         task_priority::normal_priority,
72                         task_priority::high_priority,
73                         task_priority::higher_priority 
74                 })
75         {
76                 is_running_ = true;
77                 thread_ = boost::thread([this]{run();});
78         }
79         
80         ~executor()
81         {
82                 try
83                 {
84                         internal_begin_invoke([=]
85                         {
86                                 is_running_ = false;
87                         }).wait();
88                 }
89                 catch(...)
90                 {
91                         CASPAR_LOG_CURRENT_EXCEPTION();
92                 }
93                 
94                 join();
95         }
96
97         void join()
98         {
99                 thread_.join();
100         }
101
102         template<typename Func>
103         auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
104         {       
105                 if(!is_running_)
106                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
107                                 
108                 return internal_begin_invoke(std::forward<Func>(func), priority);       
109         }
110         
111         template<typename Func>
112         auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
113         {
114                 if(is_current())  // Avoids potential deadlock.
115                         return func();
116                 
117                 return begin_invoke(std::forward<Func>(func), prioriy).get();
118         }
119
120         void yield(task_priority minimum_priority)
121         {
122                 if(!is_current())
123                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
124
125                 std::function<void ()> func;
126
127                 while (execution_queue_.try_pop(func, minimum_priority))
128                         func();
129         }
130
131         void set_capacity(function_queue_t::size_type capacity)
132         {
133                 execution_queue_.set_capacity(capacity);
134         }
135
136         function_queue_t::size_type capacity() const
137         {
138                 return execution_queue_.capacity();
139         }
140
141         bool is_full() const
142         {
143                 return execution_queue_.space_available() == 0;
144         }
145         
146         void clear()
147         {               
148                 std::function<void ()> func;
149                 while(execution_queue_.try_pop(func));
150         }
151                                 
152         void stop()
153         {
154                 invoke([this]
155                 {
156                         is_running_ = false;
157                 });
158         }
159
160         void wait()
161         {
162                 invoke([]{}, task_priority::lowest_priority);
163         }
164                 
165         function_queue_t::size_type size() const 
166         {
167                 return execution_queue_.size(); 
168         }
169                 
170         bool is_running() const
171         {
172                 return is_running_; 
173         }       
174
175         bool is_current() const
176         {
177                 return boost::this_thread::get_id() == thread_.get_id();
178         }
179                 
180 private:        
181
182         std::wstring print() const
183         {
184                 return L"executor[" + name_ + L"]";
185         }
186         
187         template<typename Func>
188         auto internal_begin_invoke(
189                 Func&& func,
190                 task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
191         {                                       
192                 typedef typename std::remove_reference<Func>::type      function_type;
193                 typedef decltype(func())                                                        result_type;
194                 typedef std::packaged_task<result_type()>                       task_type;
195                                                                 
196                 std::shared_ptr<task_type> task;
197
198                 // Use pointers since the boost thread library doesn't fully support move semantics.
199
200                 auto raw_func2 = new function_type(std::forward<Func>(func));
201                 try
202                 {
203                         task.reset(new task_type([raw_func2]() -> result_type
204                         {
205                                 std::unique_ptr<function_type> func2(raw_func2);
206                                 return (*func2)();
207                         }));
208                 }
209                 catch(...)
210                 {
211                         delete raw_func2;
212                         throw;
213                 }
214                                 
215                 auto future = task->get_future().share();
216                 auto function = [task]
217                 {
218                         try
219                         {
220                                 (*task)();
221                         }
222                         catch(std::future_error&){}
223                 };
224
225                 if (!execution_queue_.try_push(priority, function))
226                 {
227                         CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
228                         execution_queue_.push(priority, function);
229                 }
230
231                 return std::async(std::launch::deferred, [=]() mutable -> result_type
232                 {
233                         if (!is_ready(future) && is_current()) // Avoids potential deadlock.
234                         {
235                                 function();
236                         }
237
238                         return future.get();
239                 });
240         }
241
242         void run() // noexcept
243         {
244                 ensure_gpf_handler_installed_for_thread(u8(name_).c_str());
245                 while(is_running_)
246                 {
247                         try
248                         {
249                                 std::function<void ()> func;
250                                 execution_queue_.pop(func);
251                                 func();
252                         }
253                         catch(...)
254                         {
255                                 CASPAR_LOG_CURRENT_EXCEPTION();
256                         }
257                 }
258         }       
259 };
260
261 }