git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
* Created custom decklink allocator for reducing memory footprint.
[casparcg] / common / concurrency / executor.h
1 /*
2 * Copyright 2013 Sveriges Television AB http://casparcg.com/
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #pragma once
23
24 #include "../exception/win32_exception.h"
25 #include "../exception/exceptions.h"
26 #include "../utility/string.h"
27 #include "../utility/move_on_copy.h"
28 #include "../log/log.h"
29
30 #include <tbb/atomic.h>
31 #include <tbb/concurrent_queue.h>
32
33 #include <boost/thread.hpp>
34 #include <boost/optional.hpp>
35 #include <boost/noncopyable.hpp>
36
37 #include <functional>
38
39 namespace caspar {
40
41 namespace detail {
42
43 typedef struct tagTHREADNAME_INFO
44 {
45         DWORD dwType; // must be 0x1000
46         LPCSTR szName; // pointer to name (in user addr space)
47         DWORD dwThreadID; // thread ID (-1=caller thread)
48         DWORD dwFlags; // reserved for future use, must be zero
49 } THREADNAME_INFO;
50
51 inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)
52 {
53         THREADNAME_INFO info;
54         {
55                 info.dwType = 0x1000;
56                 info.szName = szThreadName;
57                 info.dwThreadID = dwThreadID;
58                 info.dwFlags = 0;
59         }
60         __try
61         {
62                 RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info );
63         }
64         __except (EXCEPTION_CONTINUE_EXECUTION){}       
65 }
66
67 }
68
// Priority of a task handed to executor::begin_invoke / executor::invoke.
// The enumerators double as indices into the executor's queue array, which is
// why this stays a plain (implicitly int-convertible) enum.
enum task_priority
{
	high_priority   = 0, // queue that is flushed before each normal task runs
	normal_priority = 1, // default queue
	priority_count  = 2  // number of queues; not a real priority
};
75
// Scheduling levels that can be requested for an executor's worker thread
// through executor::set_priority_class.
enum thread_priority
{
	high_priority_class         = 0,
	above_normal_priority_class = 1,
	normal_priority_class       = 2,
	below_normal_priority_class = 3
};
83
84 class executor : boost::noncopyable
85 {
86         const std::string name_;
87         boost::thread thread_;
88         tbb::atomic<bool> is_running_;
89         
90         typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;
91         function_queue execution_queue_[priority_count];
92                 
93         template<typename Func>
94         auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept
95         {       
96                 typedef boost::packaged_task<decltype(func())> task_type;
97                                 
98                 auto task = task_type(std::forward<Func>(func));
99                 
100                 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
101                 {
102                         try
103                         {
104                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.
105                                         my_task();
106                         }
107                         catch(boost::task_already_started&){}
108                 }));
109                                 
110                 return std::move(task);
111         }
112
113 public:
114                 
115         explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept
116         {
117                 is_running_ = true;
118                 thread_ = boost::thread([this]{run();});
119         }
120         
121         virtual ~executor() // noexcept
122         {
123                 stop();
124                 join();
125         }
126
127         void set_capacity(size_t capacity) // noexcept
128         {
129                 execution_queue_[normal_priority].set_capacity(capacity);
130         }
131
132         void set_priority_class(thread_priority p)
133         {
134                 begin_invoke([=]
135                 {
136                         if(p == high_priority_class)
137                                 SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS);
138                         else if(p == above_normal_priority_class)
139                                 SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS);
140                         else if(p == normal_priority_class)
141                                 SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS);
142                         else if(p == below_normal_priority_class)
143                                 SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS);
144                 });
145         }
146         
147         void clear()
148         {               
149                 std::function<void()> func;
150                 while(execution_queue_[normal_priority].try_pop(func));
151                 while(execution_queue_[high_priority].try_pop(func));
152         }
153                                 
154         void stop() // noexcept
155         {
156                 is_running_ = false;    
157                 execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread.
158         }
159
160         void wait() // noexcept
161         {
162                 invoke([]{});
163         }
164
165         void join()
166         {
167                 if(boost::this_thread::get_id() != thread_.get_id())
168                         thread_.join();
169         }
170                                 
171         template<typename Func>
172         auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept
173         {       
174                 if(!is_running_)
175                         BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
176
177                 // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.
178                 auto task_adaptor = make_move_on_copy(create_task(func));
179
180                 auto future = task_adaptor.value.get_future();
181
182                 execution_queue_[priority].push([=]
183                 {
184                         try
185                         {
186                                 task_adaptor.value();
187                         }
188                         catch(boost::task_already_started&)
189                         {
190                         }
191                         catch(...)
192                         {
193                                 CASPAR_LOG_CURRENT_EXCEPTION();
194                         }
195                 });
196
197                 if(priority != normal_priority)
198                         execution_queue_[normal_priority].push(nullptr);
199                                         
200                 return std::move(future);               
201         }
202         
203         template<typename Func>
204         auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept
205         {
206                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.
207                         return func();
208                 
209                 return begin_invoke(std::forward<Func>(func), prioriy).get();
210         }
211         
212         void yield() // noexcept
213         {
214                 if(boost::this_thread::get_id() != thread_.get_id())  // Only yield when calling from execution thread.
215                         return;
216
217                 std::function<void()> func;
218                 while(execution_queue_[high_priority].try_pop(func))
219                 {
220                         if(func)
221                                 func();
222                 }       
223         }
224         
225         function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity();  }
226         function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size();  }
227         bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty();     }
228         bool is_running() const /*noexcept*/ { return is_running_; }    
229                 
230 private:
231         
232         void execute() // noexcept
233         {
234                 std::function<void()> func;
235                 execution_queue_[normal_priority].pop(func);    
236
237                 yield();
238
239                 if(func)
240                         func();
241         }
242
243         void execute_rest(task_priority priority) // noexcept
244         {
245                 std::function<void()> func;
246
247                 while (execution_queue_[priority].try_pop(func))
248                         if (func)
249                                 func();
250         }
251
252         void run() // noexcept
253         {
254                 win32_exception::ensure_handler_installed_for_thread(name_.c_str());
255
256                 while(is_running_)
257                 {
258                         try
259                         {
260                                 execute();
261                         }
262                         catch(...)
263                         {
264                                 CASPAR_LOG_CURRENT_EXCEPTION();
265                         }
266                 }
267
268                 execute_rest(high_priority);
269                 execute_rest(normal_priority);
270         }       
271 };
272
273 }