2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Robert Nagy, ronag89@gmail.com
25 #include "enum_class.h"
27 #include "blocking_bounded_queue_adapter.h"
29 #include <tbb/atomic.h>
30 #include <tbb/concurrent_priority_queue.h>
32 #include <boost/thread.hpp>
33 #include <boost/optional.hpp>
39 struct task_priority_def
51 typedef enum_class<task_priority_def> task_priority;
55 struct priority_function
58 std::function<void()> func;
65 priority_function(int priority, F&& func)
67 , func(std::forward<F>(func))
76 bool operator<(const priority_function& other) const
78 return priority < other.priority;
82 executor(const executor&);
83 executor& operator=(const executor&);
85 typedef blocking_bounded_queue_adapter<tbb::concurrent_priority_queue<priority_function>> function_queue_t;
87 const std::wstring name_;
88 tbb::atomic<bool> is_running_;
89 boost::thread thread_;
90 function_queue_t execution_queue_;
93 executor(const std::wstring& name)
95 , execution_queue_(512)
98 thread_ = boost::thread([this]{run();});
105 internal_begin_invoke([=]
112 CASPAR_LOG_CURRENT_EXCEPTION();
118 template<typename Func>
119 auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())>
122 BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
124 // Will return uninitialized future if the try failed (get_state() will return future_state::uninitialized).
125 return internal_begin_invoke(std::forward<Func>(func), true, priority);
128 template<typename Func>
129 auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
132 CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
134 return internal_begin_invoke(std::forward<Func>(func), false, priority);
137 template<typename Func>
138 auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
140 if(is_current()) // Avoids potential deadlock.
143 return begin_invoke(std::forward<Func>(func), prioriy).get();
149 CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.") << source_info(name_));
151 priority_function func;
152 if(execution_queue_.try_pop(func))
156 void set_capacity(function_queue_t::size_type capacity)
158 execution_queue_.set_capacity(capacity);
161 function_queue_t::size_type capacity() const
163 return execution_queue_.capacity();
168 priority_function func;
169 while(execution_queue_.try_pop(func));
182 invoke([]{}, task_priority::lowest_priority);
185 function_queue_t::size_type size() const
187 return execution_queue_.size();
190 bool is_running() const
195 bool is_current() const
197 return boost::this_thread::get_id() == thread_.get_id();
202 std::wstring print() const
204 return L"executor[" + name_ + L"]";
207 template<typename Func>
208 auto internal_begin_invoke(
211 task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
213 typedef typename std::remove_reference<Func>::type function_type;
214 typedef decltype(func()) result_type;
215 typedef boost::packaged_task<result_type> task_type;
217 std::unique_ptr<task_type> task;
219 // Use pointers since the boost thread library doesn't fully support move semantics.
221 auto raw_func2 = new function_type(std::forward<Func>(func));
224 task.reset(new task_type([raw_func2]() -> result_type
226 std::unique_ptr<function_type> func2(raw_func2);
236 task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
240 if(is_current()) // Avoids potential deadlock.
243 catch(boost::task_already_started&){}
246 auto future = task->get_future();
248 auto raw_task = task.release();
249 priority_function prio_func(priority.value(), [raw_task]
251 std::unique_ptr<task_type> task(raw_task);
256 catch(boost::task_already_started&){}
259 if (!execution_queue_.try_push(prio_func))
265 return boost::unique_future<decltype(func())>();
269 CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
270 execution_queue_.push(prio_func);
274 return std::move(future);
277 void run() // noexcept
279 win32_exception::install_handler();
284 priority_function func;
285 execution_queue_.pop(func);
290 CASPAR_LOG_CURRENT_EXCEPTION();