]> git.sesse.net Git - casparcg/blobdiff - common/executor.h
[executor] changed default to unbounded like in 2.0.7 and fixed a deadlock when capac...
[casparcg] / common / executor.h
index 79fa200288d264d7e7fd1c3c4ff1b159eab42148..d8a34d1373880164c69bb0dd9a94e3eb40481588 100644 (file)
-/*\r
-* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
-*\r
-* This file is part of CasparCG (www.casparcg.com).\r
-*\r
-* CasparCG is free software: you can redistribute it and/or modify\r
-* it under the terms of the GNU General Public License as published by\r
-* the Free Software Foundation, either version 3 of the License, or\r
-* (at your option) any later version.\r
-*\r
-* CasparCG is distributed in the hope that it will be useful,\r
-* but WITHOUT ANY WARRANTY; without even the implied warranty of\r
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
-* GNU General Public License for more details.\r
-*\r
-* You should have received a copy of the GNU General Public License\r
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
-*\r
-* Author: Robert Nagy, ronag89@gmail.com\r
-*/\r
-\r
-#pragma once\r
-\r
-#include "except.h"\r
-#include "enum_class.h"\r
-#include "log.h"\r
-\r
-#include <tbb/atomic.h>\r
-#include <tbb/concurrent_priority_queue.h>\r
-#include <tbb/concurrent_queue.h>\r
-\r
-#include <boost/thread.hpp>\r
-\r
-#include <functional>\r
-\r
-namespace caspar {\r
-               \r
-struct task_priority_def\r
-{\r
-       enum type\r
-       {\r
-               lowest_priority = 0,\r
-               lower_priority,\r
-               low_priority,\r
-               normal_priority,\r
-               high_priority,\r
-               higher_priority\r
-       };\r
-};\r
-typedef enum_class<task_priority_def> task_priority;\r
-\r
-class executor sealed\r
-{      \r
-       struct priority_function\r
-       {\r
-               int                                             priority;\r
-               std::function<void()>   func;\r
-\r
-               priority_function()\r
-               {\r
-               }\r
-\r
-               template<typename F>\r
-               priority_function(int priority, F&& func)\r
-                       : priority(priority)\r
-                       , func(std::forward<F>(func))\r
-               {\r
-               }\r
-\r
-               void operator()()\r
-               {\r
-                       func();\r
-               }\r
-\r
-               bool operator<(const priority_function& other) const\r
-               {\r
-                       return priority < other.priority;\r
-               }\r
-       };\r
-\r
-       executor(const executor&);\r
-       executor& operator=(const executor&);\r
-       \r
-       typedef tbb::concurrent_priority_queue<priority_function>       function_queue_t;\r
-       \r
-       const std::wstring                                                                                      name_;\r
-       tbb::atomic<bool>                                                                                       is_running_;\r
-       boost::thread                                                                                           thread_;        \r
-       function_queue_t                                                                                        execution_queue_;\r
-       tbb::concurrent_bounded_queue<int>                                                      semaphore_;\r
-               \r
-public:                \r
-       executor(const std::wstring& name)\r
-               : name_(name)\r
-       {\r
-               is_running_ = true;\r
-               thread_ = boost::thread([this]{run();});\r
-       }\r
-       \r
-       ~executor()\r
-       {\r
-               try\r
-               {\r
-                       internal_begin_invoke([=]\r
-                       {\r
-                               is_running_ = false;\r
-                       }).wait();\r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-\r
-                       clear();\r
-                       is_running_ = false;\r
-                       semaphore_.try_push(0);\r
-               }\r
-               thread_.join();\r
-       }\r
-                                               \r
-       template<typename Func>\r
-       auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
-       {       \r
-               if(execution_queue_.size() > 128)\r
-                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor overflow.") << source_info(name_));\r
-\r
-               if(!is_running_)\r
-                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));\r
-                               \r
-               return internal_begin_invoke(std::forward<Func>(func), priority);       \r
-       }\r
-       \r
-       template<typename Func>\r
-       auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept\r
-       {\r
-               if(is_current())  // Avoids potential deadlock.\r
-                       return func();\r
-               \r
-               return begin_invoke(std::forward<Func>(func), prioriy).get();\r
-       }\r
-\r
-       void yield()\r
-       {\r
-               if(!is_current())\r
-                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));\r
-\r
-               int dummy;\r
-               if(!semaphore_.try_pop(dummy))\r
-                       return;\r
-\r
-               priority_function func;\r
-               if(execution_queue_.try_pop(func))\r
-                       func();\r
-       }\r
-\r
-       void set_capacity(std::size_t capacity)\r
-       {\r
-               semaphore_.set_capacity(capacity);\r
-       }\r
-\r
-       std::size_t capacity() const\r
-       {\r
-               return semaphore_.capacity();\r
-       }\r
-       \r
-       void clear()\r
-       {               \r
-               priority_function func;\r
-               while(execution_queue_.try_pop(func));\r
-       }\r
-                               \r
-       void stop()\r
-       {\r
-               invoke([this]\r
-               {\r
-                       is_running_ = false;\r
-               });\r
-       }\r
-\r
-       void wait()\r
-       {\r
-               invoke([]{}, task_priority::lowest_priority);\r
-       }\r
-               \r
-       function_queue_t::size_type size() const \r
-       {\r
-               return execution_queue_.size(); \r
-       }\r
-               \r
-       bool is_running() const\r
-       {\r
-               return is_running_; \r
-       }       \r
-\r
-       bool is_current() const\r
-       {\r
-               return boost::this_thread::get_id() == thread_.get_id();\r
-       }\r
-               \r
-private:       \r
-\r
-       template<typename Func>\r
-       auto internal_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
-       {                                       \r
-               typedef typename std::remove_reference<Func>::type      function_type;\r
-               typedef decltype(func())                                                        result_type;\r
-               typedef boost::packaged_task<result_type>                       task_type;\r
-                                                               \r
-               std::unique_ptr<task_type> task;\r
-\r
-               // Use pointers since the boost thread library doesn't fully support move semantics.\r
-\r
-               auto raw_func2 = new function_type(std::forward<Func>(func));\r
-               try\r
-               {\r
-                       task.reset(new task_type([raw_func2]() -> result_type\r
-                       {\r
-                               std::unique_ptr<function_type> func2(raw_func2);\r
-                               return (*func2)();\r
-                       }));\r
-               }\r
-               catch(...)\r
-               {\r
-                       delete raw_func2;\r
-                       throw;\r
-               }\r
-               \r
-               task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
-               {\r
-                       try\r
-                       {\r
-                               if(is_current())  // Avoids potential deadlock.\r
-                                       my_task();\r
-                       }\r
-                       catch(boost::task_already_started&){}\r
-               }));\r
-                               \r
-               auto future = task->get_future();\r
-\r
-               auto raw_task = task.release();\r
-               priority_function prio_func(priority.value(), [raw_task]\r
-               {\r
-                       std::unique_ptr<task_type> task(raw_task);\r
-                       try\r
-                       {\r
-                               (*task)();\r
-                       }\r
-                       catch(boost::task_already_started&){}\r
-               });\r
-\r
-               execution_queue_.push(prio_func);\r
-               semaphore_.push(0);\r
-                                                       \r
-               return std::move(future);               \r
-       }\r
-\r
-       void run() // noexcept\r
-       {\r
-               win32_exception::install_handler();             \r
-               while(is_running_)\r
-               {\r
-                       try\r
-                       {\r
-                               int dummy;\r
-                               semaphore_.pop(dummy);\r
-\r
-                               priority_function func;\r
-                               if(execution_queue_.try_pop(func))\r
-                                       func();\r
-                       }\r
-                       catch(...)\r
-                       {\r
-                               CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       }\r
-               }\r
-       }       \r
-};\r
-\r
-}
\ No newline at end of file
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
+
+#pragma once
+
+#include "os/general_protection_fault.h"
+#include "except.h"
+#include "log.h"
+#include "blocking_bounded_queue_adapter.h"
+#include "blocking_priority_queue.h"
+#include "future.h"
+
+#include <tbb/atomic.h>
+#include <tbb/concurrent_priority_queue.h>
+
+#include <boost/thread.hpp>
+#include <boost/optional.hpp>
+
+#include <functional>
+#include <future>
+
+namespace caspar {
+enum class task_priority
+{
+       lowest_priority = 0,
+       lower_priority,
+       low_priority,
+       normal_priority,
+       high_priority,
+       higher_priority
+};
+
+class executor final
+{
+       executor(const executor&);
+       executor& operator=(const executor&);
+
+       typedef blocking_priority_queue<std::function<void()>, task_priority>   function_queue_t;
+
+       const std::wstring                                                                                      name_;
+       tbb::atomic<bool>                                                                                       is_running_;
+       boost::thread                                                                                           thread_;
+       function_queue_t                                                                                        execution_queue_;
+       tbb::atomic<bool>                                                                                       currently_in_task_;
+
+public:
+       executor(const std::wstring& name)
+               : name_(name)
+               , execution_queue_(std::numeric_limits<int>::max(), std::vector<task_priority> {
+                       task_priority::lowest_priority,
+                       task_priority::lower_priority,
+                       task_priority::low_priority,
+                       task_priority::normal_priority,
+                       task_priority::high_priority,
+                       task_priority::higher_priority
+               })
+       {
+               is_running_ = true;
+               currently_in_task_ = false;
+               thread_ = boost::thread([this]{run();});
+       }
+
+       ~executor()
+       {
+               CASPAR_LOG(debug) << L"Shutting down " << name_;
+
+               try
+               {
+                       if (is_running_)
+                               internal_begin_invoke([=]
+                               {
+                                       is_running_ = false;
+                               }).wait();
+               }
+               catch(...)
+               {
+                       CASPAR_LOG_CURRENT_EXCEPTION();
+               }
+
+               join();
+       }
+
+       void join()
+       {
+               thread_.join();
+       }
+
+       template<typename Func>
+       auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
+       {
+               if(!is_running_)
+                       CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
+
+               return internal_begin_invoke(std::forward<Func>(func), priority);
+       }
+
+       template<typename Func>
+       auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
+       {
+               if(is_current())  // Avoids potential deadlock.
+                       return func();
+
+               return begin_invoke(std::forward<Func>(func), prioriy).get();
+       }
+
+       void yield(task_priority minimum_priority)
+       {
+               if(!is_current())
+                       CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
+
+               std::function<void ()> func;
+
+               while (execution_queue_.try_pop(func, minimum_priority))
+                       func();
+       }
+
+       void set_capacity(function_queue_t::size_type capacity)
+       {
+               execution_queue_.set_capacity(capacity);
+       }
+
+       function_queue_t::size_type capacity() const
+       {
+               return execution_queue_.capacity();
+       }
+
+       bool is_full() const
+       {
+               return execution_queue_.space_available() == 0;
+       }
+
+       void clear()
+       {
+               std::function<void ()> func;
+               while(execution_queue_.try_pop(func));
+       }
+
+       void stop()
+       {
+               invoke([this]
+               {
+                       is_running_ = false;
+               });
+       }
+
+       void wait()
+       {
+               invoke([]{}, task_priority::lowest_priority);
+       }
+
+       function_queue_t::size_type size() const
+       {
+               return execution_queue_.size();
+       }
+
+       bool is_running() const
+       {
+               return is_running_;
+       }
+
+       bool is_current() const
+       {
+               return boost::this_thread::get_id() == thread_.get_id();
+       }
+
+       bool is_currently_in_task() const
+       {
+               return currently_in_task_;
+       }
+
+       std::wstring name() const
+       {
+               return name_;
+       }
+
+private:
+
+       std::wstring print() const
+       {
+               return L"executor[" + name_ + L"]";
+       }
+
+       template<typename Func>
+       auto internal_begin_invoke(
+               Func&& func,
+               task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
+       {
+               typedef typename std::remove_reference<Func>::type      function_type;
+               typedef decltype(func())                                                        result_type;
+               typedef std::packaged_task<result_type()>                       task_type;
+
+               std::shared_ptr<task_type> task;
+
+               // Use pointers since the boost thread library doesn't fully support move semantics.
+
+               auto raw_func2 = new function_type(std::forward<Func>(func));
+               try
+               {
+                       task.reset(new task_type([raw_func2]() -> result_type
+                       {
+                               std::unique_ptr<function_type> func2(raw_func2);
+                               return (*func2)();
+                       }));
+               }
+               catch(...)
+               {
+                       delete raw_func2;
+                       throw;
+               }
+
+               auto future = task->get_future().share();
+               auto function = [task]
+               {
+                       try
+                       {
+                               (*task)();
+                       }
+                       catch(std::future_error&){}
+               };
+
+               if (!execution_queue_.try_push(priority, function))
+               {
+                       if (is_current())
+                               CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" Overflow. Avoiding deadlock."));
+
+                       CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller.";
+                       execution_queue_.push(priority, function);
+               }
+
+               return std::async(std::launch::deferred, [=]() mutable -> result_type
+               {
+                       if (!is_ready(future) && is_current()) // Avoids potential deadlock.
+                       {
+                               function();
+                       }
+
+                       try
+                       {
+                               return future.get();
+                       }
+                       catch (const caspar_exception& e)
+                       {
+                               if (!is_current()) // Add context information from this thread before rethrowing.
+                               {
+                                       auto ctx_info = boost::get_error_info<context_info_t>(e);
+
+                                       if (ctx_info)
+                                               e << context_info(get_context() + *ctx_info);
+                                       else
+                                               e << context_info(get_context());
+                               }
+
+                               throw;
+                       }
+               });
+       }
+
+       void run() // noexcept
+       {
+               ensure_gpf_handler_installed_for_thread(u8(name_).c_str());
+               while(is_running_)
+               {
+                       try
+                       {
+                               std::function<void ()> func;
+                               execution_queue_.pop(func);
+                               currently_in_task_ = true;
+                               func();
+                       }
+                       catch(...)
+                       {
+                               CASPAR_LOG_CURRENT_EXCEPTION();
+                       }
+
+                       currently_in_task_ = false;
+               }
+       }
+};
+}