]> git.sesse.net Git - casparcg/blob - common/executor.h
Made the code more portable.
[casparcg] / common / executor.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #pragma once
23
24 #include "os/general_protection_fault.h"
25 #include "except.h"
26 #include "log.h"
27 #include "blocking_bounded_queue_adapter.h"
28 #include "blocking_priority_queue.h"
29 #include "future.h"
30
31 #include <tbb/atomic.h>
32 #include <tbb/concurrent_priority_queue.h>
33
34 #include <boost/thread.hpp>
35 #include <boost/optional.hpp>
36
37 #include <functional>
38 #include <future>
39
40 namespace caspar {
41                 
42 enum class task_priority
43 {
44         lowest_priority = 0,
45         lower_priority,
46         low_priority,
47         normal_priority,
48         high_priority,
49         higher_priority
50 };
51
52 class executor final
53 {       
54         executor(const executor&);
55         executor& operator=(const executor&);
56         
57         typedef blocking_priority_queue<std::function<void()>, task_priority>   function_queue_t;
58         
59         const std::wstring                                                                                      name_;
60         tbb::atomic<bool>                                                                                       is_running_;
61         boost::thread                                                                                           thread_;        
62         function_queue_t                                                                                        execution_queue_;
63                 
64 public:         
65         executor(const std::wstring& name)
66                 : name_(name)
67                 , execution_queue_(512, std::vector<task_priority> {
68                         task_priority::lowest_priority,
69                         task_priority::lower_priority,
70                         task_priority::low_priority,
71                         task_priority::normal_priority,
72                         task_priority::high_priority,
73                         task_priority::higher_priority 
74                 })
75         {
76                 is_running_ = true;
77                 thread_ = boost::thread([this]{run();});
78         }
79         
80         ~executor()
81         {
82                 try
83                 {
84                         internal_begin_invoke([=]
85                         {
86                                 is_running_ = false;
87                         }).wait();
88                 }
89                 catch(...)
90                 {
91                         CASPAR_LOG_CURRENT_EXCEPTION();
92                 }
93                 
94                 thread_.join();
95         }
96
97         template<typename Func>
98         auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
99         {       
100                 if(!is_running_)
101                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
102                                 
103                 return internal_begin_invoke(std::forward<Func>(func), priority);       
104         }
105         
106         template<typename Func>
107         auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
108         {
109                 if(is_current())  // Avoids potential deadlock.
110                         return func();
111                 
112                 return begin_invoke(std::forward<Func>(func), prioriy).get();
113         }
114
115         void yield(task_priority minimum_priority)
116         {
117                 if(!is_current())
118                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
119
120                 std::function<void ()> func;
121
122                 while (execution_queue_.try_pop(func, minimum_priority))
123                         func();
124         }
125
126         void set_capacity(function_queue_t::size_type capacity)
127         {
128                 execution_queue_.set_capacity(capacity);
129         }
130
131         function_queue_t::size_type capacity() const
132         {
133                 return execution_queue_.capacity();
134         }
135
136         bool is_full() const
137         {
138                 return execution_queue_.space_available() == 0;
139         }
140         
141         void clear()
142         {               
143                 std::function<void ()> func;
144                 while(execution_queue_.try_pop(func));
145         }
146                                 
147         void stop()
148         {
149                 invoke([this]
150                 {
151                         is_running_ = false;
152                 });
153         }
154
155         void wait()
156         {
157                 invoke([]{}, task_priority::lowest_priority);
158         }
159                 
160         function_queue_t::size_type size() const 
161         {
162                 return execution_queue_.size(); 
163         }
164                 
165         bool is_running() const
166         {
167                 return is_running_; 
168         }       
169
170         bool is_current() const
171         {
172                 return boost::this_thread::get_id() == thread_.get_id();
173         }
174                 
175 private:        
176
177         std::wstring print() const
178         {
179                 return L"executor[" + name_ + L"]";
180         }
181         
182         template<typename Func>
183         auto internal_begin_invoke(
184                 Func&& func,
185                 task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
186         {                                       
187                 typedef typename std::remove_reference<Func>::type      function_type;
188                 typedef decltype(func())                                                        result_type;
189                 typedef std::packaged_task<result_type()>                       task_type;
190                                                                 
191                 std::shared_ptr<task_type> task;
192
193                 // Use pointers since the boost thread library doesn't fully support move semantics.
194
195                 auto raw_func2 = new function_type(std::forward<Func>(func));
196                 try
197                 {
198                         task.reset(new task_type([raw_func2]() -> result_type
199                         {
200                                 std::unique_ptr<function_type> func2(raw_func2);
201                                 return (*func2)();
202                         }));
203                 }
204                 catch(...)
205                 {
206                         delete raw_func2;
207                         throw;
208                 }
209                                 
210                 auto future = task->get_future().share();
211                 auto function = [task]
212                 {
213                         try
214                         {
215                                 (*task)();
216                         }
217                         catch(std::future_error&){}
218                 };
219
220                 if (!execution_queue_.try_push(priority, function))
221                 {
222                         CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
223                         execution_queue_.push(priority, function);
224                 }
225
226                 return std::async(std::launch::deferred, [=]() mutable -> result_type
227                 {
228                         if (!is_ready(future) && is_current()) // Avoids potential deadlock.
229                         {
230                                 function();
231                         }
232
233                         return future.get();
234                 });
235         }
236
237         void run() // noexcept
238         {
239                 ensure_gpf_handler_installed_for_thread(u8(name_).c_str());
240                 while(is_running_)
241                 {
242                         try
243                         {
244                                 std::function<void ()> func;
245                                 execution_queue_.pop(func);
246                                 func();
247                         }
248                         catch(...)
249                         {
250                                 CASPAR_LOG_CURRENT_EXCEPTION();
251                         }
252                 }
253         }       
254 };
255
256 }