From: hellgore Date: Mon, 10 Sep 2012 15:20:41 +0000 (+0000) Subject: Refactored executor to be more like in 2.0 and broke out the priority queue-related... X-Git-Tag: 2.1.0_Beta1~499 X-Git-Url: https://git.sesse.net/?a=commitdiff_plain;h=fc2015fa0d71b5301b14af66410644deef26a671;p=casparcg Refactored executor to be more like in 2.0 and broke out the priority queue-related functionality to a separate class. git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.1.0@3279 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d --- diff --git a/common/blocking_priority_queue.h b/common/blocking_priority_queue.h new file mode 100644 index 000000000..32518d789 --- /dev/null +++ b/common/blocking_priority_queue.h @@ -0,0 +1,265 @@ +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Helge Norberg, helge.norberg@svt.se +*/ +#pragma once + +#include +#include +#include + +#include + +#include +#include + +#include "semaphore.h" + +namespace caspar { + +/** + * Blocking concurrent priority queue supporting a predefined set of + * priorities. FIFO ordering is guaranteed within a priority. + * + * Prio must have the < and > operators defined where a larger instance is of a + * higher priority. + */ +template +class blocking_priority_queue +{ +public: + typedef unsigned int size_type; +private: + std::map, std::greater> queues_by_priority_; + semaphore space_available_; + semaphore elements_available_; + mutable boost::mutex capacity_mutex_; + size_type capacity_; +public: + /** + * Constructor. + * + * @param capacity The initial capacity of the queue. + * @param priorities A forward iterable container with the priorities to + * support. + */ + template + blocking_priority_queue(size_type capacity, const PrioList& priorities) + : space_available_(capacity) + , elements_available_(0u) + , capacity_(capacity) + { + BOOST_FOREACH(Prio priority, priorities) + { + queues_by_priority_.insert(std::make_pair(priority, tbb::concurrent_queue())); + } + + // The std::map is read-only from now on, so there *should* (it is + // unlikely but possible for a std::map implementor to choose to + // rebalance the tree during read operations) be no race conditions + // regarding the map. + // + // This may be true for vc10 as well: + // http://msdn.microsoft.com/en-us/library/c9ceah3b%28VS.80%29.aspx + } + + /** + * Push an element with a given priority to the queue. Blocks until room + * is available. + * + * @param priority The priority of the element. + * @param element The element. + */ + void push(Prio priority, const T& element) + { + acquire_transaction transaction(space_available_); + + push_acquired(priority, element, transaction); + } + + /** + * Attempt to push an element with a given priority to the queue. Will + * immediately return even if there is no room in the queue. + * + * @param priority The priority of the element. + * @param element The element. + * + * @return true if the element was pushed. false if there was no room. + */ + bool try_push(Prio priority, const T& element) + { + if (!space_available_.try_acquire()) + return false; + + acquire_transaction transaction(space_available_, true); + + push_acquired(priority, element, transaction); + + return true; + } + + /** + * Pop the element with the highest priority (fifo for elements with the + * same priority). Blocks until an element is available. + * + * @param element The element to store the result in. + */ + void pop(T& element) + { + acquire_transaction transaction(elements_available_); + + + pop_acquired_any_priority(element, transaction); + } + + /** + * Attempt to pop the element with the highest priority (fifo for elements + * with the same priority) if available. Does not wait until an element is + * available. + * + * @param element The element to store the result in. + * + * @return true if an element was available. + */ + bool try_pop(T& element) + { + if (!elements_available_.try_acquire()) + return false; + + acquire_transaction transaction(elements_available_, true); + + pop_acquired_any_priority(element, transaction); + + return true; + } + + /** + * Attempt to pop the element with the highest priority (fifo for elements + * with the same priority) if available *and* has a minimum priority. Does + * not wait until an element satisfying the priority criteria is available. + * + * @param element The element to store the result in. + * @param minimum_priority The minimum priority to accept. + * + * @return true if an element was available with the minimum priority. + */ + bool try_pop(T& element, Prio minimum_priority) + { + if (!elements_available_.try_acquire()) + return false; + + acquire_transaction transaction(elements_available_, true); + + BOOST_FOREACH(auto& queue, queues_by_priority_) + { + if (queue.first < minimum_priority) + { + // Will be true for all queues from this point so we break. + break; + } + + if (queue.second.try_pop(element)) + { + transaction.commit(); + space_available_.release(); + + return true; + } + } + + return false; + } + + /** + * Modify the capacity of the queue. May block if reducing the capacity. + * + * @param capacity The new capacity. + */ + void set_capacity(size_type capacity) + { + boost::mutex::scoped_lock lock (capacity_mutex_); + + if (capacity_ < capacity) + { + auto to_grow_with = capacity - capacity_; + space_available_.release(to_grow_with); + } + else if (capacity_ > capacity) + { + auto to_shrink_with = capacity_ - capacity; + // Will block until the desired capacity has been reached. + space_available_.acquire(to_shrink_with); + } + + capacity_ = capacity; + } + + /** + * @return the current capacity of the queue. + */ + size_type capacity() const + { + boost::mutex::scoped_lock lock (capacity_mutex_); + + return capacity_; + } + + /** + * @return the current size of the queue (may have changed at the time of + * returning). + */ + size_type size() const + { + return elements_available_.permits(); + } +private: + void push_acquired(Prio priority, const T& element, acquire_transaction& transaction) + { + try + { + queues_by_priority_.at(priority).push(element); + } + catch (std::out_of_range&) + { + throw std::runtime_error("Priority not supported by queue"); + } + + transaction.commit(); + elements_available_.release(); + } + + void pop_acquired_any_priority(T& element, acquire_transaction& transaction) + { + BOOST_FOREACH(auto& queue, queues_by_priority_) + { + if (queue.second.try_pop(element)) + { + transaction.commit(); + space_available_.release(); + + return; + } + } + + throw std::logic_error( + "blocking_priority_queue should have contained at least one element but didn't"); + } +}; + +} diff --git a/common/common.vcxproj b/common/common.vcxproj index 09e29e183..46b96cc09 100644 --- a/common/common.vcxproj +++ b/common/common.vcxproj @@ -152,6 +152,7 @@ + diff --git a/common/common.vcxproj.filters b/common/common.vcxproj.filters index 9681cdf11..e76acc960 100644 --- a/common/common.vcxproj.filters +++ b/common/common.vcxproj.filters @@ -142,5 +142,8 @@ source + + source + \ No newline at end of file diff --git a/common/executor.h b/common/executor.h index 75642ffe2..0e4c021e7 100644 --- a/common/executor.h +++ b/common/executor.h @@ -25,12 +25,15 @@ #include "enum_class.h" #include "log.h" #include "blocking_bounded_queue_adapter.h" +#include "blocking_priority_queue.h" + #include #include #include #include +#include #include @@ -52,37 +55,10 @@ typedef enum_class task_priority; class executor sealed { - struct priority_function - { - int priority; - std::function func; - - priority_function() - { - } - - template - priority_function(int priority, F&& func) - : priority(priority) - , func(std::forward(func)) - { - } - - void operator()() - { - func(); - } - - bool operator<(const priority_function& other) const - { - return priority < other.priority; - } - }; - executor(const executor&); executor& operator=(const executor&); - typedef blocking_bounded_queue_adapter> function_queue_t; + typedef blocking_priority_queue, task_priority> function_queue_t; const std::wstring name_; tbb::atomic is_running_; @@ -92,7 +68,13 @@ class executor sealed public: executor(const std::wstring& name) : name_(name) - , execution_queue_(512) + , execution_queue_(512, boost::assign::list_of + (task_priority::lowest_priority) + (task_priority::lower_priority) + (task_priority::low_priority) + (task_priority::normal_priority) + (task_priority::high_priority) + (task_priority::higher_priority)) { is_running_ = true; thread_ = boost::thread([this]{run();}); @@ -105,7 +87,7 @@ public: internal_begin_invoke([=] { is_running_ = false; - }, false).wait(); + }).wait(); } catch(...) { @@ -114,16 +96,6 @@ public: thread_.join(); } - - template - auto try_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future - { - if(!is_running_) - BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.")); - - // Will return uninitialized future if the try failed (get_state() will return future_state::uninitialized). - return internal_begin_invoke(std::forward(func), true, priority); - } template auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future // noexcept @@ -131,7 +103,7 @@ public: if(!is_running_) CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_)); - return internal_begin_invoke(std::forward(func), false, priority); + return internal_begin_invoke(std::forward(func), priority); } template @@ -143,13 +115,14 @@ public: return begin_invoke(std::forward(func), prioriy).get(); } - void yield() + void yield(task_priority minimum_priority) { if(!is_current()) CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.") << source_info(name_)); - priority_function func; - if(execution_queue_.try_pop(func)) + std::function func; + + while (execution_queue_.try_pop(func, minimum_priority)) func(); } @@ -165,7 +138,7 @@ public: void clear() { - priority_function func; + std::function func; while(execution_queue_.try_pop(func)); } @@ -207,7 +180,6 @@ private: template auto internal_begin_invoke( Func&& func, - bool try_begin, task_priority priority = task_priority::normal_priority) -> boost::unique_future // noexcept { typedef typename std::remove_reference::type function_type; @@ -246,7 +218,7 @@ private: auto future = task->get_future(); auto raw_task = task.release(); - priority_function prio_func(priority.value(), [raw_task] + auto function = [raw_task] { std::unique_ptr task(raw_task); try @@ -254,21 +226,12 @@ private: (*task)(); } catch(boost::task_already_started&){} - }); + }; - if (!execution_queue_.try_push(prio_func)) + if (!execution_queue_.try_push(priority, function)) { - if (try_begin) - { - delete raw_task; - - return boost::unique_future(); - } - else - { - CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller."; - execution_queue_.push(prio_func); - } + CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller."; + execution_queue_.push(priority, function); } return std::move(future); @@ -281,7 +244,7 @@ private: { try { - priority_function func; + std::function func; execution_queue_.pop(func); func(); } diff --git a/common/semaphore.h b/common/semaphore.h index bd3c6d7ec..1a384397e 100644 --- a/common/semaphore.h +++ b/common/semaphore.h @@ -155,4 +155,44 @@ public: } }; +/** + * Enables RAII-style acquire/release on scope exit unless committed. + */ +class acquire_transaction : boost::noncopyable +{ + semaphore& semaphore_; + bool committed_; +public: + /** + * Constructor. + * + * @param semaphore The semaphore to acquire one permit from. + * @param already_acquired Whether a permit has already been acquired or not. + */ + acquire_transaction(semaphore& semaphore, bool already_acquired = false) + : semaphore_(semaphore) + , committed_(false) + { + if (!already_acquired) + semaphore_.acquire(); + } + + /** + * Destructor that will release one permit if commit() has not been called. + */ + ~acquire_transaction() + { + if (!committed_) + semaphore_.release(); + } + + /** + * Ensure that the acquired permit is kept on destruction. + */ + void commit() + { + committed_ = true; + } +}; + }