2 * copyright (c) 2010 Sveriges Television AB <info@casparcg.com>
\r
4 * This file is part of CasparCG.
\r
6 * CasparCG is free software: you can redistribute it and/or modify
\r
7 * it under the terms of the GNU General Public License as published by
\r
8 * the Free Software Foundation, either version 3 of the License, or
\r
9 * (at your option) any later version.
\r
11 * CasparCG is distributed in the hope that it will be useful,
\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
\r
14 * GNU General Public License for more details.
\r
16 * You should have received a copy of the GNU General Public License
\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
\r
22 #include "../exception/win32_exception.h"
\r
23 #include "../utility/assert.h"
\r
24 #include "../log/log.h"
\r
26 #include <tbb/atomic.h>
\r
27 #include <tbb/concurrent_queue.h>
\r
29 #include <boost/thread.hpp>
\r
30 #include <boost/noncopyable.hpp>
\r
32 #include <functional>
\r
// Record that SetThreadName fills in and hands to RaiseException as the exception arguments.
// NOTE(review): the opening brace and the closing line carrying the typedef name are not visible
// in this excerpt; the name THREADNAME_INFO is what SetThreadName declares its local as.
38 typedef struct tagTHREADNAME_INFO
\r
40 DWORD dwType; // must be 0x1000
\r
41 LPCSTR szName; // pointer to name (in user addr space)
\r
42 DWORD dwThreadID; // thread ID (-1=caller thread)
\r
43 DWORD dwFlags; // reserved for future use, must be zero
\r
46 inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)
\r
48 THREADNAME_INFO info;
\r
50 info.dwType = 0x1000;
\r
51 info.szName = szThreadName;
\r
52 info.dwThreadID = dwThreadID;
\r
57 RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info );
\r
59 __except (EXCEPTION_CONTINUE_EXECUTION){}
\r
// Non-copyable task executor: owns one boost::thread and a bounded queue of void() tasks.
// NOTE(review): access specifiers and braces are not visible in this excerpt.
64 class executor : boost::noncopyable
\r
// Narrow-character name, initialised from the wide name given to the constructor; passed to SetThreadName in run().
66 const std::string name_;
\r
// Worker thread; assigned in start().
67 boost::thread thread_;
\r
// Running flag; set through fetch_and_store(true) in start() and cleared in stop().
68 tbb::atomic<bool> is_running_;
\r
// Pending tasks; begin_invoke() pushes onto it and execute() pops from it.
69 tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;
\r
// Constructs the executor; `name` is converted with narrow() and stored in name_.
// NOTE(review): no use of `auto_start` is visible in this excerpt — presumably it triggers start(); confirm.
72 explicit executor(const std::wstring& name, bool auto_start = false) : name_(narrow(name))
\r
// The executor begins in the not-running state.
74 is_running_ = false;
\r
// True when the caller is NOT the executor's own worker thread.
// NOTE(review): the enclosing definition's header and the guarded statement are not visible in this excerpt.
83 if(boost::this_thread::get_id() != thread_.get_id())
\r
87 void set_capacity(size_t capacity)
\r
89 execution_queue_.set_capacity(capacity);
\r
// Marks the executor as running and launches the worker thread.
92 void start() // noexcept
\r
// fetch_and_store(true) sets the flag and yields its previous value, so this is true when the
// executor was already marked running.
// NOTE(review): the statement guarded by this `if` is not visible in this excerpt — presumably an early return; confirm.
94 if(is_running_.fetch_and_store(true))
\r
// The new thread's body calls run() on this executor.
97 thread_ = boost::thread([this]{run();});
\r
100 void stop() // noexcept
\r
102 is_running_ = false;
\r
103 execution_queue_.try_push([]{});
\r
// Pops up to `size` queued tasks without running them.
// NOTE(review): the enclosing function's header is not visible in this excerpt.
108 std::function<void()> func;
\r
// Snapshot of the queue size taken once, before the loop.
109 auto size = execution_queue_.size();
\r
// NOTE(review): `n` is int while `size` has the queue's size_type; the comparison mixes the two types.
110 for(int n = 0; n < size; ++n)
\r
// try_pop is the non-blocking pop; a false result means nothing was available.
// NOTE(review): the statement guarded by this `if` is not visible in this excerpt.
114 if(!execution_queue_.try_pop(func))
\r
// boost::broken_promise is deliberately swallowed here.
117 catch(boost::broken_promise&){}
\r
// Queues `func` for asynchronous execution and returns a future for its result.
//
// func    - callable taking no arguments; it is wrapped in a boost::packaged_task.
// returns - boost::unique_future holding the result type of func().
// NOTE(review): several lines of this function (braces, `try`, guarded statements) are not visible in this excerpt.
121 template<typename Func>
\r
122 auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept
\r
124 typedef boost::packaged_task<decltype(func())> task_type;
\r
126 auto task = task_type(std::forward<Func>(func));
\r
// The future is obtained before the task is moved into the adaptor below.
127 auto future = task.get_future();
\r
// Callback registered on the task via set_wait_callback; it receives the task itself as `my_task`.
129 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
\r
// Same-thread check: true when the caller is the executor's own worker thread.
// NOTE(review): the guarded statement is not visible — presumably it runs `my_task` inline; confirm.
133 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
\r
// A task that has already been started is not treated as an error.
136 catch(boost::task_already_started&){}
\r
// Adaptor holding the task as a `mutable` member so that its "copy" constructor can move the
// task out of a const source and operator() const can still invoke it.
139 struct task_adaptor_t
\r
// Copy constructor that actually moves: the source's task is moved from, not copied.
141 task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}
\r
142 task_adaptor_t(task_type&& task) : task(std::move(task)){}
\r
143 void operator()() const { task(); }
\r
144 mutable task_type task;
\r
// The local `task` is moved into the adaptor here and must not be used afterwards.
145 } task_adaptor(std::move(task));
\r
// Enqueue a by-value closure holding the adaptor; push() is the queue's blocking insert.
147 execution_queue_.push([=]
\r
149 try{task_adaptor();}
\r
// An already-started task is ignored; any other exception is logged rather than propagated.
150 catch(boost::task_already_started&){}
\r
151 catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}
\r
// The future is handed to the caller by explicit move.
154 return std::move(future);
\r
// Runs `func` through the executor and returns its result synchronously.
//
// func    - callable taking no arguments.
// returns - the value produced by func().
157 template<typename Func>
\r
158 auto invoke(Func&& func) -> decltype(func())
\r
// Same-thread check: true when the caller is the executor's own worker thread.
// NOTE(review): the guarded statement is not visible in this excerpt — presumably it calls func() directly; confirm.
160 if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
\r
// Otherwise queue the call via begin_invoke() and wait on the returned future with get().
163 return begin_invoke(std::forward<Func>(func)).get();
\r
166 tbb::concurrent_bounded_queue<std::function<void()>>::size_type capacity() const { return execution_queue_.capacity(); }
\r
167 tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size(); }
\r
168 bool empty() const { return execution_queue_.empty(); }
\r
169 bool is_running() const { return is_running_; }
\r
// Takes one task from the execution queue.
// NOTE(review): what happens to `func` after the pop is not visible in this excerpt — presumably it is invoked; confirm.
173 void execute() // noexcept
\r
175 std::function<void()> func;
\r
// pop() is the queue's blocking removal: it waits until a task is available.
176 execution_queue_.pop(func);
\r
180 void run() // noexcept
\r
182 win32_exception::install_handler();
\r
183 detail::SetThreadName(GetCurrentThreadId(), name_.c_str());
\r