X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=common%2Fconcurrency%2Fexecutor.h;h=a1f41ee761f69428a7f3faea08f802eedea92447;hb=9e3d038a1064e8dd4b35b167935430a7ba8a885a;hp=bbd0693729fa284f74214399793a54b59b572374;hpb=845007d687da1ab729657cf62d1ad5e4dd6f3361;p=casparcg diff --git a/common/concurrency/executor.h b/common/concurrency/executor.h index bbd069372..a1f41ee76 100644 --- a/common/concurrency/executor.h +++ b/common/concurrency/executor.h @@ -1,140 +1,248 @@ +/* +* copyright (c) 2010 Sveriges Television AB +* +* This file is part of CasparCG. +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. + +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +*/ #pragma once #include "../exception/win32_exception.h" +#include "../utility/string.h" +#include "../utility/move_on_copy.h" #include "../log/log.h" #include #include #include +#include #include #include -#include namespace caspar { +namespace detail { + +typedef struct tagTHREADNAME_INFO +{ + DWORD dwType; // must be 0x1000 + LPCSTR szName; // pointer to name (in user addr space) + DWORD dwThreadID; // thread ID (-1=caller thread) + DWORD dwFlags; // reserved for future use, must be zero +} THREADNAME_INFO; + +inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) +{ + THREADNAME_INFO info; + { + info.dwType = 0x1000; + info.szName = szThreadName; + info.dwThreadID = dwThreadID; + info.dwFlags = 0; + } + __try + { + RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info ); + } + __except (EXCEPTION_CONTINUE_EXECUTION){} +} + +} + +enum task_priority +{ + high_priority, + normal_priority, + priority_count +}; + +enum thread_priority +{ + high_priority_class, + above_normal_priority_class, + normal_priority_class, + below_normal_priority_class +}; + class executor : boost::noncopyable { + const std::string name_; + boost::thread thread_; + tbb::atomic is_running_; + + typedef tbb::concurrent_bounded_queue> function_queue; + function_queue execution_queue_[priority_count]; + + template + auto create_task(Func&& func) -> boost::packaged_task // noexcept + { + typedef boost::packaged_task task_type; + + auto task = task_type(std::forward(func)); + + task.set_wait_callback(std::function([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class. + { + try + { + if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. + my_task(); + } + catch(boost::task_already_started&){} + })); + + return std::move(task); + } + public: - explicit executor(const std::function& f = nullptr) + explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept { - is_running_ = false; - f_ = f != nullptr ? f : [this]{run();}; + is_running_ = true; + thread_ = boost::thread([this]{run();}); } - - virtual ~executor() + + virtual ~executor() // noexcept { - wait(); stop(); - if(boost::this_thread::get_id() != thread_.get_id()) - thread_.join(); + join(); } - void set_capacity(size_t capacity) + void set_capacity(size_t capacity) // noexcept { - execution_queue_.set_capacity(capacity); + execution_queue_[normal_priority].set_capacity(capacity); } - void start() // noexcept + void set_priority_class(thread_priority p) { - if(is_running_.fetch_and_store(true)) - return; - thread_ = boost::thread(f_); - try + begin_invoke([=] { - execution_queue_.clear(); - } - catch(boost::broken_promise&){} + if(p == high_priority_class) + SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS); + else if(p == above_normal_priority_class) + SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS); + else if(p == normal_priority_class) + SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS); + else if(p == below_normal_priority_class) + SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS); + }); } - + void stop() // noexcept { is_running_ = false; - execution_queue_.push([]{}); + execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread. } - void wait() + void wait() // noexcept { invoke([]{}); } - void clear() + void join() { - std::function func; - while(execution_queue_.try_pop(func)){} + if(boost::this_thread::get_id() != thread_.get_id()) + thread_.join(); } - + template - auto begin_invoke(Func&& func) -> boost::unique_future // noexcept + auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future // noexcept { - typedef boost::packaged_task task_type; - - auto task = task_type(std::forward(func)); - auto future = task.get_future(); - - task.set_wait_callback(std::function([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class. + // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics. + auto task_adaptor = make_move_on_copy(create_task(func)); + + auto future = task_adaptor.value.get_future(); + + execution_queue_[priority].push([=] { try { - if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. - my_task(); + task_adaptor.value(); + } + catch(boost::task_already_started&) + { + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); } - catch(boost::task_already_started&){} - })); - - struct task_adaptor_t - { - task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){} - task_adaptor_t(task_type&& task) : task(std::move(task)){} - void operator()() const { task(); } - mutable task_type task; - } task_adaptor(std::move(task)); - - execution_queue_.push([=] - { - try{task_adaptor();} - catch(boost::task_already_started&){} - catch(...){CASPAR_LOG_CURRENT_EXCEPTION();} }); + if(priority != normal_priority) + execution_queue_[normal_priority].push(nullptr); + return std::move(future); } template - auto invoke(Func&& func) -> decltype(func()) + auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept { if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. return func(); - return begin_invoke(std::forward(func)).get(); + return begin_invoke(std::forward(func), prioriy).get(); } + + void yield() // noexcept + { + if(boost::this_thread::get_id() != thread_.get_id()) // Only yield when calling from execution thread. + return; - tbb::concurrent_bounded_queue>::size_type size() const { return execution_queue_.size(); } - bool empty() const { return execution_queue_.empty(); } - bool is_running() const { return is_running_; } + std::function func; + while(execution_queue_[high_priority].try_pop(func)) + { + if(func) + func(); + } + } + + function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity(); } + function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size(); } + bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty(); } + bool is_running() const /*noexcept*/ { return is_running_; } private: void execute() // noexcept { std::function func; - execution_queue_.pop(func); - func(); + execution_queue_[normal_priority].pop(func); + + yield(); + + if(func) + func(); } void run() // noexcept { - win32_exception::install_handler(); + win32_exception::install_handler(); + detail::SetThreadName(GetCurrentThreadId(), name_.c_str()); while(is_running_) - execute(); - } - - std::function f_; - boost::thread thread_; - tbb::atomic is_running_; - tbb::concurrent_bounded_queue> execution_queue_; + { + try + { + execute(); + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } + } + } }; } \ No newline at end of file