X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=common%2Fconcurrency%2Fexecutor.h;h=0687889a179a951b0d2e14d82a9162941a4d03dd;hb=a486c25d5e6ce0ebe08e9a2d793a447ff3cb797a;hp=1369dd6004d0630f82fb3402cc62f95f04a79a33;hpb=834f6b53f14b8770f830aefc272bb76006fa609d;p=casparcg diff --git a/common/concurrency/executor.h b/common/concurrency/executor.h index 1369dd600..0687889a1 100644 --- a/common/concurrency/executor.h +++ b/common/concurrency/executor.h @@ -1,6 +1,27 @@ +/* +* copyright (c) 2010 Sveriges Television AB +* +* This file is part of CasparCG. +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. + +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +*/ #pragma once #include "../exception/win32_exception.h" +#include "../utility/string.h" +#include "../utility/move_on_copy.h" #include "../log/log.h" #include @@ -10,59 +31,67 @@ #include #include -#include namespace caspar { -class executor : boost::noncopyable +namespace detail { + +typedef struct tagTHREADNAME_INFO { -public: + DWORD dwType; // must be 0x1000 + LPCSTR szName; // pointer to name (in user addr space) + DWORD dwThreadID; // thread ID (-1=caller thread) + DWORD dwFlags; // reserved for future use, must be zero +} THREADNAME_INFO; - enum stop_policy - { - wait, - no_wait - }; - - explicit executor(const std::function& f = nullptr) +inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) +{ + THREADNAME_INFO info; { - is_running_ = false; - f_ = f != nullptr ? f : [this]{run();}; + info.dwType = 0x1000; + info.szName = szThreadName; + info.dwThreadID = dwThreadID; + info.dwFlags = 0; } - - ~executor() + __try { - stop(); + RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info ); } + __except (EXCEPTION_CONTINUE_EXECUTION){} +} - void start() // noexcept - { - if(is_running_.fetch_and_store(true)) - return; - thread_ = boost::thread(f_); - } - - void stop(stop_policy policy = wait) // noexcept - { - is_running_ = false; - execution_queue_.push([]{}); - if(policy == wait && boost::this_thread::get_id() != thread_.get_id()) - thread_.join(); - } +} - void clear() - { - std::function func; - while(execution_queue_.try_pop(func)){} - } - +enum task_priority +{ + high_priority, + normal_priority, + priority_count +}; + +enum thread_priority +{ + high_priority_class, + above_normal_priority_class, + normal_priority_class, + below_normal_priority_class +}; + +class executor : boost::noncopyable +{ + const std::string name_; + boost::thread thread_; + tbb::atomic is_running_; + + typedef tbb::concurrent_bounded_queue> function_queue; + function_queue execution_queue_[priority_count]; + template - auto begin_invoke(Func&& func) -> boost::unique_future // noexcept + auto create_task(Func&& func) -> boost::packaged_task // noexcept { typedef boost::packaged_task task_type; auto task = task_type(std::forward(func)); - auto future = task.get_future(); task.set_wait_callback(std::function([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class. { @@ -74,57 +103,167 @@ public: catch(boost::task_already_started&){} })); - struct task_adaptor_t + return std::move(task); + } + +public: + + explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept + { + thread_ = boost::thread([this]{run();}); + is_running_ = true; + } + + virtual ~executor() // noexcept + { + stop(); + execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread. + join(); + } + + void set_capacity(size_t capacity) // noexcept + { + execution_queue_[normal_priority].set_capacity(capacity); + } + + void set_priority_class(thread_priority p) + { + begin_invoke([=] { - task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){} - task_adaptor_t(task_type&& task) : task(std::move(task)){} - void operator()() const { task(); } - mutable task_type task; - } task_adaptor(std::move(task)); + if(p == high_priority_class) + SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS); + if(p == above_normal_priority_class) + SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS); + else if(p == normal_priority_class) + SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS); + else if(p == below_normal_priority_class) + SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS); + }); + } + + void stop() // noexcept + { + is_running_ = false; + } + + void wait() // noexcept + { + invoke([]{}); + } + + void join() + { + if(boost::this_thread::get_id() != thread_.get_id()) + thread_.join(); + } + + template + auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future // noexcept + { + // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics. + auto task_adaptor = make_move_on_copy(create_task(func)); + + auto future = task_adaptor.value.get_future(); - execution_queue_.try_push([=] + execution_queue_[priority].push([=] { - try{task_adaptor();} + try{task_adaptor.value();} catch(boost::task_already_started&){} catch(...){CASPAR_LOG_CURRENT_EXCEPTION();} }); + if(priority != normal_priority) + execution_queue_[normal_priority].push(nullptr); + return std::move(future); } - + + template + auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future // noexcept + { + // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics. + auto task_adaptor = make_move_on_copy(create_task(func)); + + auto future = task_adaptor.value.get_future(); + + if(priority == normal_priority || execution_queue_[normal_priority].try_push(nullptr)) + { + execution_queue_[priority].try_push([=] + { + try{task_adaptor.value();} + catch(boost::task_already_started&){} + catch(...){CASPAR_LOG_CURRENT_EXCEPTION();} + }); + } + + return std::move(future); + } + template - auto invoke(Func&& func) -> decltype(func()) + auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept { if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. return func(); - return begin_invoke(std::forward(func)).get(); + return begin_invoke(std::forward(func), prioriy).get(); } - tbb::concurrent_bounded_queue>::size_type size() const { return execution_queue_.size(); } - bool empty() const { return execution_queue_.empty(); } - bool is_running() const { return is_running_; } + template + auto try_invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept + { + if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. + return func(); + + return try_begin_invoke(std::forward(func), prioriy).get(); + } + + void yield() // noexcept + { + if(boost::this_thread::get_id() != thread_.get_id()) // Only yield when calling from execution thread. + return; + + std::function func; + while(execution_queue_[high_priority].try_pop(func)) + { + if(func) + func(); + } + } + + function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity(); } + function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size(); } + bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty(); } + bool is_running() const /*noexcept*/ { return is_running_; } private: void execute() // noexcept { std::function func; - execution_queue_.pop(func); - func(); + execution_queue_[normal_priority].pop(func); + + yield(); + + if(func) + func(); } void run() // noexcept { - win32_exception::install_handler(); + win32_exception::install_handler(); + detail::SetThreadName(GetCurrentThreadId(), name_.c_str()); while(is_running_) - execute(); - } - - std::function f_; - boost::thread thread_; - tbb::atomic is_running_; - tbb::concurrent_bounded_queue> execution_queue_; + { + try + { + execute(); + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } + } + } }; } \ No newline at end of file