]> git.sesse.net Git - casparcg/blobdiff - common/executor.h
Refactored executor to support try_begin_invoke which does not block until room is...
[casparcg] / common / executor.h
index b58339e1f29b704a28c3568eb9b76b10db514a75..5c296dec0dac97a12b94f4d4036c1b8ad968722c 100644 (file)
 #include "except.h"
 #include "enum_class.h"
 #include "log.h"
+#include "blocking_bounded_queue_adapter.h"
 
 #include <tbb/atomic.h>
 #include <tbb/concurrent_priority_queue.h>
-#include <tbb/concurrent_queue.h>
 
 #include <boost/thread.hpp>
+#include <boost/optional.hpp>
 
 #include <functional>
 
@@ -81,20 +82,19 @@ class executor sealed
        executor(const executor&);
        executor& operator=(const executor&);
        
-       typedef tbb::concurrent_priority_queue<priority_function>       function_queue_t;
+       typedef blocking_bounded_queue_adapter<tbb::concurrent_priority_queue<priority_function>>       function_queue_t;
        
        const std::wstring                                                                                      name_;
        tbb::atomic<bool>                                                                                       is_running_;
        boost::thread                                                                                           thread_;        
        function_queue_t                                                                                        execution_queue_;
-       tbb::concurrent_bounded_queue<int>                                                      semaphore_;
                
 public:                
        executor(const std::wstring& name)
                : name_(name)
+               , execution_queue_(512)
        {
                is_running_ = true;
-               set_capacity(512);
                thread_ = boost::thread([this]{run();});
        }
        
@@ -105,7 +105,7 @@ public:
                        internal_begin_invoke([=]
                        {
                                is_running_ = false;
-                       }).wait();
+                       }, false).wait();
                }
                catch(...)
                {
@@ -114,14 +114,24 @@ public:
                
                thread_.join();
        }
-                                               
+       
+       template<typename Func>
+       auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())>
+       {       
+               if(!is_running_)
+                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
+
+               // Will return uninitialized future if the try failed (get_state() will return future_state::uninitialized).
+               return internal_begin_invoke(std::forward<Func>(func), true, priority);
+       }
+
        template<typename Func>
        auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
        {       
                if(!is_running_)
                        CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
                                
-               return internal_begin_invoke(std::forward<Func>(func), priority);       
+               return internal_begin_invoke(std::forward<Func>(func), false, priority);        
        }
        
        template<typename Func>
@@ -138,23 +148,19 @@ public:
                if(!is_current())
                        CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
 
-               int dummy;
-               if(!semaphore_.try_pop(dummy))
-                       return;
-
                priority_function func;
                if(execution_queue_.try_pop(func))
                        func();
        }
 
-       void set_capacity(std::size_t capacity)
+       void set_capacity(function_queue_t::size_type capacity)
        {
-               semaphore_.set_capacity(capacity);
+               execution_queue_.set_capacity(capacity);
        }
 
-       std::size_t capacity() const
+       function_queue_t::size_type capacity() const
        {
-               return semaphore_.capacity();
+               return execution_queue_.capacity();
        }
        
        void clear()
@@ -199,7 +205,10 @@ private:
        }
        
        template<typename Func>
-       auto internal_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
+       auto internal_begin_invoke(
+               Func&& func,
+               bool try_begin,
+               task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
        {                                       
                typedef typename std::remove_reference<Func>::type      function_type;
                typedef decltype(func())                                                        result_type;
@@ -247,13 +256,21 @@ private:
                        catch(boost::task_already_started&){}
                });
 
-               execution_queue_.push(prio_func);
-
-               if(!semaphore_.try_push(0))
+               if (!execution_queue_.try_push(prio_func))
                {
-                       CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
-                       semaphore_.push(0);
-               }                                       
+                       if (try_begin)
+                       {
+                               delete raw_task;
+
+                               return boost::unique_future<decltype(func())>();
+                       }
+                       else
+                       {
+                               CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
+                               execution_queue_.push(prio_func);
+                       }
+               }
+
                return std::move(future);               
        }
 
@@ -264,12 +281,9 @@ private:
                {
                        try
                        {
-                               int dummy;
-                               semaphore_.pop(dummy);
-
                                priority_function func;
-                               if(execution_queue_.try_pop(func))
-                                       func();
+                               execution_queue_.pop(func);
+                               func();
                        }
                        catch(...)
                        {