]> git.sesse.net Git - casparcg/blobdiff - common/executor.h
[executor] changed default to unbounded like in 2.0.7 and fixed a deadlock when capac...
[casparcg] / common / executor.h
index 75642ffe291ded2af0d98610fd13d84641822430..d8a34d1373880164c69bb0dd9a94e3eb40481588 100644 (file)
 
 #pragma once
 
+#include "os/general_protection_fault.h"
 #include "except.h"
-#include "enum_class.h"
 #include "log.h"
 #include "blocking_bounded_queue_adapter.h"
+#include "blocking_priority_queue.h"
+#include "future.h"
 
 #include <tbb/atomic.h>
 #include <tbb/concurrent_priority_queue.h>
 #include <boost/optional.hpp>
 
 #include <functional>
+#include <future>
 
 namespace caspar {
-               
-struct task_priority_def
+enum class task_priority
 {
-       enum type
-       {
-               lowest_priority = 0,
-               lower_priority,
-               low_priority,
-               normal_priority,
-               high_priority,
-               higher_priority
-       };
+       lowest_priority = 0,
+       lower_priority,
+       low_priority,
+       normal_priority,
+       high_priority,
+       higher_priority
 };
-typedef enum_class<task_priority_def> task_priority;
-
-class executor sealed
-{      
-       struct priority_function
-       {
-               int                                             priority;
-               std::function<void()>   func;
-
-               priority_function()
-               {
-               }
-
-               template<typename F>
-               priority_function(int priority, F&& func)
-                       : priority(priority)
-                       , func(std::forward<F>(func))
-               {
-               }
-
-               void operator()()
-               {
-                       func();
-               }
-
-               bool operator<(const priority_function& other) const
-               {
-                       return priority < other.priority;
-               }
-       };
 
+class executor final
+{
        executor(const executor&);
        executor& operator=(const executor&);
-       
-       typedef blocking_bounded_queue_adapter<tbb::concurrent_priority_queue<priority_function>>       function_queue_t;
-       
+
+       typedef blocking_priority_queue<std::function<void()>, task_priority>   function_queue_t;
+
        const std::wstring                                                                                      name_;
        tbb::atomic<bool>                                                                                       is_running_;
-       boost::thread                                                                                           thread_;        
+       boost::thread                                                                                           thread_;
        function_queue_t                                                                                        execution_queue_;
-               
-public:                
+       tbb::atomic<bool>                                                                                       currently_in_task_;
+
+public:
        executor(const std::wstring& name)
                : name_(name)
-               , execution_queue_(512)
+               , execution_queue_(std::numeric_limits<int>::max(), std::vector<task_priority> {
+                       task_priority::lowest_priority,
+                       task_priority::lower_priority,
+                       task_priority::low_priority,
+                       task_priority::normal_priority,
+                       task_priority::high_priority,
+                       task_priority::higher_priority
+               })
        {
                is_running_ = true;
+               currently_in_task_ = false;
                thread_ = boost::thread([this]{run();});
        }
-       
+
        ~executor()
        {
+               CASPAR_LOG(debug) << L"Shutting down " << name_;
+
                try
                {
-                       internal_begin_invoke([=]
-                       {
-                               is_running_ = false;
-                       }, false).wait();
+                       if (is_running_)
+                               internal_begin_invoke([=]
+                               {
+                                       is_running_ = false;
+                               }).wait();
                }
                catch(...)
                {
                        CASPAR_LOG_CURRENT_EXCEPTION();
                }
-               
-               thread_.join();
+
+               join();
        }
-       
-       template<typename Func>
-       auto try_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())>
-       {       
-               if(!is_running_)
-                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
 
-               // Will return uninitialized future if the try failed (get_state() will return future_state::uninitialized).
-               return internal_begin_invoke(std::forward<Func>(func), true, priority);
+       void join()
+       {
+               thread_.join();
        }
 
        template<typename Func>
-       auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
-       {       
+       auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
+       {
                if(!is_running_)
                        CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
-                               
-               return internal_begin_invoke(std::forward<Func>(func), false, priority);        
+
+               return internal_begin_invoke(std::forward<Func>(func), priority);
        }
-       
+
        template<typename Func>
        auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
        {
                if(is_current())  // Avoids potential deadlock.
                        return func();
-               
+
                return begin_invoke(std::forward<Func>(func), prioriy).get();
        }
 
-       void yield()
+       void yield(task_priority minimum_priority)
        {
                if(!is_current())
                        CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
 
-               priority_function func;
-               if(execution_queue_.try_pop(func))
+               std::function<void ()> func;
+
+               while (execution_queue_.try_pop(func, minimum_priority))
                        func();
        }
 
@@ -162,13 +141,18 @@ public:
        {
                return execution_queue_.capacity();
        }
-       
+
+       bool is_full() const
+       {
+               return execution_queue_.space_available() == 0;
+       }
+
        void clear()
-       {               
-               priority_function func;
+       {
+               std::function<void ()> func;
                while(execution_queue_.try_pop(func));
        }
-                               
+
        void stop()
        {
                invoke([this]
@@ -181,40 +165,49 @@ public:
        {
                invoke([]{}, task_priority::lowest_priority);
        }
-               
-       function_queue_t::size_type size() const 
+
+       function_queue_t::size_type size() const
        {
-               return execution_queue_.size(); 
+               return execution_queue_.size();
        }
-               
+
        bool is_running() const
        {
-               return is_running_; 
-       }       
+               return is_running_;
+       }
 
        bool is_current() const
        {
                return boost::this_thread::get_id() == thread_.get_id();
        }
-               
-private:       
+
+       bool is_currently_in_task() const
+       {
+               return currently_in_task_;
+       }
+
+       std::wstring name() const
+       {
+               return name_;
+       }
+
+private:
 
        std::wstring print() const
        {
                return L"executor[" + name_ + L"]";
        }
-       
+
        template<typename Func>
        auto internal_begin_invoke(
                Func&& func,
-               bool try_begin,
-               task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
-       {                                       
+               task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
+       {
                typedef typename std::remove_reference<Func>::type      function_type;
                typedef decltype(func())                                                        result_type;
-               typedef boost::packaged_task<result_type>                       task_type;
-                                                               
-               std::unique_ptr<task_type> task;
+               typedef std::packaged_task<result_type()>                       task_type;
+
+               std::shared_ptr<task_type> task;
 
                // Use pointers since the boost thread library doesn't fully support move semantics.
 
@@ -232,65 +225,73 @@ private:
                        delete raw_func2;
                        throw;
                }
-               
-               task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
-               {
-                       try
-                       {
-                               if(is_current())  // Avoids potential deadlock.
-                                       my_task();
-                       }
-                       catch(boost::task_already_started&){}
-               }));
-                               
-               auto future = task->get_future();
 
-               auto raw_task = task.release();
-               priority_function prio_func(priority.value(), [raw_task]
+               auto future = task->get_future().share();
+               auto function = [task]
                {
-                       std::unique_ptr<task_type> task(raw_task);
                        try
                        {
                                (*task)();
                        }
-                       catch(boost::task_already_started&){}
-               });
+                       catch(std::future_error&){}
+               };
 
-               if (!execution_queue_.try_push(prio_func))
+               if (!execution_queue_.try_push(priority, function))
                {
-                       if (try_begin)
-                       {
-                               delete raw_task;
+                       if (is_current())
+                               CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" Overflow. Avoiding deadlock."));
 
-                               return boost::unique_future<decltype(func())>();
+                       CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller.";
+                       execution_queue_.push(priority, function);
+               }
+
+               return std::async(std::launch::deferred, [=]() mutable -> result_type
+               {
+                       if (!is_ready(future) && is_current()) // Avoids potential deadlock.
+                       {
+                               function();
                        }
-                       else
+
+                       try
                        {
-                               CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
-                               execution_queue_.push(prio_func);
+                               return future.get();
                        }
-               }
+                       catch (const caspar_exception& e)
+                       {
+                               if (!is_current()) // Add context information from this thread before rethrowing.
+                               {
+                                       auto ctx_info = boost::get_error_info<context_info_t>(e);
 
-               return std::move(future);               
+                                       if (ctx_info)
+                                               e << context_info(get_context() + *ctx_info);
+                                       else
+                                               e << context_info(get_context());
+                               }
+
+                               throw;
+                       }
+               });
        }
 
        void run() // noexcept
        {
-               win32_exception::install_handler();             
+               ensure_gpf_handler_installed_for_thread(u8(name_).c_str());
                while(is_running_)
                {
                        try
                        {
-                               priority_function func;
+                               std::function<void ()> func;
                                execution_queue_.pop(func);
+                               currently_in_task_ = true;
                                func();
                        }
                        catch(...)
                        {
                                CASPAR_LOG_CURRENT_EXCEPTION();
                        }
+
+                       currently_in_task_ = false;
                }
-       }       
+       }
 };
-
-}
\ No newline at end of file
+}