X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=common%2Fconcurrency%2Fexecutor.h;h=0687889a179a951b0d2e14d82a9162941a4d03dd;hb=a486c25d5e6ce0ebe08e9a2d793a447ff3cb797a;hp=da0e1d672f844a1f135e51432e0e1a23b20cfb41;hpb=745f022a44f0d347e56acbbd597ff71e06eab894;p=casparcg diff --git a/common/concurrency/executor.h b/common/concurrency/executor.h index da0e1d672..0687889a1 100644 --- a/common/concurrency/executor.h +++ b/common/concurrency/executor.h @@ -1,6 +1,27 @@ +/* +* copyright (c) 2010 Sveriges Television AB +* +* This file is part of CasparCG. +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. + +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +*/ #pragma once #include "../exception/win32_exception.h" +#include "../utility/string.h" +#include "../utility/move_on_copy.h" #include "../log/log.h" #include @@ -41,118 +62,190 @@ inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) } +enum task_priority +{ + high_priority, + normal_priority, + priority_count +}; + +enum thread_priority +{ + high_priority_class, + above_normal_priority_class, + normal_priority_class, + below_normal_priority_class +}; + class executor : boost::noncopyable { const std::string name_; boost::thread thread_; tbb::atomic is_running_; - tbb::concurrent_bounded_queue> execution_queue_; + + typedef tbb::concurrent_bounded_queue> function_queue; + function_queue execution_queue_[priority_count]; + + template + auto create_task(Func&& func) -> boost::packaged_task // noexcept + { + typedef boost::packaged_task task_type; + + auto task = task_type(std::forward(func)); + + task.set_wait_callback(std::function([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class. + { + try + { + if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. + my_task(); + } + catch(boost::task_already_started&){} + })); + + return std::move(task); + } + public: - explicit executor(const std::wstring& name = L"executor") : name_(narrow(name)) + explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept { - is_running_ = false; + thread_ = boost::thread([this]{run();}); + is_running_ = true; } - virtual ~executor() + virtual ~executor() // noexcept { stop(); - clear(); - if(boost::this_thread::get_id() != thread_.get_id()) - thread_.join(); + execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread. + join(); } - void set_capacity(size_t capacity) + void set_capacity(size_t capacity) // noexcept { - execution_queue_.set_capacity(capacity); + execution_queue_[normal_priority].set_capacity(capacity); } - void start() // noexcept + void set_priority_class(thread_priority p) { - if(is_running_.fetch_and_store(true)) - return; - clear(); - thread_ = boost::thread([this]{run();}); + begin_invoke([=] + { + if(p == high_priority_class) + SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS); + if(p == above_normal_priority_class) + SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS); + else if(p == normal_priority_class) + SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS); + else if(p == below_normal_priority_class) + SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS); + }); } - + void stop() // noexcept { is_running_ = false; - execution_queue_.try_push([]{}); } - - void clear() + + void wait() // noexcept { - std::function func; - auto size = execution_queue_.size(); - for(int n = 0; n < size; ++n) - { - try - { - if(!execution_queue_.try_pop(func)) - return; - } - catch(boost::broken_promise&){} - } + invoke([]{}); + } + + void join() + { + if(boost::this_thread::get_id() != thread_.get_id()) + thread_.join(); } - + template - auto begin_invoke(Func&& func) -> boost::unique_future // noexcept + auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future // noexcept { - typedef boost::packaged_task task_type; - - auto task = task_type(std::forward(func)); - auto future = task.get_future(); - - task.set_wait_callback(std::function([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class. - { - try - { - if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. - my_task(); - } - catch(boost::task_already_started&){} - })); - - struct task_adaptor_t - { - task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){} - task_adaptor_t(task_type&& task) : task(std::move(task)){} - void operator()() const { task(); } - mutable task_type task; - } task_adaptor(std::move(task)); + // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics. + auto task_adaptor = make_move_on_copy(create_task(func)); + + auto future = task_adaptor.value.get_future(); - execution_queue_.push([=] + execution_queue_[priority].push([=] { - try{task_adaptor();} + try{task_adaptor.value();} catch(boost::task_already_started&){} catch(...){CASPAR_LOG_CURRENT_EXCEPTION();} }); + if(priority != normal_priority) + execution_queue_[normal_priority].push(nullptr); + return std::move(future); } - + template - auto invoke(Func&& func) -> decltype(func()) + auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future // noexcept + { + // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics. + auto task_adaptor = make_move_on_copy(create_task(func)); + + auto future = task_adaptor.value.get_future(); + + if(priority == normal_priority || execution_queue_[normal_priority].try_push(nullptr)) + { + execution_queue_[priority].try_push([=] + { + try{task_adaptor.value();} + catch(boost::task_already_started&){} + catch(...){CASPAR_LOG_CURRENT_EXCEPTION();} + }); + } + + return std::move(future); + } + + template + auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept + { + if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. + return func(); + + return begin_invoke(std::forward(func), prioriy).get(); + } + + template + auto try_invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept { if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. return func(); - return begin_invoke(std::forward(func)).get(); + return try_begin_invoke(std::forward(func), prioriy).get(); + } + + void yield() // noexcept + { + if(boost::this_thread::get_id() != thread_.get_id()) // Only yield when calling from execution thread. + return; + + std::function func; + while(execution_queue_[high_priority].try_pop(func)) + { + if(func) + func(); + } } - tbb::concurrent_bounded_queue>::size_type capacity() const { return execution_queue_.capacity(); } - tbb::concurrent_bounded_queue>::size_type size() const { return execution_queue_.size(); } - bool empty() const { return execution_queue_.empty(); } - bool is_running() const { return is_running_; } + function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity(); } + function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size(); } + bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty(); } + bool is_running() const /*noexcept*/ { return is_running_; } private: void execute() // noexcept { std::function func; - execution_queue_.pop(func); - func(); + execution_queue_[normal_priority].pop(func); + + yield(); + + if(func) + func(); } void run() // noexcept @@ -160,7 +253,16 @@ private: win32_exception::install_handler(); detail::SetThreadName(GetCurrentThreadId(), name_.c_str()); while(is_running_) - execute(); + { + try + { + execute(); + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } + } } };