]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
2.0.0.2: Added copyright notice to all files.
[casparcg] / common / concurrency / executor.h
1 /*\r
2 * copyright (c) 2010 Sveriges Television AB <info@casparcg.com>\r
3 *\r
4 *  This file is part of CasparCG.\r
5 *\r
6 *    CasparCG is free software: you can redistribute it and/or modify\r
7 *    it under the terms of the GNU General Public License as published by\r
8 *    the Free Software Foundation, either version 3 of the License, or\r
9 *    (at your option) any later version.\r
10 *\r
11 *    CasparCG is distributed in the hope that it will be useful,\r
12 *    but WITHOUT ANY WARRANTY; without even the implied warranty of\r
13 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
14 *    GNU General Public License for more details.\r
15 \r
16 *    You should have received a copy of the GNU General Public License\r
17 *    along with CasparCG.  If not, see <http://www.gnu.org/licenses/>.\r
18 *\r
19 */\r
20 #pragma once\r
21 \r
22 #include "../exception/win32_exception.h"\r
23 #include "../utility/assert.h"\r
24 #include "../log/log.h"\r
25 \r
26 #include <tbb/atomic.h>\r
27 #include <tbb/concurrent_queue.h>\r
28 \r
29 #include <boost/thread.hpp>\r
30 #include <boost/noncopyable.hpp>\r
31 \r
32 #include <functional>\r
33 \r
34 namespace caspar {\r
35 \r
36 namespace detail {\r
37 \r
38 typedef struct tagTHREADNAME_INFO\r
39 {\r
40         DWORD dwType; // must be 0x1000\r
41         LPCSTR szName; // pointer to name (in user addr space)\r
42         DWORD dwThreadID; // thread ID (-1=caller thread)\r
43         DWORD dwFlags; // reserved for future use, must be zero\r
44 } THREADNAME_INFO;\r
45 \r
46 inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)\r
47 {\r
48         THREADNAME_INFO info;\r
49         {\r
50                 info.dwType = 0x1000;\r
51                 info.szName = szThreadName;\r
52                 info.dwThreadID = dwThreadID;\r
53                 info.dwFlags = 0;\r
54         }\r
55         __try\r
56         {\r
57                 RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info );\r
58         }\r
59         __except (EXCEPTION_CONTINUE_EXECUTION){}       \r
60 }\r
61 \r
62 }\r
63 \r
64 class executor : boost::noncopyable\r
65 {\r
66         const std::string name_;\r
67         boost::thread thread_;\r
68         tbb::atomic<bool> is_running_;\r
69         tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
70 public:\r
71                 \r
72         explicit executor(const std::wstring& name, bool auto_start = false) : name_(narrow(name))\r
73         {\r
74                 is_running_ = false;\r
75                 if(auto_start)\r
76                         start();\r
77         }\r
78         \r
79         virtual ~executor()\r
80         {\r
81                 stop();\r
82                 clear();\r
83                 if(boost::this_thread::get_id() != thread_.get_id())\r
84                         thread_.join();\r
85         }\r
86 \r
87         void set_capacity(size_t capacity)\r
88         {\r
89                 execution_queue_.set_capacity(capacity);\r
90         }\r
91 \r
92         void start() // noexcept\r
93         {\r
94                 if(is_running_.fetch_and_store(true))\r
95                         return;\r
96                 clear();\r
97                 thread_ = boost::thread([this]{run();});\r
98         }\r
99                         \r
100         void stop() // noexcept\r
101         {\r
102                 is_running_ = false;    \r
103                 execution_queue_.try_push([]{});\r
104         }\r
105         \r
106         void clear()\r
107         {\r
108                 std::function<void()> func;\r
109                 auto size = execution_queue_.size();\r
110                 for(int n = 0; n < size; ++n)\r
111                 {\r
112                         try\r
113                         {\r
114                                 if(!execution_queue_.try_pop(func))\r
115                                         return;\r
116                         }\r
117                         catch(boost::broken_promise&){}\r
118                 }\r
119         }\r
120                         \r
121         template<typename Func>\r
122         auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept\r
123         {       \r
124                 typedef boost::packaged_task<decltype(func())> task_type;\r
125                                 \r
126                 auto task = task_type(std::forward<Func>(func));\r
127                 auto future = task.get_future();\r
128                 \r
129                 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
130                 {\r
131                         try\r
132                         {\r
133                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
134                                         my_task();\r
135                         }\r
136                         catch(boost::task_already_started&){}\r
137                 }));\r
138                                 \r
139                 struct task_adaptor_t\r
140                 {\r
141                         task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
142                         task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
143                         void operator()() const { task(); }\r
144                         mutable task_type task;\r
145                 } task_adaptor(std::move(task));\r
146 \r
147                 execution_queue_.push([=]\r
148                 {\r
149                         try{task_adaptor();}\r
150                         catch(boost::task_already_started&){}\r
151                         catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
152                 });\r
153                                         \r
154                 return std::move(future);               \r
155         }\r
156         \r
157         template<typename Func>\r
158         auto invoke(Func&& func) -> decltype(func())\r
159         {\r
160                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
161                         return func();\r
162                 \r
163                 return begin_invoke(std::forward<Func>(func)).get();\r
164         }\r
165         \r
166         tbb::concurrent_bounded_queue<std::function<void()>>::size_type capacity() const { return execution_queue_.capacity();  }\r
167         tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size();  }\r
168         bool empty() const              { return execution_queue_.empty();      }\r
169         bool is_running() const { return is_running_;                           }       \r
170                 \r
171 private:\r
172         \r
173         void execute() // noexcept\r
174         {\r
175                 std::function<void()> func;\r
176                 execution_queue_.pop(func);     \r
177                 func();\r
178         }\r
179 \r
180         void run() // noexcept\r
181         {\r
182                 win32_exception::install_handler();             \r
183                 detail::SetThreadName(GetCurrentThreadId(), name_.c_str());\r
184                 while(is_running_)\r
185                         execute();\r
186         }       \r
187 };\r
188 \r
189 }