From: hellgore Date: Thu, 6 Sep 2012 14:43:49 +0000 (+0000) Subject: Refactored executor to support try_begin_invoke which does not block until room is... X-Git-Tag: 2.1.0_Beta1~501 X-Git-Url: https://git.sesse.net/?a=commitdiff_plain;h=f1169352e92ac3537803b172bf59d4d137113cab;p=casparcg Refactored executor to support try_begin_invoke which does not block until room is available in the executor queue, but returns immediately instead. git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.1.0@3248 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d --- diff --git a/common/blocking_bounded_queue_adapter.h b/common/blocking_bounded_queue_adapter.h new file mode 100644 index 000000000..a5b59ff67 --- /dev/null +++ b/common/blocking_bounded_queue_adapter.h @@ -0,0 +1,198 @@ +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Helge Norberg, helge.norberg@svt.se +*/ + +#pragma once + +#include "semaphore.h" + +#include +#include + +namespace caspar { + +/** + * Adapts an unbounded non-blocking concurrent queue into a blocking bounded + * concurrent queue. + * + * The queue Q to adapt must support the following use cases: + * + * Q q; + * Q::value_type elem; + * q.push(elem); + * + * and: + * + * Q q; + * Q::value_type elem; + * q.try_pop(elem); + * + * It must also guarantee thread safety for those operations. + */ +template +class blocking_bounded_queue_adapter : boost::noncopyable +{ +public: + typedef typename Q::value_type value_type; + typedef unsigned int size_type; +private: + semaphore space_available_; + semaphore elements_available_; + Q queue_; + mutable boost::mutex capacity_mutex_; + size_type capacity_; +public: + /** + * Constructor. + * + * @param capacity The capacity of the queue. + */ + blocking_bounded_queue_adapter(size_type capacity) + : space_available_(capacity) + , elements_available_(0) + , capacity_(capacity) + { + } + + /** + * Push an element to the queue, block until room is available. + * + * @param element The element to push. + */ + void push(const value_type& element) + { + space_available_.acquire(); + push_after_room_reserved(element); + } + + /** + * Try to push an element to the queue, returning immediately if room is not + * available. + * + * @param element The element to push. + * + * @return true if there was room for the element. + */ + bool try_push(const value_type& element) + { + bool room_available = space_available_.try_acquire(); + + if (!room_available) + return false; + + push_after_room_reserved(element); + + return true; + } + + /** + * Pop an element from the queue, will block until an element is available. + * + * @param element The element to store the result in. + */ + void pop(value_type& element) + { + elements_available_.acquire(); + queue_.try_pop(element); + space_available_.release(); + } + + /** + * Try to pop an element from the queue, returning immediately if no + * element is available. + * + * @param element The element to store the result in. + * + * @return true if an element was popped. + */ + bool try_pop(value_type& element) + { + if (!elements_available_.try_acquire()) + return false; + + queue_.try_pop(element); + space_available_.release(); + + return true; + } + + /** + * Modify the capacity of the queue. May block if reducing the capacity. + * + * @param capacity The new capacity. + */ + void set_capacity(size_type capacity) + { + boost::mutex::scoped_lock lock (capacity_mutex_); + + if (capacity_ < capacity) + { + auto to_grow_with = capacity - capacity_; + + space_available_.release(to_grow_with); + } + else if (capacity_ > capacity) + { + auto to_shrink_with = capacity_ - capacity; + + // Will block until the desired capacity has been reached. + space_available_.acquire(to_shrink_with); + } + + capacity_ = capacity; + } + + /** + * @return the current capacity of the queue. + */ + size_type capacity() const + { + boost::mutex::scoped_lock lock (capacity_mutex_); + + return capacity_; + } + + /** + * @return the current size of the queue (may have changed at the time of + * returning). + */ + size_type size() const + { + return elements_available_.permits(); + } +private: + void push_after_room_reserved(const value_type& element) + { + try + { + queue_.push(element); + } + catch (...) + { + space_available_.release(); + + throw; + } + + elements_available_.release(); + } +}; + +} diff --git a/common/common.vcxproj b/common/common.vcxproj index ba0e326d0..09e29e183 100644 --- a/common/common.vcxproj +++ b/common/common.vcxproj @@ -151,6 +151,7 @@ + @@ -173,6 +174,7 @@ + diff --git a/common/common.vcxproj.filters b/common/common.vcxproj.filters index e256c07cd..9681cdf11 100644 --- a/common/common.vcxproj.filters +++ b/common/common.vcxproj.filters @@ -136,5 +136,11 @@ source\compiler\vs + + source + + + source + \ No newline at end of file diff --git a/common/executor.h b/common/executor.h index b58339e1f..5c296dec0 100644 --- a/common/executor.h +++ b/common/executor.h @@ -24,12 +24,13 @@ #include "except.h" #include "enum_class.h" #include "log.h" +#include "blocking_bounded_queue_adapter.h" #include #include -#include #include +#include #include @@ -81,20 +82,19 @@ class executor sealed executor(const executor&); executor& operator=(const executor&); - typedef tbb::concurrent_priority_queue function_queue_t; + typedef blocking_bounded_queue_adapter> function_queue_t; const std::wstring name_; tbb::atomic is_running_; boost::thread thread_; function_queue_t execution_queue_; - tbb::concurrent_bounded_queue semaphore_; public: executor(const std::wstring& name) : name_(name) + , execution_queue_(512) { is_running_ = true; - set_capacity(512); thread_ = boost::thread([this]{run();}); } @@ -105,7 +105,7 @@ public: internal_begin_invoke([=] { is_running_ = false; - }).wait(); + }, false).wait(); } catch(...) { @@ -114,14 +114,24 @@ public: thread_.join(); } - + + template + auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future + { + if(!is_running_) + BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.")); + + // Will return uninitialized future if the try failed (get_state() will return future_state::uninitialized). + return internal_begin_invoke(std::forward(func), true, priority); + } + template auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future // noexcept { if(!is_running_) CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_)); - return internal_begin_invoke(std::forward(func), priority); + return internal_begin_invoke(std::forward(func), false, priority); } template @@ -138,23 +148,19 @@ public: if(!is_current()) CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.") << source_info(name_)); - int dummy; - if(!semaphore_.try_pop(dummy)) - return; - priority_function func; if(execution_queue_.try_pop(func)) func(); } - void set_capacity(std::size_t capacity) + void set_capacity(function_queue_t::size_type capacity) { - semaphore_.set_capacity(capacity); + execution_queue_.set_capacity(capacity); } - std::size_t capacity() const + function_queue_t::size_type capacity() const { - return semaphore_.capacity(); + return execution_queue_.capacity(); } void clear() @@ -199,7 +205,10 @@ private: } template - auto internal_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future // noexcept + auto internal_begin_invoke( + Func&& func, + bool try_begin, + task_priority priority = task_priority::normal_priority) -> boost::unique_future // noexcept { typedef typename std::remove_reference::type function_type; typedef decltype(func()) result_type; @@ -247,13 +256,21 @@ private: catch(boost::task_already_started&){} }); - execution_queue_.push(prio_func); - - if(!semaphore_.try_push(0)) + if (!execution_queue_.try_push(prio_func)) { - CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller."; - semaphore_.push(0); - } + if (try_begin) + { + delete raw_task; + + return boost::unique_future(); + } + else + { + CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller."; + execution_queue_.push(prio_func); + } + } + return std::move(future); } @@ -264,12 +281,9 @@ private: { try { - int dummy; - semaphore_.pop(dummy); - priority_function func; - if(execution_queue_.try_pop(func)) - func(); + execution_queue_.pop(func); + func(); } catch(...) { diff --git a/common/semaphore.h b/common/semaphore.h new file mode 100644 index 000000000..bd3c6d7ec --- /dev/null +++ b/common/semaphore.h @@ -0,0 +1,158 @@ +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Helge Norberg, helge.norberg@svt.se +*/ + +#pragma once + +#include + +#include +#include +#include + +namespace caspar { + +template +void repeat_n(N times_to_repeat_block, const Func& func) +{ + for (N i = 0; i < times_to_repeat_block; ++i) + { + func(); + } +} + +/** + * Counting semaphore modelled after java.util.concurrent.Semaphore + */ +class semaphore : boost::noncopyable +{ + mutable boost::mutex mutex_; + unsigned int permits_; + boost::condition_variable permits_available_; +public: + /** + * Constructor. + * + * @param permits The initial number of permits. + */ + semaphore(unsigned int permits) + : permits_(permits) + { + } + + /** + * Release a permit. + */ + void release() + { + boost::mutex::scoped_lock lock(mutex_); + + ++permits_; + + permits_available_.notify_one(); + } + + /** + * Release a permit. + * + * @param permits The number of permits to release. + */ + void release(unsigned int permits) + { + boost::mutex::scoped_lock lock(mutex_); + + permits_ += permits; + + repeat_n(permits, [this] { permits_available_.notify_one(); }); + } + + /** + * Acquire a permit. Will block until one becomes available if no permit is + * currently available. + */ + void acquire() + { + boost::mutex::scoped_lock lock(mutex_); + + while (permits_ == 0u) + { + permits_available_.wait(lock); + } + + --permits_; + } + + /** + * Acquire a number of permits. Will block until the given number of + * permits has been acquired if not enough permits are currently available. + * + * @param permits The number of permits to acquire. + */ + void acquire(unsigned int permits) + { + boost::mutex::scoped_lock lock(mutex_); + auto num_acquired = 0u; + + while (permits_ == 0u && num_acquired < permits) + { + permits_available_.wait(lock); + + auto num_wanted = permits - num_acquired; + auto to_drain = std::min(num_wanted, permits_); + + permits_ -= to_drain; + num_acquired += to_drain; + } + } + + /** + * Acquire one permits if permits are currently available. Does not block + * until one is available, but returns immediately if unavailable. + * + * @return true if a permit was acquired or false if no permits where + * currently available. + */ + bool try_acquire() + { + boost::mutex::scoped_lock lock(mutex_); + + if (permits_ == 0u) + return false; + else + { + --permits_; + + return true; + } + } + + /** + * @return the current number of permits (may have changed at the time of + * return). + */ + unsigned int permits() const + { + boost::mutex::scoped_lock lock(mutex_); + + return permits_; + } +}; + +}