2 * Copyright 2013 Sveriges Television AB http://casparcg.com/
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Robert Nagy, ronag89@gmail.com
24 #include "../exception/win32_exception.h"
25 #include "../exception/exceptions.h"
26 #include "../utility/string.h"
27 #include "../utility/move_on_copy.h"
28 #include "../log/log.h"
30 #include <tbb/atomic.h>
31 #include <tbb/concurrent_queue.h>
33 #include <boost/thread.hpp>
34 #include <boost/optional.hpp>
35 #include <boost/noncopyable.hpp>
43 typedef struct tagTHREADNAME_INFO
45 DWORD dwType; // must be 0x1000
46 LPCSTR szName; // pointer to name (in user addr space)
47 DWORD dwThreadID; // thread ID (-1=caller thread)
48 DWORD dwFlags; // reserved for future use, must be zero
51 inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)
56 info.szName = szThreadName;
57 info.dwThreadID = dwThreadID;
62 RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info );
64 __except (EXCEPTION_CONTINUE_EXECUTION){}
79 above_normal_priority_class,
80 normal_priority_class,
81 below_normal_priority_class
84 class executor : boost::noncopyable
86 const std::string name_;
87 boost::thread thread_;
88 tbb::atomic<bool> is_running_;
90 typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;
91 function_queue execution_queue_[priority_count];
93 template<typename Func>
94 auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept
96 typedef boost::packaged_task<decltype(func())> task_type;
98 auto task = task_type(std::forward<Func>(func));
100 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
104 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
107 catch(boost::task_already_started&){}
110 return std::move(task);
115 explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept
118 thread_ = boost::thread([this]{run();});
121 virtual ~executor() // noexcept
127 void set_capacity(size_t capacity) // noexcept
129 execution_queue_[normal_priority].set_capacity(capacity);
132 void set_priority_class(thread_priority p)
136 if(p == high_priority_class)
137 SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS);
138 else if(p == above_normal_priority_class)
139 SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS);
140 else if(p == normal_priority_class)
141 SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS);
142 else if(p == below_normal_priority_class)
143 SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS);
149 std::function<void()> func;
150 while(execution_queue_[normal_priority].try_pop(func));
151 while(execution_queue_[high_priority].try_pop(func));
154 void stop() // noexcept
157 execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread.
160 void wait() // noexcept
167 if(boost::this_thread::get_id() != thread_.get_id())
171 template<typename Func>
172 auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept
175 BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
177 // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.
178 auto task_adaptor = make_move_on_copy(create_task(func));
180 auto future = task_adaptor.value.get_future();
182 execution_queue_[priority].push([=]
186 task_adaptor.value();
188 catch(boost::task_already_started&)
193 CASPAR_LOG_CURRENT_EXCEPTION();
197 if(priority != normal_priority)
198 execution_queue_[normal_priority].push(nullptr);
200 return std::move(future);
203 template<typename Func>
204 auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept
206 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
209 return begin_invoke(std::forward<Func>(func), prioriy).get();
212 void yield() // noexcept
214 if(boost::this_thread::get_id() != thread_.get_id()) // Only yield when calling from execution thread.
217 std::function<void()> func;
218 while(execution_queue_[high_priority].try_pop(func))
225 function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity(); }
226 function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size(); }
227 bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty(); }
228 bool is_running() const /*noexcept*/ { return is_running_; }
232 void execute() // noexcept
234 std::function<void()> func;
235 execution_queue_[normal_priority].pop(func);
243 void execute_rest(task_priority priority) // noexcept
245 std::function<void()> func;
247 while (execution_queue_[priority].try_pop(func))
252 void run() // noexcept
254 win32_exception::ensure_handler_installed_for_thread(name_.c_str());
264 CASPAR_LOG_CURRENT_EXCEPTION();
268 execute_rest(high_priority);
269 execute_rest(normal_priority);