-/*\r
-* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
-*\r
-* This file is part of CasparCG (www.casparcg.com).\r
-*\r
-* CasparCG is free software: you can redistribute it and/or modify\r
-* it under the terms of the GNU General Public License as published by\r
-* the Free Software Foundation, either version 3 of the License, or\r
-* (at your option) any later version.\r
-*\r
-* CasparCG is distributed in the hope that it will be useful,\r
-* but WITHOUT ANY WARRANTY; without even the implied warranty of\r
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the\r
-* GNU General Public License for more details.\r
-*\r
-* You should have received a copy of the GNU General Public License\r
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
-*\r
-* Author: Robert Nagy, ronag89@gmail.com\r
-*/\r
-\r
-#pragma once\r
-\r
-#include "except.h"\r
-#include "enum_class.h"\r
-#include "log.h"\r
-\r
-#include <tbb/atomic.h>\r
-#include <tbb/concurrent_priority_queue.h>\r
-#include <tbb/concurrent_queue.h>\r
-\r
-#include <boost/thread.hpp>\r
-\r
-#include <functional>\r
-\r
-namespace caspar {\r
- \r
-struct task_priority_def\r
-{\r
- enum type\r
- {\r
- lowest_priority = 0,\r
- lower_priority,\r
- low_priority,\r
- normal_priority,\r
- high_priority,\r
- higher_priority\r
- };\r
-};\r
-typedef enum_class<task_priority_def> task_priority;\r
-\r
-class executor sealed\r
-{ \r
- struct priority_function\r
- {\r
- int priority;\r
- std::function<void()> func;\r
-\r
- priority_function()\r
- {\r
- }\r
-\r
- template<typename F>\r
- priority_function(int priority, F&& func)\r
- : priority(priority)\r
- , func(std::forward<F>(func))\r
- {\r
- }\r
-\r
- void operator()()\r
- {\r
- func();\r
- }\r
-\r
- bool operator<(const priority_function& other) const\r
- {\r
- return priority < other.priority;\r
- }\r
- };\r
-\r
- executor(const executor&);\r
- executor& operator=(const executor&);\r
- \r
- typedef tbb::concurrent_priority_queue<priority_function> function_queue_t;\r
- \r
- const std::wstring name_;\r
- tbb::atomic<bool> is_running_;\r
- boost::thread thread_; \r
- function_queue_t execution_queue_;\r
- tbb::concurrent_bounded_queue<int> semaphore_;\r
- \r
-public: \r
- executor(const std::wstring& name)\r
- : name_(name)\r
- {\r
- is_running_ = true;\r
- thread_ = boost::thread([this]{run();});\r
- }\r
- \r
- ~executor()\r
- {\r
- try\r
- {\r
- internal_begin_invoke([=]\r
- {\r
- is_running_ = false;\r
- }).wait();\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- }\r
- \r
- thread_.join();\r
- }\r
- \r
- template<typename Func>\r
- auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
- { \r
- if(execution_queue_.size() > 128)\r
- CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor overflow.") << source_info(name_));\r
-\r
- if(!is_running_)\r
- CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));\r
- \r
- return internal_begin_invoke(std::forward<Func>(func), priority); \r
- }\r
- \r
- template<typename Func>\r
- auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept\r
- {\r
- if(is_current()) // Avoids potential deadlock.\r
- return func();\r
- \r
- return begin_invoke(std::forward<Func>(func), prioriy).get();\r
- }\r
-\r
- void yield()\r
- {\r
- if(!is_current())\r
- CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.") << source_info(name_));\r
-\r
- int dummy;\r
- if(!semaphore_.try_pop(dummy))\r
- return;\r
-\r
- priority_function func;\r
- if(execution_queue_.try_pop(func))\r
- func();\r
- }\r
-\r
- void set_capacity(std::size_t capacity)\r
- {\r
- semaphore_.set_capacity(capacity);\r
- }\r
-\r
- std::size_t capacity() const\r
- {\r
- return semaphore_.capacity();\r
- }\r
- \r
- void clear()\r
- { \r
- priority_function func;\r
- while(execution_queue_.try_pop(func));\r
- }\r
- \r
- void stop()\r
- {\r
- invoke([this]\r
- {\r
- is_running_ = false;\r
- });\r
- }\r
-\r
- void wait()\r
- {\r
- invoke([]{}, task_priority::lowest_priority);\r
- }\r
- \r
- function_queue_t::size_type size() const \r
- {\r
- return execution_queue_.size(); \r
- }\r
- \r
- bool is_running() const\r
- {\r
- return is_running_; \r
- } \r
-\r
- bool is_current() const\r
- {\r
- return boost::this_thread::get_id() == thread_.get_id();\r
- }\r
- \r
-private: \r
- \r
- template<typename Func>\r
- auto internal_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
- { \r
- typedef typename std::remove_reference<Func>::type function_type;\r
- typedef decltype(func()) result_type;\r
- typedef boost::packaged_task<result_type> task_type;\r
- \r
- std::unique_ptr<task_type> task;\r
-\r
- // Use pointers since the boost thread library doesn't fully support move semantics.\r
-\r
- auto raw_func2 = new function_type(std::forward<Func>(func));\r
- try\r
- {\r
- task.reset(new task_type([raw_func2]() -> result_type\r
- {\r
- std::unique_ptr<function_type> func2(raw_func2);\r
- return (*func2)();\r
- }));\r
- }\r
- catch(...)\r
- {\r
- delete raw_func2;\r
- throw;\r
- }\r
- \r
- task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
- {\r
- try\r
- {\r
- if(is_current()) // Avoids potential deadlock.\r
- my_task();\r
- }\r
- catch(boost::task_already_started&){}\r
- }));\r
- \r
- auto future = task->get_future();\r
-\r
- auto raw_task = task.release();\r
- priority_function prio_func(priority.value(), [raw_task]\r
- {\r
- std::unique_ptr<task_type> task(raw_task);\r
- try\r
- {\r
- (*task)();\r
- }\r
- catch(boost::task_already_started&){}\r
- });\r
-\r
- execution_queue_.push(prio_func);\r
- semaphore_.push(0);\r
- \r
- return std::move(future); \r
- }\r
-\r
- void run() // noexcept\r
- {\r
- win32_exception::install_handler(); \r
- while(is_running_)\r
- {\r
- try\r
- {\r
- int dummy;\r
- semaphore_.pop(dummy);\r
-\r
- priority_function func;\r
- if(execution_queue_.try_pop(func))\r
- func();\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- }\r
- }\r
- } \r
-};\r
-\r
-}
\ No newline at end of file
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
+
+#pragma once
+
+#include "os/general_protection_fault.h"
+#include "except.h"
+#include "log.h"
+#include "blocking_bounded_queue_adapter.h"
+#include "blocking_priority_queue.h"
+#include "future.h"
+
+#include <tbb/atomic.h>
+#include <tbb/concurrent_priority_queue.h>
+
+#include <boost/thread.hpp>
+#include <boost/optional.hpp>
+
+#include <functional>
+#include <future>
+
+namespace caspar {
+
+enum class task_priority
+{
+ lowest_priority = 0,
+ lower_priority,
+ low_priority,
+ normal_priority,
+ high_priority,
+ higher_priority
+};
+
+class executor final
+{
+ executor(const executor&);
+ executor& operator=(const executor&);
+
+ typedef blocking_priority_queue<std::function<void()>, task_priority> function_queue_t;
+
+ const std::wstring name_;
+ tbb::atomic<bool> is_running_;
+ boost::thread thread_;
+ function_queue_t execution_queue_;
+ tbb::atomic<bool> currently_in_task_;
+
+public:
+ executor(const std::wstring& name)
+ : name_(name)
+ , execution_queue_(512, std::vector<task_priority> {
+ task_priority::lowest_priority,
+ task_priority::lower_priority,
+ task_priority::low_priority,
+ task_priority::normal_priority,
+ task_priority::high_priority,
+ task_priority::higher_priority
+ })
+ {
+ is_running_ = true;
+ currently_in_task_ = false;
+ thread_ = boost::thread([this]{run();});
+ }
+
+ ~executor()
+ {
+ CASPAR_LOG(debug) << L"Shutting down " << name_;
+
+ try
+ {
+ if (is_running_)
+ internal_begin_invoke([=]
+ {
+ is_running_ = false;
+ }).wait();
+ }
+ catch(...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ }
+
+ join();
+ }
+
+ void join()
+ {
+ thread_.join();
+ }
+
+ template<typename Func>
+ auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
+ {
+ if(!is_running_)
+ CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
+
+ return internal_begin_invoke(std::forward<Func>(func), priority);
+ }
+
+ template<typename Func>
+ auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
+ {
+ if(is_current()) // Avoids potential deadlock.
+ return func();
+
+ return begin_invoke(std::forward<Func>(func), prioriy).get();
+ }
+
+ void yield(task_priority minimum_priority)
+ {
+ if(!is_current())
+ CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.") << source_info(name_));
+
+ std::function<void ()> func;
+
+ while (execution_queue_.try_pop(func, minimum_priority))
+ func();
+ }
+
+ void set_capacity(function_queue_t::size_type capacity)
+ {
+ execution_queue_.set_capacity(capacity);
+ }
+
+ function_queue_t::size_type capacity() const
+ {
+ return execution_queue_.capacity();
+ }
+
+ bool is_full() const
+ {
+ return execution_queue_.space_available() == 0;
+ }
+
+ void clear()
+ {
+ std::function<void ()> func;
+ while(execution_queue_.try_pop(func));
+ }
+
+ void stop()
+ {
+ invoke([this]
+ {
+ is_running_ = false;
+ });
+ }
+
+ void wait()
+ {
+ invoke([]{}, task_priority::lowest_priority);
+ }
+
+ function_queue_t::size_type size() const
+ {
+ return execution_queue_.size();
+ }
+
+ bool is_running() const
+ {
+ return is_running_;
+ }
+
+ bool is_current() const
+ {
+ return boost::this_thread::get_id() == thread_.get_id();
+ }
+
+ bool is_currently_in_task() const
+ {
+ return currently_in_task_;
+ }
+
+ std::wstring name() const
+ {
+ return name_;
+ }
+
+private:
+
+ std::wstring print() const
+ {
+ return L"executor[" + name_ + L"]";
+ }
+
+ template<typename Func>
+ auto internal_begin_invoke(
+ Func&& func,
+ task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
+ {
+ typedef typename std::remove_reference<Func>::type function_type;
+ typedef decltype(func()) result_type;
+ typedef std::packaged_task<result_type()> task_type;
+
+ std::shared_ptr<task_type> task;
+
+ // Use pointers since the boost thread library doesn't fully support move semantics.
+
+ auto raw_func2 = new function_type(std::forward<Func>(func));
+ try
+ {
+ task.reset(new task_type([raw_func2]() -> result_type
+ {
+ std::unique_ptr<function_type> func2(raw_func2);
+ return (*func2)();
+ }));
+ }
+ catch(...)
+ {
+ delete raw_func2;
+ throw;
+ }
+
+ auto future = task->get_future().share();
+ auto function = [task]
+ {
+ try
+ {
+ (*task)();
+ }
+ catch(std::future_error&){}
+ };
+
+ if (!execution_queue_.try_push(priority, function))
+ {
+ CASPAR_LOG(warning) << print() << L" Overflow. Blocking caller.";
+ execution_queue_.push(priority, function);
+ }
+
+ return std::async(std::launch::deferred, [=]() mutable -> result_type
+ {
+ if (!is_ready(future) && is_current()) // Avoids potential deadlock.
+ {
+ function();
+ }
+
+ try
+ {
+ return future.get();
+ }
+ catch (const caspar_exception& e)
+ {
+ if (!is_current()) // Add context information from this thread before rethrowing.
+ e << context_info(get_context() + *boost::get_error_info<context_info_t>(e));
+
+ throw;
+ }
+ });
+ }
+
+ void run() // noexcept
+ {
+ ensure_gpf_handler_installed_for_thread(u8(name_).c_str());
+ while(is_running_)
+ {
+ try
+ {
+ std::function<void ()> func;
+ execution_queue_.pop(func);
+ currently_in_task_ = true;
+ func();
+ }
+ catch(...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ }
+
+ currently_in_task_ = false;
+ }
+ }
+};
+
+}