X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=common%2Fconcurrency%2Fexecutor.h;h=0687889a179a951b0d2e14d82a9162941a4d03dd;hb=a486c25d5e6ce0ebe08e9a2d793a447ff3cb797a;hp=5fb6ef521d39c2dc4d678351a0b31dbecc7bbe89;hpb=f9d3e8003d1c91b5799e5062b1fdeb6204122807;p=casparcg diff --git a/common/concurrency/executor.h b/common/concurrency/executor.h index 5fb6ef521..0687889a1 100644 --- a/common/concurrency/executor.h +++ b/common/concurrency/executor.h @@ -1,101 +1,269 @@ +/* +* copyright (c) 2010 Sveriges Television AB +* +* This file is part of CasparCG. +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. + +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +*/ #pragma once -#include +#include "../exception/win32_exception.h" +#include "../utility/string.h" +#include "../utility/move_on_copy.h" +#include "../log/log.h" #include #include +#include +#include + #include -namespace caspar { namespace common { +namespace caspar { + +namespace detail { -class executor +typedef struct tagTHREADNAME_INFO { -public: - executor(const std::function& run_func = nullptr) : run_func_(run_func) + DWORD dwType; // must be 0x1000 + LPCSTR szName; // pointer to name (in user addr space) + DWORD dwThreadID; // thread ID (-1=caller thread) + DWORD dwFlags; // reserved for future use, must be zero +} THREADNAME_INFO; + +inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) +{ + THREADNAME_INFO info; { - is_running_ = false; - if(run_func_ == nullptr) - run_func_ = [=]{default_run();}; + info.dwType = 0x1000; + info.szName = szThreadName; + info.dwThreadID = dwThreadID; + info.dwFlags = 0; } - - virtual ~executor() + __try { - stop(); + RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info ); } + __except (EXCEPTION_CONTINUE_EXECUTION){} +} - void start() - { - if(is_running_.fetch_and_store(true)) - return; - thread_ = boost::thread(run_func_); +} + +enum task_priority +{ + high_priority, + normal_priority, + priority_count +}; + +enum thread_priority +{ + high_priority_class, + above_normal_priority_class, + normal_priority_class, + below_normal_priority_class +}; + +class executor : boost::noncopyable +{ + const std::string name_; + boost::thread thread_; + tbb::atomic is_running_; + + typedef tbb::concurrent_bounded_queue> function_queue; + function_queue execution_queue_[priority_count]; + + template + auto create_task(Func&& func) -> boost::packaged_task // noexcept + { + typedef boost::packaged_task task_type; + + auto task = task_type(std::forward(func)); + + task.set_wait_callback(std::function([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class. + { + try + { + if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. + my_task(); + } + catch(boost::task_already_started&){} + })); + + return std::move(task); } - bool is_running() const +public: + + explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept { - return is_running_; + thread_ = boost::thread([this]{run();}); + is_running_ = true; + } + + virtual ~executor() // noexcept + { + stop(); + execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread. + join(); } - bool stop(unsigned int timeout = 5000) + void set_capacity(size_t capacity) // noexcept { - if(is_running_.fetch_and_store(false)) - { - execution_queue_.push([](){}); - execute(false); - } - return thread_.timed_join(boost::posix_time::milliseconds(timeout)); + execution_queue_[normal_priority].set_capacity(capacity); } - void execute(bool block = false) + void set_priority_class(thread_priority p) { - std::function func; - if(block) + begin_invoke([=] { - execution_queue_.pop(func); - func(); - } + if(p == high_priority_class) + SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS); + if(p == above_normal_priority_class) + SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS); + else if(p == normal_priority_class) + SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS); + else if(p == below_normal_priority_class) + SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS); + }); + } + + void stop() // noexcept + { + is_running_ = false; + } - while(execution_queue_.try_pop(func)) - func(); + void wait() // noexcept + { + invoke([]{}); } + void join() + { + if(boost::this_thread::get_id() != thread_.get_id()) + thread_.join(); + } + template - auto begin_invoke(Func&& func) -> boost::unique_future + auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future // noexcept { - typedef decltype(func()) result_type; + // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics. + auto task_adaptor = make_move_on_copy(create_task(func)); - if(!is_running_) - return boost::packaged_task([]{ return result_type(); }).get_future(); + auto future = task_adaptor.value.get_future(); - if(boost::this_thread::get_id() == thread_.get_id()) - return boost::packaged_task([=]{ return func(); }).get_future(); + execution_queue_[priority].push([=] + { + try{task_adaptor.value();} + catch(boost::task_already_started&){} + catch(...){CASPAR_LOG_CURRENT_EXCEPTION();} + }); - auto task = std::make_shared>([=]{ return is_running_ ? func() : result_type(); }); - auto future = task->get_future(); + if(priority != normal_priority) + execution_queue_[normal_priority].push(nullptr); + + return std::move(future); + } - execution_queue_.push([=]{(*task)();}); + template + auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future // noexcept + { + // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics. + auto task_adaptor = make_move_on_copy(create_task(func)); + + auto future = task_adaptor.value.get_future(); - return std::move(future); + if(priority == normal_priority || execution_queue_[normal_priority].try_push(nullptr)) + { + execution_queue_[priority].try_push([=] + { + try{task_adaptor.value();} + catch(boost::task_already_started&){} + catch(...){CASPAR_LOG_CURRENT_EXCEPTION();} + }); + } + + return std::move(future); } - + template - auto invoke(Func&& func) -> decltype(func()) + auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept { - return begin_invoke(std::forward(func)).get(); + if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. + return func(); + + return begin_invoke(std::forward(func), prioriy).get(); + } + + template + auto try_invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept + { + if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. + return func(); + + return try_begin_invoke(std::forward(func), prioriy).get(); + } + + void yield() // noexcept + { + if(boost::this_thread::get_id() != thread_.get_id()) // Only yield when calling from execution thread. + return; + + std::function func; + while(execution_queue_[high_priority].try_pop(func)) + { + if(func) + func(); + } } + function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity(); } + function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size(); } + bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty(); } + bool is_running() const /*noexcept*/ { return is_running_; } + private: - - void default_run() throw() + + void execute() // noexcept { - while(is_running_) - execute(true); + std::function func; + execution_queue_[normal_priority].pop(func); + + yield(); + + if(func) + func(); } - std::function run_func_; - boost::thread thread_; - tbb::atomic is_running_; - tbb::concurrent_bounded_queue> execution_queue_; + void run() // noexcept + { + win32_exception::install_handler(); + detail::SetThreadName(GetCurrentThreadId(), name_.c_str()); + while(is_running_) + { + try + { + execute(); + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } + } + } }; -}} \ No newline at end of file +} \ No newline at end of file