2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
\r
4 * This file is part of CasparCG (www.casparcg.com).
\r
6 * CasparCG is free software: you can redistribute it and/or modify
\r
7 * it under the terms of the GNU General Public License as published by
\r
8 * the Free Software Foundation, either version 3 of the License, or
\r
9 * (at your option) any later version.
\r
11 * CasparCG is distributed in the hope that it will be useful,
\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
\r
14 * GNU General Public License for more details.
\r
16 * You should have received a copy of the GNU General Public License
\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
\r
19 * Author: Robert Nagy, ronag89@gmail.com
\r
25 #include "enum_class.h"
\r
28 #include <tbb/atomic.h>
\r
29 #include <tbb/concurrent_priority_queue.h>
\r
30 #include <tbb/concurrent_queue.h>
\r
32 #include <boost/thread.hpp>
\r
34 #include <functional>
\r
// Tag struct that parameterises enum_class<> below. Its body is not part of
// this excerpt, so the available priority levels cannot be listed from here.
38 struct task_priority_def
\r
// Priority type accepted by executor::begin_invoke() and executor::invoke();
// both default it to task_priority::normal_priority.
49 typedef enum_class<task_priority_def> task_priority;
\r
// A queued unit of work: a type-erased void() callable tagged with an integer
// priority. It is the element type of the executor priority queue
// (function_queue_t).
// NOTE(review): the declaration of the `priority` member used below is not
// part of this excerpt.
53 struct priority_function
\r
// The work to run, type-erased so that any void() callable can be stored.
56 std::function<void()> func;
\r
// Builds an entry from a priority value and any callable. The callable is
// forwarded into `func`, so an rvalue argument is moved rather than copied.
62 template<typename F>
\r
63 priority_function(int priority, F&& func)
\r
64 : priority(priority)
\r
65 , func(std::forward<F>(func))
\r
// Ordering looks at the priority value only; the callable takes no part in it.
// NOTE(review): with the default comparator of tbb::concurrent_priority_queue
// this presumably means the numerically largest priority is popped first -
// confirm against the TBB documentation.
74 bool operator<(const priority_function& other) const
\r
76 return priority < other.priority;
\r
// Copy constructor and copy assignment are declared here without a body.
// NOTE(review): looks like the declare-but-do-not-define idiom for making the
// class non-copyable - confirm that no definition exists elsewhere.
80 executor(const executor&);
\r
81 executor& operator=(const executor&);
\r
// Container type for pending work; elements are priority_function objects.
83 typedef tbb::concurrent_priority_queue<priority_function> function_queue_t;
\r
// Flag reported by is_running().
85 tbb::atomic<bool> is_running_;
\r
// Worker thread. The constructor starts it with run() as its body, and
// is_current() compares the calling thread id against it.
86 boost::thread thread_;
\r
// Functions queued by begin_invoke() and taken off again by yield().
87 function_queue_t execution_queue_;
\r
// Bounded queue of dummy ints used as a counter: begin_invoke() pushes one
// token per queued function and yield() pops one token before taking a
// function. Its capacity is what set_capacity() and capacity() expose.
88 tbb::concurrent_bounded_queue<int> semaphore_;
\r
// Creates the executor and immediately launches its worker thread.
//   name: intended as the thread name; not used for anything yet (see TODO).
91 executor(const std::wstring& name) // noexcept
\r
// Bare expression statement: evaluates `name` and discards the result.
// NOTE(review): presumably present only to silence an unused-parameter
// warning until the TODO is done.
93 name; // TODO: Use to set thread name.
\r
// The thread body is a lambda that captures `this` and calls run() on it.
95 thread_ = boost::thread([this]{run();});
\r
// Virtual destructor. Its body is not part of this excerpt.
98 virtual ~executor() // noexcept
\r
// Queues `func` for asynchronous execution and returns a future for its result.
//   func:     callable taking no arguments; a heap-allocated copy (or move) of
//             it is made below.
//   priority: queue priority; priority.value() is stored in the queued
//             priority_function. Defaults to task_priority::normal_priority.
// Returns a boost::unique_future<decltype(func())>.
// Throws invalid_operation with the message shown below.
// NOTE(review): the condition guarding that throw is not part of this
// excerpt; presumably it tests is_running_ - confirm.
104 template<typename Func>
\r
105 auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
\r
108 BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
\r
// function_type: Func with any reference stripped, so it can be allocated with new.
// result_type:   the type that calling func() yields.
// task_type:     the packaged_task that produces a result_type.
110 typedef typename std::remove_reference<Func>::type function_type;
\r
111 typedef decltype(func()) result_type;
\r
112 typedef boost::packaged_task<result_type> task_type;
\r
114 std::unique_ptr<task_type> task;
\r
116 // Use pointers since the boost thread library doesn't fully support move semantics.
\r
// Heap-allocate the callable. Ownership travels as a raw pointer into the task
// lambda below, which wraps it in a unique_ptr again once it runs.
// NOTE(review): nothing visible here frees raw_func2 if the task is destroyed
// without ever being run - confirm.
118 auto raw_func2 = new function_type(std::forward<Func>(func));
\r
121 task.reset(new task_type([raw_func2]() -> result_type
\r
// Re-acquire ownership so the callable is deleted when this lambda exits.
123 std::unique_ptr<function_type> func2(raw_func2);
\r
// Register a wait callback on the task (see Boost.Thread set_wait_callback).
// The [=] capture copies `this`, which is what is_current() below is called on.
133 task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
\r
// NOTE(review): the statement under this `if` is not part of this excerpt;
// presumably it runs my_task inline when the waiter is the executor thread
// itself, which could otherwise never reach the queued task - confirm.
137 if(is_current()) // Avoids potential deadlock.
\r
// Deliberately ignored.
// NOTE(review): presumably means the task had already been started from the queue.
140 catch(boost::task_already_started&){}
\r
// Obtain the future before ownership of the task is handed to the queue entry.
143 auto future = task->get_future();
\r
// Same raw-pointer hand-off as for the callable: released here and owned by a
// unique_ptr again inside the queued lambda.
145 auto raw_task = task.release();
\r
146 priority_function prio_func(priority.value(), [raw_task]
\r
148 std::unique_ptr<task_type> task(raw_task);
\r
// Deliberately ignored.
// NOTE(review): presumably means the wait callback above already ran the task.
153 catch(boost::task_already_started&){}
\r
// The function is published first and the token second, so a consumer that
// has obtained a token (see yield()) finds a function already queued.
156 execution_queue_.push(prio_func);
\r
// NOTE(review): semaphore_ is a concurrent_bounded_queue, so this push
// presumably blocks while the queue is at capacity - confirm against TBB docs.
157 semaphore_.push(0);
\r
// NOTE(review): presumably moved explicitly because unique_future is move-only.
159 return std::move(future);
\r
// Runs `func` through the executor and returns its result to the caller.
//   func:    callable taking no arguments.
//   prioriy: queue priority forwarded to begin_invoke() (spelling as in the
//            signature). Defaults to task_priority::normal_priority.
// Returns whatever func() returns.
162 template<typename Func>
\r
163 auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
\r
// NOTE(review): the statement under this `if` is not part of this excerpt;
// presumably func is called directly when already on the executor thread,
// because waiting on our own queue would never complete - confirm.
165 if(is_current()) // Avoids potential deadlock.
\r
// Otherwise queue the call and wait on the returned future for the result.
168 return begin_invoke(std::forward<Func>(func), prioriy).get();
\r
// Pops one token from semaphore_ and then tries to take one function from
// execution_queue_.
// Throws invalid_operation with the message shown below.
// NOTE(review): the condition guarding that throw is not part of this
// excerpt; presumably it is !is_current() - confirm.
171 void yield() // noexcept
\r
174 BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context."));
\r
// Consume one token; the popped value itself is not used afterwards here.
// NOTE(review): `dummy` is declared on a line not shown. pop() on a
// concurrent_bounded_queue presumably blocks until begin_invoke() has pushed
// a token - confirm against TBB docs.
177 semaphore_.pop(dummy);
\r
// Try to take one queued function.
// NOTE(review): what happens to `func` on success is not part of this
// excerpt; presumably it is invoked - confirm.
179 priority_function func;
\r
180 if(execution_queue_.try_pop(func))
\r
// Sets the capacity of semaphore_, the bounded token queue that
// begin_invoke() pushes to.
//   capacity: new capacity, forwarded unchanged.
184 void set_capacity(std::size_t capacity) // noexcept
\r
186 semaphore_.set_capacity(capacity);
\r
// Returns the current capacity of semaphore_, converted to std::size_t.
189 std::size_t capacity() const
\r
191 return semaphore_.capacity();
\r
// Fragment: the signature of the enclosing function is not part of this excerpt.
// Drains execution_queue_: the loop body is empty, so entries are popped and
// discarded until try_pop() reports that nothing is left.
196 priority_function func;
\r
197 while(execution_queue_.try_pop(func));
\r
// Fragment: the signature of the enclosing function is not part of this excerpt.
// Clears the flag that is_running() reports.
204 is_running_ = false;
\r
// Returns the number of entries execution_queue_ currently reports.
// NOTE(review): with concurrent producers and consumers this is presumably
// only a snapshot - confirm against TBB docs.
213 function_queue_t::size_type size() const
\r
215 return execution_queue_.size();
\r
// Returns the current value of the atomic is_running_ flag.
218 bool is_running() const
\r
220 return is_running_;
\r
// Returns true when the calling thread is the executor worker thread, judged
// by comparing the caller thread id with the id of thread_.
// begin_invoke() and invoke() use this check in their deadlock-avoidance paths.
223 bool is_current() const
\r
225 return boost::this_thread::get_id() == thread_.get_id();
\r
// Body of the worker thread: the constructor starts thread_ with a lambda that
// calls this function.
// NOTE(review): the processing loop and the try/catch structure are not part
// of this excerpt.
230 void run() // noexcept
\r
// Install the win32_exception handler before anything else visible here runs.
232 win32_exception::install_handler();
\r
// Log the exception currently being handled.
// NOTE(review): presumably sits inside a catch block so that a failing task
// does not terminate the worker thread - confirm.
241 CASPAR_LOG_CURRENT_EXCEPTION();
\r