]> git.sesse.net Git - casparcg/commitdiff
[executor] changed default to unbounded like in 2.0.7 and fixed a deadlock when capac...
authorHelge Norberg <helge.norberg@svt.se>
Thu, 29 Sep 2016 15:55:48 +0000 (17:55 +0200)
committerHelge Norberg <helge.norberg@svt.se>
Thu, 29 Sep 2016 15:55:48 +0000 (17:55 +0200)
common/executor.h

index e67a0afa708e8adb04a458ea2382efe52a158c3a..d8a34d1373880164c69bb0dd9a94e3eb40481588 100644 (file)
@@ -38,7 +38,6 @@
 #include <future>
 
 namespace caspar {
-               
 enum class task_priority
 {
        lowest_priority = 0,
@@ -50,35 +49,35 @@ enum class task_priority
 };
 
 class executor final
-{      
+{
        executor(const executor&);
        executor& operator=(const executor&);
-       
+
        typedef blocking_priority_queue<std::function<void()>, task_priority>   function_queue_t;
-       
+
        const std::wstring                                                                                      name_;
        tbb::atomic<bool>                                                                                       is_running_;
-       boost::thread                                                                                           thread_;        
+       boost::thread                                                                                           thread_;
        function_queue_t                                                                                        execution_queue_;
        tbb::atomic<bool>                                                                                       currently_in_task_;
 
-public:                
+public:
        executor(const std::wstring& name)
                : name_(name)
-               , execution_queue_(512, std::vector<task_priority> {
+               , execution_queue_(std::numeric_limits<int>::max(), std::vector<task_priority> {
                        task_priority::lowest_priority,
                        task_priority::lower_priority,
                        task_priority::low_priority,
                        task_priority::normal_priority,
                        task_priority::high_priority,
-                       task_priority::higher_priority 
+                       task_priority::higher_priority
                })
        {
                is_running_ = true;
                currently_in_task_ = false;
                thread_ = boost::thread([this]{run();});
        }
-       
+
        ~executor()
        {
                CASPAR_LOG(debug) << L"Shutting down " << name_;
@@ -95,7 +94,7 @@ public:
                {
                        CASPAR_LOG_CURRENT_EXCEPTION();
                }
-               
+
                join();
        }
 
@@ -106,19 +105,19 @@ public:
 
        template<typename Func>
        auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
-       {       
+       {
                if(!is_running_)
                        CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
-                               
-               return internal_begin_invoke(std::forward<Func>(func), priority);       
+
+               return internal_begin_invoke(std::forward<Func>(func), priority);
        }
-       
+
        template<typename Func>
        auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
        {
                if(is_current())  // Avoids potential deadlock.
                        return func();
-               
+
                return begin_invoke(std::forward<Func>(func), prioriy).get();
        }
 
@@ -147,13 +146,13 @@ public:
        {
                return execution_queue_.space_available() == 0;
        }
-       
+
        void clear()
-       {               
+       {
                std::function<void ()> func;
                while(execution_queue_.try_pop(func));
        }
-                               
+
        void stop()
        {
                invoke([this]
@@ -166,16 +165,16 @@ public:
        {
                invoke([]{}, task_priority::lowest_priority);
        }
-               
-       function_queue_t::size_type size() const 
+
+       function_queue_t::size_type size() const
        {
-               return execution_queue_.size(); 
+               return execution_queue_.size();
        }
 
        bool is_running() const
        {
-               return is_running_; 
-       }       
+               return is_running_;
+       }
 
        bool is_current() const
        {
@@ -191,23 +190,23 @@ public:
        {
                return name_;
        }
-               
-private:       
+
+private:
 
        std::wstring print() const
        {
                return L"executor[" + name_ + L"]";
        }
-       
+
        template<typename Func>
        auto internal_begin_invoke(
                Func&& func,
                task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
-       {                                       
+       {
                typedef typename std::remove_reference<Func>::type      function_type;
                typedef decltype(func())                                                        result_type;
                typedef std::packaged_task<result_type()>                       task_type;
-                                                               
+
                std::shared_ptr<task_type> task;
 
                // Use pointers since the boost thread library doesn't fully support move semantics.
@@ -226,7 +225,7 @@ private:
                        delete raw_func2;
                        throw;
                }
-                               
+
                auto future = task->get_future().share();
                auto function = [task]
                {
@@ -239,6 +238,9 @@ private:
 
                if (!execution_queue_.try_push(priority, function))
                {
+                       if (is_current())
+                               CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" Overflow. Avoiding deadlock."));
+
                        CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller.";
                        execution_queue_.push(priority, function);
                }
@@ -290,7 +292,6 @@ private:
 
                        currently_in_task_ = false;
                }
-       }       
+       }
 };
-
 }