]> git.sesse.net Git - casparcg/commitdiff
Refactored executor to support try_begin_invoke which does not block until room is...
authorhellgore <hellgore@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Thu, 6 Sep 2012 14:43:49 +0000 (14:43 +0000)
committerhellgore <hellgore@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Thu, 6 Sep 2012 14:43:49 +0000 (14:43 +0000)
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.1.0@3248 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d

common/blocking_bounded_queue_adapter.h [new file with mode: 0644]
common/common.vcxproj
common/common.vcxproj.filters
common/executor.h
common/semaphore.h [new file with mode: 0644]

diff --git a/common/blocking_bounded_queue_adapter.h b/common/blocking_bounded_queue_adapter.h
new file mode 100644 (file)
index 0000000..a5b59ff
--- /dev/null
@@ -0,0 +1,198 @@
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#pragma once
+
+#include "semaphore.h"
+
+#include <boost/thread/mutex.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace caspar {
+
+/**
+ * Adapts an unbounded non-blocking concurrent queue into a blocking bounded
+ * concurrent queue.
+ *
+ * The queue Q to adapt must support the following use cases:
+ *
+ * Q q;
+ * Q::value_type elem;
+ * q.push(elem);
+ *
+ * and:
+ *
+ * Q q;
+ * Q::value_type elem;
+ * q.try_pop(elem);
+ *
+ * It must also guarantee thread safety for those operations.
+ */
+template<class Q>
+class blocking_bounded_queue_adapter : boost::noncopyable
+{
+public:
+       typedef typename Q::value_type value_type;
+       typedef unsigned int size_type;
+private:
+       semaphore space_available_;
+       semaphore elements_available_;
+       Q queue_;
+       mutable boost::mutex capacity_mutex_;
+       size_type capacity_;
+public:
+       /**
+        * Constructor.
+        *
+        * @param capacity The capacity of the queue.
+        */
+       blocking_bounded_queue_adapter(size_type capacity)
+               : space_available_(capacity)
+               , elements_available_(0)
+               , capacity_(capacity)
+       {
+       }
+
+       /**
+        * Push an element to the queue, block until room is available.
+        *
+        * @param element The element to push.
+        */
+       void push(const value_type& element)
+       {
+               space_available_.acquire();
+               push_after_room_reserved(element);
+       }
+
+       /**
+        * Try to push an element to the queue, returning immediately if room is not
+        * available.
+        *
+        * @param element The element to push.
+        *
+        * @return true if there was room for the element.
+        */
+       bool try_push(const value_type& element)
+       {
+               bool room_available = space_available_.try_acquire();
+
+               if (!room_available)
+                       return false;
+
+               push_after_room_reserved(element);
+
+               return true;
+       }
+
+       /**
+        * Pop an element from the queue, will block until an element is available.
+        *
+        * @param element The element to store the result in.
+        */
+       void pop(value_type& element)
+       {
+               elements_available_.acquire();
+               queue_.try_pop(element);
+               space_available_.release();
+       }
+
+       /**
+        * Try to pop an element from the queue, returning immediately if no
+        * element is available.
+        *
+        * @param element The element to store the result in.
+        *
+        * @return true if an element was popped.
+        */
+       bool try_pop(value_type& element)
+       {
+               if (!elements_available_.try_acquire())
+                       return false;
+
+               queue_.try_pop(element);
+               space_available_.release();
+
+               return true;
+       }
+
+       /**
+        * Modify the capacity of the queue. May block if reducing the capacity.
+        *
+        * @param capacity The new capacity.
+        */
+       void set_capacity(size_type capacity)
+       {
+               boost::mutex::scoped_lock lock (capacity_mutex_);
+
+               if (capacity_ < capacity)
+               {
+                       auto to_grow_with = capacity - capacity_;
+
+                       space_available_.release(to_grow_with);
+               }
+               else if (capacity_ > capacity)
+               {
+                       auto to_shrink_with = capacity_ - capacity;
+
+                       // Will block until the desired capacity has been reached.
+                       space_available_.acquire(to_shrink_with);
+               }
+
+               capacity_ = capacity;
+       }
+
+       /**
+        * @return the current capacity of the queue.
+        */
+       size_type capacity() const
+       {
+               boost::mutex::scoped_lock lock (capacity_mutex_);
+
+               return capacity_;
+       }
+
+       /**
+        * @return the current size of the queue (may have changed at the time of
+        *         returning).
+        */
+       size_type size() const
+       {
+               return elements_available_.permits();
+       }
+private:
+       void push_after_room_reserved(const value_type& element)
+       {
+               try
+               {
+                       queue_.push(element);
+               }
+               catch (...)
+               {
+                       space_available_.release();
+
+                       throw;
+               }
+
+               elements_available_.release();
+       }
+};
+
+}
index ba0e326d0546d4965da0379cf54a1adc03132c49..09e29e183b39e6854d8e861b5af3c40acaf5f15a 100644 (file)
   <ItemGroup>\r
     <ClInclude Include="array.h" />\r
     <ClInclude Include="assert.h" />\r
