2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
\r
4 * This file is part of CasparCG (www.casparcg.com).
\r
6 * CasparCG is free software: you can redistribute it and/or modify
\r
7 * it under the terms of the GNU General Public License as published by
\r
8 * the Free Software Foundation, either version 3 of the License, or
\r
9 * (at your option) any later version.
\r
11 * CasparCG is distributed in the hope that it will be useful,
\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
\r
14 * GNU General Public License for more details.
\r
16 * You should have received a copy of the GNU General Public License
\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
\r
19 * Author: Robert Nagy, ronag89@gmail.com
\r
24 #include "../except.h"
\r
25 #include "../enum_class.h"
\r
28 #include <tbb/atomic.h>
\r
29 #include <tbb/concurrent_queue.h>
\r
31 #include <boost/thread.hpp>
\r
33 #include <functional>
\r
// move_on_copy<T>: wrapper whose copy constructor moves the wrapped value out
// of the object being copied instead of duplicating it.
// The struct/class header and the declaration of the `value` member are not
// visible in this view.
39 template<typename T>
\r
// "Copy" constructor: applies std::move to the source's value.
// NOTE(review): `other` is a const reference, so this only performs a real move
// if `value` is declared mutable - confirm against the member declaration.
42 move_on_copy(const move_on_copy<T>& other) : value(std::move(other.value)){}
\r
// Takes an rvalue and moves it into the wrapped `value`.
43 move_on_copy(T&& value) : value(std::move(value)){}
\r
// Factory that wraps `value` in a move_on_copy<T>, with T deduced from the argument.
// The argument is cast to an rvalue with std::move before being handed to the
// move_on_copy constructor.
// NOTE(review): T&& is a deduced (forwarding) reference here, so an lvalue argument
// would deduce T as a reference type; presumably only rvalues are passed
// (the visible caller passes the temporary returned by create_task) - confirm.
47 template<typename T>
\r
48 move_on_copy<T> make_move_on_copy(T&& value)
\r
50 return move_on_copy<T>(std::move(value));
\r
// Definition struct that backs task_priority. Its body is not visible in this view;
// the rest of this file refers to task_priority::normal_priority,
// task_priority::high_priority and task_priority::priority_count.
55 struct task_priority_def
\r
// Priority type used by the executor, built by wrapping task_priority_def in enum_class
// (presumably declared in ../enum_class.h - confirm).
64 typedef enum_class<task_priority_def> task_priority;
\r
// Copy constructor and copy assignment are declared here without a visible definition.
// NOTE(review): presumably private and intentionally undefined to make executor
// non-copyable; the class header and access specifiers are not visible - confirm.
68 executor(const executor&);
\r
69 executor& operator=(const executor&);
\r
// Running flag: set to true by the constructor, set to false by stop(),
// and returned by is_running().
71 tbb::atomic<bool> is_running_;
\r
// Thread created in the constructor to execute run(). Its id is compared with the
// calling thread's id in several member functions.
72 boost::thread thread_;
\r
// Bounded concurrent queue of type-erased nullary callables.
74 typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;
\r
// One queue per priority level; indexed with task_priority values.
75 function_queue execution_queue_[task_priority::priority_count];
\r
// Wraps `func` in a boost::packaged_task whose result type is decltype(func()),
// installs a wait callback on it, and returns the task.
// Parts of the wait-callback body are not visible in this view.
77 template<typename Func>
\r
78 auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept
\r
80 typedef boost::packaged_task<decltype(func())> task_type;
\r
// func is perfectly forwarded into the packaged_task.
82 auto task = task_type(std::forward<Func>(func));
\r
// The lambda captures by value ([=]); it reads the member thread_, so it also depends on `this`.
84 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
\r
// True when the thread waiting on the task is the executor's own thread.
// NOTE(review): the statement guarded by this check is not visible here; presumably it
// runs the task inline so the executor thread does not block waiting on itself - confirm.
88 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
\r
// boost::task_already_started is deliberately swallowed.
91 catch(boost::task_already_started&){}
\r
94 return std::move(task);
\r
// Constructs the executor: marks it as running and starts the worker thread,
// whose body calls run() on this object.
// `name` is currently unused; the bare expression statement below only references it.
// NOTE(review): this single-argument constructor is not marked explicit, so a
// std::wstring converts implicitly to an executor.
98 executor(const std::wstring& name) // noexcept
\r
100 name; // TODO: Use to set thread name.
\r
// The flag is set before the thread is created, so run() starts with is_running_ == true.
101 is_running_ = true;
\r
// The thread lambda captures `this`.
102 thread_ = boost::thread([this]{run();});
\r
// Virtual destructor. Its body is not visible in this view.
105 virtual ~executor() // noexcept
\r
// Sets the capacity of the normal-priority queue only.
// The high-priority queue's capacity is not changed here.
111 void set_capacity(size_t capacity) // noexcept
\r
113 execution_queue_[task_priority::normal_priority].set_capacity(capacity);
\r
// Drains both queues (the signature line of the enclosing function is not visible in this view).
// `func` is only a destination for popped items; the loops have empty bodies, so the
// popped callables are discarded without being invoked.
118 std::function<void()> func;
\r
// Pop until the normal-priority queue reports empty...
119 while(execution_queue_[task_priority::normal_priority].try_pop(func));
\r
// ...then do the same for the high-priority queue.
120 while(execution_queue_[task_priority::high_priority].try_pop(func));
\r
// Requests the executor to stop: clears the running flag, then enqueues an empty
// no-op lambda on the normal-priority queue.
123 void stop() // noexcept
\r
125 is_running_ = false;
\r
// try_push does not block and its result is ignored here.
// NOTE(review): if the bounded queue is full, no wake-up item is added; presumably
// harmless because a full queue already holds items for the worker to pop - confirm.
126 execution_queue_[task_priority::normal_priority].try_push([]{}); // Wake the execution thread.
\r
// wait(): only the signature is visible in this view; the body is not shown.
129 void wait() // noexcept
\r
// Guard: throws invalid_operation when called from the executor's own thread.
// NOTE(review): several source lines between the wait() signature and this guard are
// missing from this view, so the guard may belong to a different member function
// whose signature is not shown - confirm.
136 if(boost::this_thread::get_id() == thread_.get_id())
\r
137 BOOST_THROW_EXCEPTION(invalid_operation());
\r
// Schedules `func` for asynchronous execution and returns a boost::unique_future
// for its result.
//   func     - callable invoked with no arguments.
//   priority - selects which queue receives the task (defaults to normal_priority).
// Some lines of this function are not visible in this view, including the condition
// guarding the throw below and part of the try/catch structure in the queued lambda.
142 template<typename Func>
\r
143 auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
\r
// Throws invalid_operation carrying the message "executor not running.".
// NOTE(review): the guarding condition is not visible; presumably a check of
// is_running_ - confirm. The "// noexcept" annotation on the signature does not
// match this throw.
146 BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
\r
148 // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.
\r
// NOTE(review): func is passed as an lvalue here (no std::forward<Func>), so
// create_task deduces a reference type and copies the functor into the task.
149 auto task_adaptor = detail::make_move_on_copy(create_task(func));
\r
// The future is taken from the task before the task is handed to the queue.
151 auto future = task_adaptor.value.get_future();
\r
// The queued lambda captures task_adaptor by value; that copy goes through
// move_on_copy's copy constructor (see the adaptor comment above).
// Per TBB documentation, push on a concurrent_bounded_queue waits while the queue is at capacity.
153 execution_queue_[priority.value()].push([=]
\r
// Runs the packaged task.
157 task_adaptor.value();
\r
// A task that was already started is tolerated here (presumably started through the
// wait callback installed by create_task - confirm).
159 catch(boost::task_already_started&)
\r
// NOTE(review): the handler that contains this log call is not visible in this view.
164 CASPAR_LOG_CURRENT_EXCEPTION();
\r
// For any priority other than normal, an empty std::function (nullptr) is additionally
// pushed on the normal-priority queue; presumably this wakes a worker that blocks on
// that queue - confirm against run().
168 if(priority != task_priority::normal_priority)
\r
169 execution_queue_[task_priority::normal_priority].push(nullptr);
\r
171 return std::move(future);
\r
// Synchronous counterpart of begin_invoke: schedules `func` with the given priority
// and blocks on the returned future until the result is available.
// NOTE(review): the parameter name `prioriy` is a typo for `priority`.
174 template<typename Func>
\r
175 auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
\r
// True when the caller is already on the executor thread.
// NOTE(review): the statement guarded by this check is not visible in this view;
// presumably func is called directly in that case - confirm.
177 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
\r
// func is perfectly forwarded; .get() blocks and yields the task's result.
180 return begin_invoke(std::forward<Func>(func), prioriy).get();
\r
// yield(): may only be called from the executor's own thread; any other caller gets
// invalid_operation (despite the "// noexcept" annotation on the signature).
// Visible steps: pop one item from the normal-priority queue, then loop over the
// high-priority queue with try_pop. The loop body and everything after the loop
// header are not visible in this view, so what is done with func/func2 is not
// documented here.
183 void yield() // noexcept
\r
185 if(boost::this_thread::get_id() != thread_.get_id())
\r
186 BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context."));
\r
188 std::function<void()> func;
\r
// pop (unlike try_pop) waits until an item is available, per TBB documentation.
189 execution_queue_[task_priority::normal_priority].pop(func);
\r
191 std::function<void()> func2;
\r
// Iterates while the high-priority queue still yields items.
192 while(execution_queue_[task_priority::high_priority].try_pop(func2))
\r
// Returns the sum of size() of the normal-priority and high-priority queues.
// NOTE(review): per TBB documentation, concurrent_bounded_queue::size() is pushes minus
// pops and can be negative while consumers are waiting - callers should not assume >= 0.
202 function_queue::size_type size() const /*noexcept*/
\r
204 return execution_queue_[task_priority::normal_priority].size() + execution_queue_[task_priority::high_priority].size();
\r
// Returns the current value of the is_running_ flag.
207 bool is_running() const /*noexcept*/ { return is_running_; }
\r
// Entry point of the worker thread (invoked from the thread lambda created in the constructor).
// Only two statements of the body are visible in this view; the processing logic
// between them is not shown.
211 void run() // noexcept
\r
// First action: install the win32_exception handler.
213 win32_exception::install_handler();
\r
// Logs the exception currently being handled; NOTE(review): the enclosing handler is not visible here.
222 CASPAR_LOG_CURRENT_EXCEPTION();
\r