2 * copyright (c) 2010 Sveriges Television AB <info@casparcg.com>
\r
4 * This file is part of CasparCG.
\r
6 * CasparCG is free software: you can redistribute it and/or modify
\r
7 * it under the terms of the GNU General Public License as published by
\r
8 * the Free Software Foundation, either version 3 of the License, or
\r
9 * (at your option) any later version.
\r
11 * CasparCG is distributed in the hope that it will be useful,
\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
\r
14 * GNU General Public License for more details.
\r
16 * You should have received a copy of the GNU General Public License
\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
\r
22 #include "../exception/win32_exception.h"
\r
23 #include "../utility/string.h"
\r
24 #include "../utility/move_on_copy.h"
\r
25 #include "../log/log.h"
\r
27 #include <tbb/atomic.h>
\r
28 #include <tbb/concurrent_queue.h>
\r
30 #include <boost/thread.hpp>
\r
31 #include <boost/noncopyable.hpp>
\r
33 #include <functional>
\r
39 typedef struct tagTHREADNAME_INFO
\r
41 DWORD dwType; // must be 0x1000
\r
42 LPCSTR szName; // pointer to name (in user addr space)
\r
43 DWORD dwThreadID; // thread ID (-1=caller thread)
\r
44 DWORD dwFlags; // reserved for future use, must be zero
\r
47 inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)
\r
49 THREADNAME_INFO info;
\r
51 info.dwType = 0x1000;
\r
52 info.szName = szThreadName;
\r
53 info.dwThreadID = dwThreadID;
\r
58 RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info );
\r
60 __except (EXCEPTION_CONTINUE_EXECUTION){}
\r
// Priority levels accepted by executor::set_priority_class.
enum thread_priority
{
    high_priority_class         = 0,
    above_normal_priority_class = 1,
    normal_priority_class       = 2,
    below_normal_priority_class = 3
};
\r
80 class executor : boost::noncopyable
\r
82 const std::string name_;
\r
83 boost::thread thread_;
\r
84 tbb::atomic<bool> is_running_;
\r
86 typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;
\r
87 function_queue execution_queue_[priority_count];
\r
// Wraps `func` in a boost::packaged_task whose result type is the type of func().
// A wait callback is installed on the task: it compares the calling thread's id with
// thread_'s id and swallows boost::task_already_started.
// NOTE(review): presumably the callback runs the task inline when the waiter is the
// execution thread itself (hence "Avoids potential deadlock") - confirm against the full file.
// The finished task is returned by move.
89 template<typename Func>
\r
90 auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept
\r
92 typedef boost::packaged_task<decltype(func())> task_type;
\r
94 auto task = task_type(std::forward<Func>(func));
\r
96 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
\r
100 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
\r
103 catch(boost::task_already_started&){}
\r
106 return std::move(task);
\r
111 explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept
\r
113 thread_ = boost::thread([this]{run();});
\r
114 is_running_ = true;
\r
// Destructor. The visible statement pushes an empty task onto the normal-priority queue
// without blocking (try_push), so a thread blocked in execute()'s pop() on that queue wakes up.
// NOTE(review): any stop/join steps around this push are not visible here - confirm.
117 virtual ~executor() // noexcept
\r
120 execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread.
\r
124 void set_capacity(size_t capacity) // noexcept
\r
126 execution_queue_[normal_priority].set_capacity(capacity);
\r
129 void set_priority_class(thread_priority p)
\r
133 if(p == high_priority_class)
\r
134 SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS);
\r
135 if(p == above_normal_priority_class)
\r
136 SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS);
\r
137 else if(p == normal_priority_class)
\r
138 SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS);
\r
139 else if(p == below_normal_priority_class)
\r
140 SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS);
\r
144 void stop() // noexcept
\r
146 is_running_ = false;
\r
// NOTE(review): only the signature of wait() is visible at this point.
149 void wait() // noexcept
\r
// True when the caller is NOT the execution thread.
// NOTE(review): presumably guards a join on thread_ (a thread cannot join itself) - confirm.
156 if(boost::this_thread::get_id() != thread_.get_id())
\r
160 template<typename Func>
\r
161 auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept
\r
163 // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.
\r
164 auto task_adaptor = make_move_on_copy(create_task(func));
\r
166 auto future = task_adaptor.value.get_future();
\r
168 execution_queue_[priority].push([=]
\r
170 try{task_adaptor.value();}
\r
171 catch(boost::task_already_started&){}
\r
172 catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}
\r
175 if(priority != normal_priority)
\r
176 execution_queue_[normal_priority].push(nullptr);
\r
178 return std::move(future);
\r
181 template<typename Func>
\r
182 auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept
\r
184 // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.
\r
185 auto task_adaptor = make_move_on_copy(create_task(func));
\r
187 auto future = task_adaptor.value.get_future();
\r
189 if(priority == normal_priority || execution_queue_[normal_priority].try_push(nullptr))
\r
191 execution_queue_[priority].try_push([=]
\r
193 try{task_adaptor.value();}
\r
194 catch(boost::task_already_started&){}
\r
195 catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}
\r
199 return std::move(future);
\r
// Runs `func` and hands its result back to the caller.
// When called from another thread, func is queued with begin_invoke at the given priority
// and the caller waits on the returned future's get().
// NOTE(review): when the caller IS the execution thread the visible check applies instead;
// presumably func is then called directly, since waiting on its own queue would deadlock - confirm.
202 template<typename Func>
\r
203 auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept
\r
205 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
\r
208 return begin_invoke(std::forward<Func>(func), prioriy).get();
\r
// Same shape as invoke(), but queues through try_begin_invoke and then waits on the
// returned future's get().
// NOTE(review): when the caller IS the execution thread the visible check applies instead;
// presumably func is then called directly - confirm.
211 template<typename Func>
\r
212 auto try_invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept
\r
214 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
\r
217 return try_begin_invoke(std::forward<Func>(func), prioriy).get();
\r
// Drains the high-priority queue using non-blocking try_pop calls; the thread-id check
// limits this to calls made from the execution thread.
// NOTE(review): the loop body is not visible here; presumably each popped entry is invoked - confirm.
220 void yield() // noexcept
\r
222 if(boost::this_thread::get_id() != thread_.get_id()) // Only yield when calling from execution thread.
\r
225 std::function<void()> func;
\r
226 while(execution_queue_[high_priority].try_pop(func))
\r
233 function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity(); }
\r
234 function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size(); }
\r
235 bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty(); }
\r
236 bool is_running() const /*noexcept*/ { return is_running_; }
\r
// Takes the next entry from the normal-priority queue; pop() waits until one is available.
// NOTE(review): what is done with the popped entry is not visible here. Entries can be empty
// (begin_invoke pushes nullptr as a wake-up token), so it must be null-checked before being called.
240 void execute() // noexcept
\r
242 std::function<void()> func;
\r
243 execution_queue_[normal_priority].pop(func);
\r
251 void run() // noexcept
\r
253 win32_exception::install_handler();
\r
254 detail::SetThreadName(GetCurrentThreadId(), name_.c_str());
\r
263 CASPAR_LOG_CURRENT_EXCEPTION();
\r