+    <ClInclude Include="blocking_bounded_queue_adapter.h" />\r
     <ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
     <ClInclude Include="compiler\vs\stack_walker.h" />\r
     <ClInclude Include="diagnostics\graph.h" />\r
     <ClInclude Include="param.h" />\r
     <ClInclude Include="prec_timer.h" />\r
     <ClInclude Include="reactive.h" />\r
+    <ClInclude Include="semaphore.h" />\r
     <ClInclude Include="stdafx.h" />\r
     <ClInclude Include="tweener.h" />\r
     <ClInclude Include="utf.h" />\r
index e256c07cdd8712cbb201b0ccce87cc57a0253bf9..9681cdf11f539af1f76259fa6e808518096e1874 100644 (file)
     <ClInclude Include="compiler\vs\stack_walker.h">\r
       <Filter>source\compiler\vs</Filter>\r
     </ClInclude>\r
+    <ClInclude Include="semaphore.h">\r
+      <Filter>source</Filter>\r
+    </ClInclude>\r
+    <ClInclude Include="blocking_bounded_queue_adapter.h">\r
+      <Filter>source</Filter>\r
+    </ClInclude>\r
   </ItemGroup>\r
 </Project>
\ No newline at end of file
index b58339e1f29b704a28c3568eb9b76b10db514a75..5c296dec0dac97a12b94f4d4036c1b8ad968722c 100644 (file)
 #include "except.h"
 #include "enum_class.h"
 #include "log.h"
+#include "blocking_bounded_queue_adapter.h"
 
 #include <tbb/atomic.h>
 #include <tbb/concurrent_priority_queue.h>
-#include <tbb/concurrent_queue.h>
 
 #include <boost/thread.hpp>
+#include <boost/optional.hpp>
 
 #include <functional>
 
@@ -81,20 +82,19 @@ class executor sealed
        executor(const executor&);
        executor& operator=(const executor&);
        
-       typedef tbb::concurrent_priority_queue<priority_function>       function_queue_t;
+       typedef blocking_bounded_queue_adapter<tbb::concurrent_priority_queue<priority_function>>       function_queue_t;
        
        const std::wstring                                                                                      name_;
        tbb::atomic<bool>                                                                                       is_running_;
        boost::thread                                                                                           thread_;        
        function_queue_t                                                                                        execution_queue_;
-       tbb::concurrent_bounded_queue<int>                                                      semaphore_;
                
 public:                
        executor(const std::wstring& name)
                : name_(name)
+               , execution_queue_(512)
        {
                is_running_ = true;
-               set_capacity(512);
                thread_ = boost::thread([this]{run();});
        }
        
@@ -105,7 +105,7 @@ public:
                        internal_begin_invoke([=]
                        {
                                is_running_ = false;
-                       }).wait();
+                       }, false).wait();
                }
                catch(...)
                {
@@ -114,14 +114,24 @@ public:
                
                thread_.join();
        }
-                                               
+       
+       template<typename Func>
+       auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())>
+       {       
+               if(!is_running_)
+                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
+
+               // Will return uninitialized future if the try failed (get_state() will return future_state::uninitialized).
+               return internal_begin_invoke(std::forward<Func>(func), true, priority);
+       }
+
        template<typename Func>
        auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
        {       
                if(!is_running_)
                        CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
                                
-               return internal_begin_invoke(std::forward<Func>(func), priority);       
+               return internal_begin_invoke(std::forward<Func>(func), false, priority);        
        }
        
        template<typename Func>
@@ -138,23 +148,19 @@ public:
                if(!is_current())
                        CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
 
-               int dummy;
-               if(!semaphore_.try_pop(dummy))
-                       return;
-
                priority_function func;
                if(execution_queue_.try_pop(func))
                        func();
        }
 
-       void set_capacity(std::size_t capacity)
+       void set_capacity(function_queue_t::size_type capacity)
        {
-               semaphore_.set_capacity(capacity);
+               execution_queue_.set_capacity(capacity);
        }
 
-       std::size_t capacity() const
+       function_queue_t::size_type capacity() const
        {
-               return semaphore_.capacity();
+               return execution_queue_.capacity();
        }
        
        void clear()
