]> git.sesse.net Git - casparcg/commitdiff
Refactored executor to be more like in 2.0 and broke out the priority queue-related...
authorhellgore <hellgore@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Mon, 10 Sep 2012 15:20:41 +0000 (15:20 +0000)
committerhellgore <hellgore@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Mon, 10 Sep 2012 15:20:41 +0000 (15:20 +0000)
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.1.0@3279 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d

common/blocking_priority_queue.h [new file with mode: 0644]
common/common.vcxproj
common/common.vcxproj.filters
common/executor.h
common/semaphore.h

diff --git a/common/blocking_priority_queue.h b/common/blocking_priority_queue.h
new file mode 100644 (file)
index 0000000..32518d7
--- /dev/null
@@ -0,0 +1,265 @@
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+#pragma once
+
+#include <stdexcept>
+#include <map>
+#include <initializer_list>
+
+#include <tbb/concurrent_queue.h>
+
+#include <boost/foreach.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "semaphore.h"
+
+namespace caspar {
+
+/**
+ * Blocking concurrent priority queue supporting a predefined set of
+ * priorities. FIFO ordering is guaranteed within a priority.
+ *
+ * Prio must have the < and > operators defined where a larger instance is of a
+ * higher priority.
+ */
+template <class T, class Prio>
+class blocking_priority_queue
+{
+public:
+       typedef unsigned int size_type;
+private:       
+       std::map<Prio, tbb::concurrent_queue<T>, std::greater<Prio>> queues_by_priority_;
+       semaphore space_available_;
+       semaphore elements_available_;
+       mutable boost::mutex capacity_mutex_;
+       size_type capacity_;
+public:
+       /**
+        * Constructor.
+        *
+        * @param capacity   The initial capacity of the queue.
+        * @param priorities A forward iterable container with the priorities to 
+        *                   support.
+        */
+       template<class PrioList>
+       blocking_priority_queue(size_type capacity, const PrioList& priorities)
+               : space_available_(capacity)
+               , elements_available_(0u)
+               , capacity_(capacity)
+       {
+               BOOST_FOREACH(Prio priority, priorities)
+               {
+                       queues_by_priority_.insert(std::make_pair(priority, tbb::concurrent_queue<T>()));
+               }
+
+               // The std::map is read-only from now on, so there *should* (it is 
+               // unlikely but possible for a std::map implementor to choose to 
+               // rebalance the tree during read operations) be no race  conditions 
+               // regarding the map.
+               //
+               // This may be true for vc10 as well:
+               // http://msdn.microsoft.com/en-us/library/c9ceah3b%28VS.80%29.aspx
+       }
+
+       /**
+        * Push an element with a given priority to the queue. Blocks until room
+        * is available.
+        *
+        * @param priority The priority of the element.
+        * @param element  The element.
+        */
+       void push(Prio priority, const T& element)
+       {
+               acquire_transaction transaction(space_available_);
+
+               push_acquired(priority, element, transaction);
+       }
+
+       /**
+        * Attempt to push an element with a given priority to the queue. Will
+        * immediately return even if there is no room in the queue.
+        *
+        * @param priority The priority of the element.
+        * @param element  The element.
+        *
+        * @return true if the element was pushed. false if there was no room.
+        */
+       bool try_push(Prio priority, const T& element)
+       {
+               if (!space_available_.try_acquire())
+                       return false;
+
+               acquire_transaction transaction(space_available_, true);
+
+               push_acquired(priority, element, transaction);
+
+               return true;
+       }
+
+       /**
+        * Pop the element with the highest priority (fifo for elements with the
+        * same priority). Blocks until an element is available.
+        *
+        * @param element The element to store the result in.
+        */
+       void pop(T& element)
+       {
+               acquire_transaction transaction(elements_available_);
+
+
+               pop_acquired_any_priority(element, transaction);
+       }
+
+       /**
+        * Attempt to pop the element with the highest priority (fifo for elements
+        * with the same priority) if available. Does not wait until an element is
+        * available.
+        *
+        * @param element The element to store the result in.
+        *
+        * @return true if an element was available.
+        */
+       bool try_pop(T& element)
+       {
+               if (!elements_available_.try_acquire())
+                       return false;
+
+               acquire_transaction transaction(elements_available_, true);
+
+               pop_acquired_any_priority(element, transaction);
+
+               return true;
+       }
+
+       /**
+        * Attempt to pop the element with the highest priority (fifo for elements
+        * with the same priority) if available *and* has a minimum priority. Does
+        * not wait until an element satisfying the priority criteria is available.
+        *
+        * @param element          The element to store the result in.
+        * @param minimum_priority The minimum priority to accept.
+        *
+        * @return true if an element was available with the minimum priority.
+        */
+       bool try_pop(T& element, Prio minimum_priority)
+       {
+               if (!elements_available_.try_acquire())
+                       return false;
+
+               acquire_transaction transaction(elements_available_, true);
+
+               BOOST_FOREACH(auto& queue, queues_by_priority_)
+               {
+                       if (queue.first < minimum_priority)
+                       {
+                               // Will be true for all queues from this point so we break.
+                               break;
+                       }
+
+                       if (queue.second.try_pop(element))
+                       {
+                               transaction.commit();
+                               space_available_.release();
+
+                               return true;
+                       }
+               }
+
+               return false;
+       }
+
+       /**
+        * Modify the capacity of the queue. May block if reducing the capacity.
+        *
+        * @param capacity The new capacity.
+        */
+       void set_capacity(size_type capacity)
+       {
+               boost::mutex::scoped_lock lock (capacity_mutex_);
+
+               if (capacity_ < capacity)
+               {
+                       auto to_grow_with = capacity - capacity_;
+                       space_available_.release(to_grow_with);
+               }
+               else if (capacity_ > capacity)
+               {
+                       auto to_shrink_with = capacity_ - capacity;
+                       // Will block until the desired capacity has been reached.
+                       space_available_.acquire(to_shrink_with);
+               }
+
+               capacity_ = capacity;
+       }
+
+       /**
+        * @return the current capacity of the queue.
+        */
+       size_type capacity() const
+       {
+               boost::mutex::scoped_lock lock (capacity_mutex_);
+
+               return capacity_;
+       }
+
+       /**
+        * @return the current size of the queue (may have changed at the time of
+        *         returning).
+        */
+       size_type size() const
+       {
+               return elements_available_.permits();
+       }
+private:
+       void push_acquired(Prio priority, const T& element, acquire_transaction& transaction)
+       {
+               try
+               {
+                       queues_by_priority_.at(priority).push(element);
+               }
+               catch (std::out_of_range&)
+               {
+                       throw std::runtime_error("Priority not supported by queue");
+               }
+
+               transaction.commit();
+               elements_available_.release();
+       }
+
+       void pop_acquired_any_priority(T& element, acquire_transaction& transaction)
+       {
+               BOOST_FOREACH(auto& queue, queues_by_priority_)
+               {
+                       if (queue.second.try_pop(element))
+                       {
+                               transaction.commit();
+                               space_available_.release();
+
+                               return;
+                       }
+               }
+
+               throw std::logic_error(
+                               "blocking_priority_queue should have contained at least one element but didn't");
+       }
+};
+
+}
index 09e29e183b39e6854d8e861b5af3c40acaf5f15a..46b96cc096e27c58a4c895bf02926ac443e8d118 100644 (file)
     <ClInclude Include="array.h" />\r
     <ClInclude Include="assert.h" />\r
     <ClInclude Include="blocking_bounded_queue_adapter.h" />\r
