]> git.sesse.net Git - casparcg/blobdiff - common/executor.h
set svn:eol-style native on .h and .cpp files
[casparcg] / common / executor.h
index 4b529edbd09b4ece54bb9544d52940b7698511c5..b58339e1f29b704a28c3568eb9b76b10db514a75 100644 (file)
-/*\r
-* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
-*\r
-* This file is part of CasparCG (www.casparcg.com).\r
-*\r
-* CasparCG is free software: you can redistribute it and/or modify\r
-* it under the terms of the GNU General Public License as published by\r
-* the Free Software Foundation, either version 3 of the License, or\r
-* (at your option) any later version.\r
-*\r
-* CasparCG is distributed in the hope that it will be useful,\r
-* but WITHOUT ANY WARRANTY; without even the implied warranty of\r
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
-* GNU General Public License for more details.\r
-*\r
-* You should have received a copy of the GNU General Public License\r
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
-*\r
-* Author: Robert Nagy, ronag89@gmail.com\r
-*/\r
-\r
-#pragma once\r
-\r
-#include "except.h"\r
-#include "enum_class.h"\r
-#include "log.h"\r
-\r
-#include <tbb/atomic.h>\r
-#include <tbb/concurrent_priority_queue.h>\r
-#include <tbb/concurrent_queue.h>\r
-\r
-#include <boost/thread.hpp>\r
-\r
-#include <functional>\r
-\r
-namespace caspar {\r
-               \r
-struct task_priority_def\r
-{\r
-       enum type\r
-       {\r
-               lowest_priority = 0,\r
-               lower_priority,\r
-               low_priority,\r
-               normal_priority,\r
-               high_priority,\r
-               higher_priority\r
-       };\r
-};\r
-typedef enum_class<task_priority_def> task_priority;\r
-\r
-class executor sealed\r
-{      \r
-       struct priority_function\r
-       {\r
-               int                                             priority;\r
-               std::function<void()>   func;\r
-\r
-               priority_function()\r
-               {\r
-               }\r
-\r
-               template<typename F>\r
-               priority_function(int priority, F&& func)\r
-                       : priority(priority)\r
-                       , func(std::forward<F>(func))\r
-               {\r
-               }\r
-\r
-               void operator()()\r
-               {\r
-                       func();\r
-               }\r
-\r
-               bool operator<(const priority_function& other) const\r
-               {\r
-                       return priority < other.priority;\r
-               }\r
-       };\r
-\r
-       executor(const executor&);\r
-       executor& operator=(const executor&);\r
-       \r
-       typedef tbb::concurrent_priority_queue<priority_function>       function_queue_t;\r
-       \r
-       const std::wstring                                                                                      name_;\r
-       tbb::atomic<bool>                                                                                       is_running_;\r
-       boost::thread                                                                                           thread_;        \r
-       function_queue_t                                                                                        execution_queue_;\r
-       tbb::concurrent_bounded_queue<int>                                                      semaphore_;\r
-               \r
-public:                \r
-       executor(const std::wstring& name)\r
-               : name_(name)\r
-       {\r
-               is_running_ = true;\r
-               set_capacity(512);\r
-               thread_ = boost::thread([this]{run();});\r
-       }\r
-       \r
-       ~executor()\r
-       {\r
-               try\r
-               {\r
-                       internal_begin_invoke([=]\r
-                       {\r
-                               is_running_ = false;\r
-                       }).wait();\r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-               }\r
-               \r
-               thread_.join();\r
-       }\r
-                                               \r
-       template<typename Func>\r
-       auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
-       {       \r
-               if(!is_running_)\r
-                       CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));\r
-                               \r
-               return internal_begin_invoke(std::forward<Func>(func), priority);       \r
-       }\r
-       \r
-       template<typename Func>\r
-       auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept\r
-       {\r
-               if(is_current())  // Avoids potential deadlock.\r
-                       return func();\r
-               \r
-               return begin_invoke(std::forward<Func>(func), prioriy).get();\r
-       }\r
-\r
-       void yield()\r
-       {\r
-               if(!is_current())\r
-                       CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));\r
-\r
-               int dummy;\r
-               if(!semaphore_.try_pop(dummy))\r
-                       return;\r
-\r
-               priority_function func;\r
-               if(execution_queue_.try_pop(func))\r
-                       func();\r
-       }\r
-\r
-       void set_capacity(std::size_t capacity)\r
-       {\r
-               semaphore_.set_capacity(capacity);\r
-       }\r
-\r
-       std::size_t capacity() const\r
-       {\r
-               return semaphore_.capacity();\r
-       }\r
-       \r
-       void clear()\r
-       {               \r
-               priority_function func;\r
-               while(execution_queue_.try_pop(func));\r
-       }\r
-                               \r
-       void stop()\r
-       {\r
-               invoke([this]\r
-               {\r
-                       is_running_ = false;\r
-               });\r
-       }\r
-\r
-       void wait()\r
-       {\r
-               invoke([]{}, task_priority::lowest_priority);\r
-       }\r
-               \r
-       function_queue_t::size_type size() const \r
-       {\r
-               return execution_queue_.size(); \r
-       }\r
-               \r
-       bool is_running() const\r
-       {\r
-               return is_running_; \r
-       }       \r
-\r
-       bool is_current() const\r
-       {\r
-               return boost::this_thread::get_id() == thread_.get_id();\r
-       }\r
-               \r
-private:       \r
-\r
-       std::wstring print() const\r
-       {\r
-               return L"executor[" + name_ + L"]";\r
-       }\r
-       \r
-       template<typename Func>\r
-       auto internal_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
-       {                                       \r
-               typedef typename std::remove_reference<Func>::type      function_type;\r
-               typedef decltype(func())                                                        result_type;\r
-               typedef boost::packaged_task<result_type>                       task_type;\r
-                                                               \r
-               std::unique_ptr<task_type> task;\r
-\r
-               // Use pointers since the boost thread library doesn't fully support move semantics.\r
-\r
-               auto raw_func2 = new function_type(std::forward<Func>(func));\r
-               try\r
-               {\r
-                       task.reset(new task_type([raw_func2]() -> result_type\r
-                       {\r
-                               std::unique_ptr<function_type> func2(raw_func2);\r
-                               return (*func2)();\r
-                       }));\r
-               }\r
-               catch(...)\r
-               {\r
-                       delete raw_func2;\r
-                       throw;\r
-               }\r
-               \r
-               task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
-               {\r
-                       try\r
-                       {\r
-                               if(is_current())  // Avoids potential deadlock.\r
-                                       my_task();\r
-                       }\r
-                       catch(boost::task_already_started&){}\r
-               }));\r
-                               \r
-               auto future = task->get_future();\r
-\r
-               auto raw_task = task.release();\r
-               priority_function prio_func(priority.value(), [raw_task]\r
-               {\r
-                       std::unique_ptr<task_type> task(raw_task);\r
-                       try\r
-                       {\r
-                               (*task)();\r
-                       }\r
-                       catch(boost::task_already_started&){}\r
-               });\r
-\r
-               execution_queue_.push(prio_func);\r
-\r
-               if(!semaphore_.try_push(0))\r
-               {\r
-                       CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";\r
-                       semaphore_.push(0);\r
-               }                                       \r
-               return std::move(future);               \r
-       }\r
-\r
-       void run() // noexcept\r
-       {\r
-               win32_exception::install_handler();             \r
-               while(is_running_)\r
-               {\r
-                       try\r
-                       {\r
-                               int dummy;\r
-                               semaphore_.pop(dummy);\r
-\r
-                               priority_function func;\r
-                               if(execution_queue_.try_pop(func))\r
-                                       func();\r
-                       }\r
-                       catch(...)\r
-                       {\r
-                               CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       }\r
-               }\r
-       }       \r
-};\r
-\r
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
+
+#pragma once
+
+#include "except.h"
+#include "enum_class.h"
+#include "log.h"
+
+#include <tbb/atomic.h>
+#include <tbb/concurrent_priority_queue.h>
+#include <tbb/concurrent_queue.h>
+
+#include <boost/thread.hpp>
+
+#include <functional>
+
+namespace caspar {
+               
+struct task_priority_def
+{
+       enum type
+       {
+               lowest_priority = 0,
+               lower_priority,
+               low_priority,
+               normal_priority,
+               high_priority,
+               higher_priority
+       };
+};
+typedef enum_class<task_priority_def> task_priority;
+
+class executor sealed
+{      
+       struct priority_function
+       {
+               int                                             priority;
+               std::function<void()>   func;
+
+               priority_function()
+               {
+               }
+
+               template<typename F>
+               priority_function(int priority, F&& func)
+                       : priority(priority)
+                       , func(std::forward<F>(func))
+               {
+               }
+
+               void operator()()
+               {
+                       func();
+               }
+
+               bool operator<(const priority_function& other) const
+               {
+                       return priority < other.priority;
+               }
+       };
+
+       executor(const executor&);
+       executor& operator=(const executor&);
+       
+       typedef tbb::concurrent_priority_queue<priority_function>       function_queue_t;
+       
+       const std::wstring                                                                                      name_;
+       tbb::atomic<bool>                                                                                       is_running_;
+       boost::thread                                                                                           thread_;        
+       function_queue_t                                                                                        execution_queue_;
+       tbb::concurrent_bounded_queue<int>                                                      semaphore_;
+               
+public:                
+       executor(const std::wstring& name)
+               : name_(name)
+       {
+               is_running_ = true;
+               set_capacity(512);
+               thread_ = boost::thread([this]{run();});
+       }
+       
+       ~executor()
+       {
+               try
+               {
+                       internal_begin_invoke([=]
+                       {
+                               is_running_ = false;
+                       }).wait();
+               }
+               catch(...)
+               {
+                       CASPAR_LOG_CURRENT_EXCEPTION();
+               }
+               
+               thread_.join();
+       }
+                                               
+       template<typename Func>
+       auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
+       {       
+               if(!is_running_)
+                       CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
+                               
+               return internal_begin_invoke(std::forward<Func>(func), priority);       
+       }
+       
+       template<typename Func>
+       auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
+       {
+               if(is_current())  // Avoids potential deadlock.
+                       return func();
+               
+               return begin_invoke(std::forward<Func>(func), prioriy).get();
+       }
+
+       void yield()
+       {
+               if(!is_current())
+                       CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
+
+               int dummy;
+               if(!semaphore_.try_pop(dummy))
+                       return;
+
+               priority_function func;
+               if(execution_queue_.try_pop(func))
+                       func();
+       }
+
+       void set_capacity(std::size_t capacity)
+       {
+               semaphore_.set_capacity(capacity);
+       }
+
+       std::size_t capacity() const
+       {
+               return semaphore_.capacity();
+       }
+       
+       void clear()
+       {               
+               priority_function func;
+               while(execution_queue_.try_pop(func));
+       }
+                               
+       void stop()
+       {
+               invoke([this]
+               {
+                       is_running_ = false;
+               });
+       }
+
+       void wait()
+       {
+               invoke([]{}, task_priority::lowest_priority);
+       }
+               
+       function_queue_t::size_type size() const 
+       {
+               return execution_queue_.size(); 
+       }
+               
+       bool is_running() const
+       {
+               return is_running_; 
+       }       
+
+       bool is_current() const
+       {
+               return boost::this_thread::get_id() == thread_.get_id();
+       }
+               
+private:       
+
+       std::wstring print() const
+       {
+               return L"executor[" + name_ + L"]";
+       }
+       
+       template<typename Func>
+       auto internal_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
+       {                                       
+               typedef typename std::remove_reference<Func>::type      function_type;
+               typedef decltype(func())                                                        result_type;
+               typedef boost::packaged_task<result_type>                       task_type;
+                                                               
+               std::unique_ptr<task_type> task;
+
+               // Use pointers since the boost thread library doesn't fully support move semantics.
+
+               auto raw_func2 = new function_type(std::forward<Func>(func));
+               try
+               {
+                       task.reset(new task_type([raw_func2]() -> result_type
+                       {
+                               std::unique_ptr<function_type> func2(raw_func2);
+                               return (*func2)();
+                       }));
+               }
+               catch(...)
+               {
+                       delete raw_func2;
+                       throw;
+               }
+               
+               task->set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
+               {
+                       try
+                       {
+                               if(is_current())  // Avoids potential deadlock.
+                                       my_task();
+                       }
+                       catch(boost::task_already_started&){}
+               }));
+                               
+               auto future = task->get_future();
+
+               auto raw_task = task.release();
+               priority_function prio_func(priority.value(), [raw_task]
+               {
+                       std::unique_ptr<task_type> task(raw_task);
+                       try
+                       {
+                               (*task)();
+                       }
+                       catch(boost::task_already_started&){}
+               });
+
+               execution_queue_.push(prio_func);
+
+               if(!semaphore_.try_push(0))
+               {
+                       CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
+                       semaphore_.push(0);
+               }                                       
+               return std::move(future);               
+       }
+
+       void run() // noexcept
+       {
+               win32_exception::install_handler();             
+               while(is_running_)
+               {
+                       try
+                       {
+                               int dummy;
+                               semaphore_.pop(dummy);
+
+                               priority_function func;
+                               if(execution_queue_.try_pop(func))
+                                       func();
+                       }
+                       catch(...)
+                       {
+                               CASPAR_LOG_CURRENT_EXCEPTION();
+                       }
+               }
+       }       
+};
+
 }
\ No newline at end of file