@@ -199,7 +205,10 @@ private:
        }
        
        template<typename Func>
-       auto internal_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
+       auto internal_begin_invoke(
+               Func&& func,
+               bool try_begin,
+               task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
        {                                       
                typedef typename std::remove_reference<Func>::type      function_type;
                typedef decltype(func())                                                        result_type;
@@ -247,13 +256,21 @@ private:
                        catch(boost::task_already_started&){}
                });
 
-               execution_queue_.push(prio_func);
-
-               if(!semaphore_.try_push(0))
+               if (!execution_queue_.try_push(prio_func))
                {
-                       CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
-                       semaphore_.push(0);
-               }                                       
+                       if (try_begin)
+                       {
+                               delete raw_task;
+
+                               return boost::unique_future<decltype(func())>();
+                       }
+                       else
+                       {
+                               CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
+                               execution_queue_.push(prio_func);
+                       }
+               }
+
                return std::move(future);               
        }
 
@@ -264,12 +281,9 @@ private:
                {
                        try
                        {
-                               int dummy;
-                               semaphore_.pop(dummy);
-
                                priority_function func;
-                               if(execution_queue_.try_pop(func))
-                                       func();
+                               execution_queue_.pop(func);
+                               func();
                        }
                        catch(...)
                        {
diff --git a/common/semaphore.h b/common/semaphore.h
new file mode 100644 (file)
index 0000000..bd3c6d7
--- /dev/null
@@ -0,0 +1,158 @@
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#pragma once
+
+#include <cmath>
+
+#include <boost/noncopyable.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition.hpp>
+
+namespace caspar {
+
+template <class N, class Func>
+void repeat_n(N times_to_repeat_block, const Func& func)
+{
+       for (N i = 0; i < times_to_repeat_block; ++i)
+       {
+               func();
+       }
+}
+
+/**
+ * Counting semaphore modelled after java.util.concurrent.Semaphore
+ */
+class semaphore : boost::noncopyable
+{
+       mutable boost::mutex mutex_;
+       unsigned int permits_;
+       boost::condition_variable permits_available_;
+public:
+       /**
+        * Constructor.
+        *
+        * @param permits The initial number of permits.
+        */
+       semaphore(unsigned int permits)
+               : permits_(permits)
+       {
+       }
+
+       /**
+        * Release a permit.
+        */
+       void release()
+       {
+               boost::mutex::scoped_lock lock(mutex_);
+
+               ++permits_;
+
+               permits_available_.notify_one();
+       }
+
+       /**
+        * Release a permit.
+        *
+        * @param permits The number of permits to release.
+        */
+       void release(unsigned int permits)
+       {
+               boost::mutex::scoped_lock lock(mutex_);
+
+               permits_ += permits;
+
+               repeat_n(permits, [this] { permits_available_.notify_one(); });
+       }
+
+       /**
+        * Acquire a permit. Will block until one becomes available if no permit is
+        * currently available.
+        */
+       void acquire()
+       {
+               boost::mutex::scoped_lock lock(mutex_);
+
+               while (permits_ == 0u)
+               {
+                       permits_available_.wait(lock);
+               }
+
+               --permits_;
+       }
+
+       /**
+        * Acquire a number of permits. Will block until the given number of
+        * permits has been acquired if not enough permits are currently available.
+        *
+        * @param permits The number of permits to acquire.
+        */
+       void acquire(unsigned int permits)
+       {
+               boost::mutex::scoped_lock lock(mutex_);
+               auto num_acquired = 0u;
+
+               while (permits_ == 0u && num_acquired < permits)
+               {
+                       permits_available_.wait(lock);
+
+                       auto num_wanted = permits - num_acquired;
+                       auto to_drain = std::min(num_wanted, permits_);
+
+                       permits_ -= to_drain;
+                       num_acquired += to_drain;
+               }
+       }
+
+       /**
+        * Acquire one permits if permits are currently available. Does not block
+        * until one is available, but returns immediately if unavailable.
+        *
+        * @return true if a permit was acquired or false if no permits where
+        *         currently available.
+        */
+       bool try_acquire()
+       {
+               boost::mutex::scoped_lock lock(mutex_);
+
+               if (permits_ == 0u)
+                       return false;
+               else
+               {
+                       --permits_;
+
+                       return true;
+               }
+       }
+
+       /**
+        * @return the current number of permits (may have changed at the time of
+        *         return).
+        */
+       unsigned int permits() const
+       {
+               boost::mutex::scoped_lock lock(mutex_);
+
+               return permits_;
+       }
+};
+
+}