]> git.sesse.net Git - casparcg/blobdiff - common/executor.h
Merged INFO THREADS from 2.0
[casparcg] / common / executor.h
index 0e4c021e7c87b89f38a9e55518a2c2d78db2bbfb..59e12ba905e446e3caab34a4d84a771644ef1121 100644 (file)
 
 #pragma once
 
+#include "os/general_protection_fault.h"
 #include "except.h"
-#include "enum_class.h"
 #include "log.h"
 #include "blocking_bounded_queue_adapter.h"
 #include "blocking_priority_queue.h"
-
+#include "future.h"
 
 #include <tbb/atomic.h>
 #include <tbb/concurrent_priority_queue.h>
 
 #include <boost/thread.hpp>
 #include <boost/optional.hpp>
-#include <boost/assign/list_of.hpp>
 
 #include <functional>
+#include <future>
 
 namespace caspar {
                
-struct task_priority_def
+enum class task_priority
 {
-       enum type
-       {
-               lowest_priority = 0,
-               lower_priority,
-               low_priority,
-               normal_priority,
-               high_priority,
-               higher_priority
-       };
+       lowest_priority = 0,
+       lower_priority,
+       low_priority,
+       normal_priority,
+       high_priority,
+       higher_priority
 };
-typedef enum_class<task_priority_def> task_priority;
 
-class executor sealed
+class executor final
 {      
        executor(const executor&);
        executor& operator=(const executor&);
@@ -68,13 +64,14 @@ class executor sealed
 public:                
        executor(const std::wstring& name)
                : name_(name)
-               , execution_queue_(512, boost::assign::list_of
-                               (task_priority::lowest_priority)
-                               (task_priority::lower_priority)
-                               (task_priority::low_priority)
-                               (task_priority::normal_priority)
-                               (task_priority::high_priority)
-                               (task_priority::higher_priority))
+               , execution_queue_(512, std::vector<task_priority> {
+                       task_priority::lowest_priority,
+                       task_priority::lower_priority,
+                       task_priority::low_priority,
+                       task_priority::normal_priority,
+                       task_priority::high_priority,
+                       task_priority::higher_priority 
+               })
        {
                is_running_ = true;
                thread_ = boost::thread([this]{run();});
@@ -94,11 +91,16 @@ public:
                        CASPAR_LOG_CURRENT_EXCEPTION();
                }
                
+               join();
+       }
+
+       void join()
+       {
                thread_.join();
        }
 
        template<typename Func>
-       auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
+       auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
        {       
                if(!is_running_)
                        CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
@@ -135,6 +137,11 @@ public:
        {
                return execution_queue_.capacity();
        }
+
+       bool is_full() const
+       {
+               return execution_queue_.space_available() == 0;
+       }
        
        void clear()
        {               
@@ -169,6 +176,11 @@ public:
        {
                return boost::this_thread::get_id() == thread_.get_id();
        }
+
+       std::wstring name() const
+       {
+               return name_;
+       }
                
 private:       
 
@@ -180,13 +192,13 @@ private:
        template<typename Func>
        auto internal_begin_invoke(
                Func&& func,
-               task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
+               task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
        {                                       
                typedef typename std::remove_reference<Func>::type      function_type;
                typedef decltype(func())                                                        result_type;
-               typedef boost::packaged_task<result_type>                       task_type;
+               typedef std::packaged_task<result_type()>                       task_type;
                                                                
-               std::unique_ptr<task_type> task;
+               std::shared_ptr<task_type> task;
 
                // Use pointers since the boost thread library doesn't fully support move semantics.
 
@@ -204,28 +216,15 @@ private:
                        delete raw_func2;
                        throw;
                }
-               
-               task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
-               {
-                       try
-                       {
-                               if(is_current())  // Avoids potential deadlock.
-                                       my_task();
-                       }
-                       catch(boost::task_already_started&){}
-               }));
                                
-               auto future = task->get_future();
-
-               auto raw_task = task.release();
-               auto function = [raw_task]
+               auto future = task->get_future().share();
+               auto function = [task]
                {
-                       std::unique_ptr<task_type> task(raw_task);
                        try
                        {
                                (*task)();
                        }
-                       catch(boost::task_already_started&){}
+                       catch(std::future_error&){}
                };
 
                if (!execution_queue_.try_push(priority, function))
@@ -234,12 +233,20 @@ private:
                        execution_queue_.push(priority, function);
                }
 
-               return std::move(future);               
+               return std::async(std::launch::deferred, [=]() mutable -> result_type
+               {
+                       if (!is_ready(future) && is_current()) // Avoids potential deadlock.
+                       {
+                               function();
+                       }
+
+                       return future.get();
+               });
        }
 
        void run() // noexcept
        {
-               win32_exception::install_handler();             
+               ensure_gpf_handler_installed_for_thread(u8(name_).c_str());
                while(is_running_)
                {
                        try
@@ -256,4 +263,4 @@ private:
        }       
 };
 
-}
\ No newline at end of file
+}