X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=common%2Fconcurrency%2Fexecutor.h;h=ad86d4a379358f11083f56df523ba66b7f703e45;hb=4ba6006396aff3ce007b15c6ebcc43b194cd8a8d;hp=3415ba003f2967e6d2d5b68c5c13e1c4fd617e48;hpb=0d2a803ebd6889429f1a13982bc3222a4250e225;p=casparcg diff --git a/common/concurrency/executor.h b/common/concurrency/executor.h index 3415ba003..ad86d4a37 100644 --- a/common/concurrency/executor.h +++ b/common/concurrency/executor.h @@ -1,136 +1,269 @@ +/* +* copyright (c) 2010 Sveriges Television AB +* +* This file is part of CasparCG. +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. + +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +*/ #pragma once -#include "../exception/exceptions.h" #include "../exception/win32_exception.h" - -#include +#include "../utility/string.h" +#include "../utility/move_on_copy.h" +#include "../log/log.h" #include #include +#include +#include + #include namespace caspar { -class executor +namespace detail { + +typedef struct tagTHREADNAME_INFO { -public: + DWORD dwType; // must be 0x1000 + LPCSTR szName; // pointer to name (in user addr space) + DWORD dwThreadID; // thread ID (-1=caller thread) + DWORD dwFlags; // reserved for future use, must be zero +} THREADNAME_INFO; - enum priority +inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) +{ + THREADNAME_INFO info; { - low_priority = 0, - normal_priority, - high_priority - }; - - explicit executor(const std::function& f = nullptr) + info.dwType = 0x1000; + info.szName = szThreadName; + info.dwThreadID = dwThreadID; + info.dwFlags = 0; + } + __try { - size_ = 0; - is_running_ = false; - f_ = f != nullptr ? f : [this]{run();}; + RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info ); } + __except (EXCEPTION_CONTINUE_EXECUTION){} +} + +} + +enum task_priority +{ + high_priority, + normal_priority, + priority_count +}; - virtual ~executor() +enum thread_priority +{ + high_priority_class, + above_normal_priority_class, + normal_priority_class, + below_normal_priority_class +}; + +class executor : boost::noncopyable +{ + const std::string name_; + boost::thread thread_; + tbb::atomic is_running_; + + typedef tbb::concurrent_bounded_queue> function_queue; + function_queue execution_queue_[priority_count]; + + template + auto create_task(Func&& func) -> boost::packaged_task // noexcept + { + typedef boost::packaged_task task_type; + + auto task = task_type(std::forward(func)); + + task.set_wait_callback(std::function([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class. + { + try + { + if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. + my_task(); + } + catch(boost::task_already_started&){} + })); + + return std::move(task); + } + +public: + + explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept + { + thread_ = boost::thread([this]{run();}); + is_running_ = true; + } + + virtual ~executor() // noexcept { stop(); + join(); } - void start() // noexcept + void set_capacity(size_t capacity) // noexcept { - if(is_running_.fetch_and_store(true)) - return; - thread_ = boost::thread(f_); + execution_queue_[normal_priority].set_capacity(capacity); } - bool is_running() const // noexcept + void set_priority_class(thread_priority p) { - return is_running_; + begin_invoke([=] + { + if(p == high_priority_class) + SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS); + if(p == above_normal_priority_class) + SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS); + else if(p == normal_priority_class) + SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS); + else if(p == below_normal_priority_class) + SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS); + }); } - + void stop() // noexcept { is_running_ = false; - begin_invoke([]{}); // wake if sleeping - assert(boost::this_thread::get_id() != thread_.get_id()); - thread_.join(); + execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread. } - void execute() // noexcept + void wait() // noexcept { - boost::unique_lock lock(mut_); - while(size_ < 1) - cond_.wait(lock); - - try_execute(); + invoke([]{}); } - bool try_execute() // noexcept + void join() { - std::function func; - if(execution_queue_[high_priority].try_pop(func) || execution_queue_[normal_priority].try_pop(func) || execution_queue_[low_priority].try_pop(func)) - { - func(); - --size_; - } - - return func != nullptr; + if(boost::this_thread::get_id() != thread_.get_id()) + thread_.join(); } - + template - auto begin_invoke(Func&& func, priority p = normal_priority) -> boost::unique_future // noexcept + auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future // noexcept { - typedef decltype(func()) result_type; - - auto task = std::make_shared>(std::forward(func)); // boost::packaged_task cannot be moved into lambda, need to used shared_ptr. - auto future = task->get_future(); - - task->set_wait_callback(std::function([=](decltype(*task)& task) // The std::function wrapper is required in order to add ::result_type to functor class. - { - try - { - if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. - task(); - } - catch(boost::task_already_started&){} - })); - execution_queue_[p].push([=] + // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics. + auto task_adaptor = make_move_on_copy(create_task(func)); + + auto future = task_adaptor.value.get_future(); + + execution_queue_[priority].push([=] { - try{(*task)();} + try{task_adaptor.value();} catch(boost::task_already_started&){} catch(...){CASPAR_LOG_CURRENT_EXCEPTION();} }); - ++size_; - cond_.notify_one(); + if(priority != normal_priority) + execution_queue_[normal_priority].push(nullptr); + return std::move(future); } - + template - auto invoke(Func&& func, priority p = normal_priority) -> decltype(func()) + auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future // noexcept + { + // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics. + auto task_adaptor = make_move_on_copy(create_task(func)); + + auto future = task_adaptor.value.get_future(); + + if(priority == normal_priority || execution_queue_[normal_priority].try_push(nullptr)) + { + execution_queue_[priority].try_push([=] + { + try{task_adaptor.value();} + catch(boost::task_already_started&){} + catch(...){CASPAR_LOG_CURRENT_EXCEPTION();} + }); + } + + return std::move(future); + } + + template + auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept { if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. return func(); - return begin_invoke(std::forward(func), p).get(); + return begin_invoke(std::forward(func), prioriy).get(); } + + template + auto try_invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept + { + if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. + return func(); -private: + return try_begin_invoke(std::forward(func), prioriy).get(); + } - virtual void run() // noexcept + void yield() // noexcept { - win32_exception::install_handler(); - while(is_running_) - execute(); + if(boost::this_thread::get_id() != thread_.get_id()) // Only yield when calling from execution thread. + return; + + std::function func; + while(execution_queue_[high_priority].try_pop(func)) + { + if(func) + func(); + } } + + function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity(); } + function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size(); } + bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty(); } + bool is_running() const /*noexcept*/ { return is_running_; } + +private: + + void execute() // noexcept + { + std::function func; + execution_queue_[normal_priority].pop(func); - tbb::atomic size_; - boost::condition_variable cond_; - boost::mutex mut_; + yield(); - std::function f_; - boost::thread thread_; - tbb::atomic is_running_; - std::array>, 3> execution_queue_; + if(func) + func(); + } + + void run() // noexcept + { + win32_exception::install_handler(); + detail::SetThreadName(GetCurrentThreadId(), name_.c_str()); + while(is_running_) + { + try + { + execute(); + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } + } + } }; } \ No newline at end of file