+    <ClInclude Include="blocking_priority_queue.h" />\r
     <ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
     <ClInclude Include="compiler\vs\stack_walker.h" />\r
     <ClInclude Include="diagnostics\graph.h" />\r
index 9681cdf11f539af1f76259fa6e808518096e1874..e76acc960e5e9042227b3be5ec335cbba73a97b9 100644 (file)
     <ClInclude Include="blocking_bounded_queue_adapter.h">\r
       <Filter>source</Filter>\r
     </ClInclude>\r
+    <ClInclude Include="blocking_priority_queue.h">\r
+      <Filter>source</Filter>\r
+    </ClInclude>\r
   </ItemGroup>\r
 </Project>
\ No newline at end of file
index 75642ffe291ded2af0d98610fd13d84641822430..0e4c021e7c87b89f38a9e55518a2c2d78db2bbfb 100644 (file)
 #include "enum_class.h"
 #include "log.h"
 #include "blocking_bounded_queue_adapter.h"
+#include "blocking_priority_queue.h"
+
 
 #include <tbb/atomic.h>
 #include <tbb/concurrent_priority_queue.h>
 
 #include <boost/thread.hpp>
 #include <boost/optional.hpp>
+#include <boost/assign/list_of.hpp>
 
 #include <functional>
 
@@ -52,37 +55,10 @@ typedef enum_class<task_priority_def> task_priority;
 
 class executor sealed
 {      
-       struct priority_function
-       {
-               int                                             priority;
-               std::function<void()>   func;
-
-               priority_function()
-               {
-               }
-
-               template<typename F>
-               priority_function(int priority, F&& func)
-                       : priority(priority)
-                       , func(std::forward<F>(func))
-               {
-               }
-
-               void operator()()
-               {
-                       func();
-               }
-
-               bool operator<(const priority_function& other) const
-               {
-                       return priority < other.priority;
-               }
-       };
-
        executor(const executor&);
        executor& operator=(const executor&);
        
-       typedef blocking_bounded_queue_adapter<tbb::concurrent_priority_queue<priority_function>>       function_queue_t;
+       typedef blocking_priority_queue<std::function<void()>, task_priority>   function_queue_t;
        
        const std::wstring                                                                                      name_;
        tbb::atomic<bool>                                                                                       is_running_;
@@ -92,7 +68,13 @@ class executor sealed
 public:                
        executor(const std::wstring& name)
                : name_(name)
-               , execution_queue_(512)
+               , execution_queue_(512, boost::assign::list_of
+                               (task_priority::lowest_priority)
+                               (task_priority::lower_priority)
+                               (task_priority::low_priority)
+                               (task_priority::normal_priority)
+                               (task_priority::high_priority)
+                               (task_priority::higher_priority))
        {
                is_running_ = true;
                thread_ = boost::thread([this]{run();});
@@ -105,7 +87,7 @@ public:
                        internal_begin_invoke([=]
                        {
                                is_running_ = false;
-                       }, false).wait();
+                       }).wait();
                }
                catch(...)
                {
@@ -114,16 +96,6 @@ public:
                
                thread_.join();
        }
-       
-       template<typename Func>
-       auto try_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())>
-       {       
-               if(!is_running_)
-                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
-
-               // Will return uninitialized future if the try failed (get_state() will return future_state::uninitialized).
-               return internal_begin_invoke(std::forward<Func>(func), true, priority);
-       }
 
        template<typename Func>
        auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
@@ -131,7 +103,7 @@ public:
                if(!is_running_)
                        CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
                                
-               return internal_begin_invoke(std::forward<Func>(func), false, priority);        
+               return internal_begin_invoke(std::forward<Func>(func), priority);       
        }
        
        template<typename Func>
@@ -143,13 +115,14 @@ public:
                return begin_invoke(std::forward<Func>(func), prioriy).get();
        }
 
-       void yield()
+       void yield(task_priority minimum_priority)
        {
                if(!is_current())
                        CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
 
-               priority_function func;
-               if(execution_queue_.try_pop(func))
+               std::function<void ()> func;
+
+               while (execution_queue_.try_pop(func, minimum_priority))
                        func();
        }
 
@@ -165,7 +138,7 @@ public:
        
        void clear()
        {               
-               priority_function func;
+               std::function<void ()> func;
                while(execution_queue_.try_pop(func));
        }
                                
@@ -207,7 +180,6 @@ private:
        template<typename Func>
        auto internal_begin_invoke(
                Func&& func,
-               bool try_begin,
                task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
        {                                       
                typedef typename std::remove_reference<Func>::type      function_type;
@@ -246,7 +218,7 @@ private:
                auto future = task->get_future();
 
                auto raw_task = task.release();
-               priority_function prio_func(priority.value(), [raw_task]
+               auto function = [raw_task]
                {
                        std::unique_ptr<task_type> task(raw_task);
                        try
@@ -254,21 +226,12 @@ private:
                                (*task)();
                        }
                        catch(boost::task_already_started&){}
-               });
+               };
 
-               if (!execution_queue_.try_push(prio_func))
+               if (!execution_queue_.try_push(priority, function))
                {
-                       if (try_begin)
-                       {
-                               delete raw_task;
-
-                               return boost::unique_future<decltype(func())>();
-                       }
-                       else
-                       {
-                               CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
-                               execution_queue_.push(prio_func);
-                       }
+                       CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
+                       execution_queue_.push(priority, function);
                }
 
                return std::move(future);               
@@ -281,7 +244,7 @@ private:
                {
                        try
                        {
-                               priority_function func;
+                               std::function<void ()> func;
                                execution_queue_.pop(func);
                                func();
                        }
index bd3c6d7ecea49b10b4853133945aa18f8a893a83..1a384397e99c936ab849ed1bacac3cff5023c015 100644 (file)
@@ -155,4 +155,44 @@ public:
        }
 };
 
+/**
+ * Enables RAII-style acquire/release on scope exit unless committed.
+ */
+class acquire_transaction : boost::noncopyable
+{
+       semaphore& semaphore_;
+       bool committed_;
+public:
+       /**
+        * Constructor.
+        *
+        * @param semaphore        The semaphore to acquire one permit from.
+        * @param already_acquired Whether a permit has already been acquired or not.
+        */
+       acquire_transaction(semaphore& semaphore, bool already_acquired = false)
+               : semaphore_(semaphore)
+               , committed_(false)
+       {
+               if (!already_acquired)
+                       semaphore_.acquire();
+       }
+
+       /**
+        * Destructor that will release one permit if commit() has not been called.
+        */
+       ~acquire_transaction()
+       {
+               if (!committed_)
+                       semaphore_.release();
+       }
+
+       /**
+        * Ensure that the acquired permit is kept on destruction.
+        */
+       void commit()
+       {
+               committed_ = true;
+       }
+};
+
 }