]> git.sesse.net Git - casparcg/blob - common/executor.h
* Refactored to use enum class instead of enum_class.
[casparcg] / common / executor.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #pragma once
23
24 #include "except.h"
25 #include "log.h"
26 #include "blocking_bounded_queue_adapter.h"
27 #include "blocking_priority_queue.h"
28 #include "future.h"
29
30 #include <tbb/atomic.h>
31 #include <tbb/concurrent_priority_queue.h>
32
33 #include <boost/thread.hpp>
34 #include <boost/optional.hpp>
35
36 #include <functional>
37 #include <future>
38
39 namespace caspar {
40                 
41 enum class task_priority
42 {
43         lowest_priority = 0,
44         lower_priority,
45         low_priority,
46         normal_priority,
47         high_priority,
48         higher_priority
49 };
50
51 class executor final
52 {       
53         executor(const executor&);
54         executor& operator=(const executor&);
55         
56         typedef blocking_priority_queue<std::function<void()>, task_priority>   function_queue_t;
57         
58         const std::wstring                                                                                      name_;
59         tbb::atomic<bool>                                                                                       is_running_;
60         boost::thread                                                                                           thread_;        
61         function_queue_t                                                                                        execution_queue_;
62                 
63 public:         
64         executor(const std::wstring& name)
65                 : name_(name)
66                 , execution_queue_(512, std::vector<task_priority> {
67                         task_priority::lowest_priority,
68                         task_priority::lower_priority,
69                         task_priority::low_priority,
70                         task_priority::normal_priority,
71                         task_priority::high_priority,
72                         task_priority::higher_priority 
73                 })
74         {
75                 is_running_ = true;
76                 thread_ = boost::thread([this]{run();});
77         }
78         
79         ~executor()
80         {
81                 try
82                 {
83                         internal_begin_invoke([=]
84                         {
85                                 is_running_ = false;
86                         }).wait();
87                 }
88                 catch(...)
89                 {
90                         CASPAR_LOG_CURRENT_EXCEPTION();
91                 }
92                 
93                 thread_.join();
94         }
95
96         template<typename Func>
97         auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
98         {       
99                 if(!is_running_)
100                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
101                                 
102                 return internal_begin_invoke(std::forward<Func>(func), priority);       
103         }
104         
105         template<typename Func>
106         auto invoke(Func&& func, task_priority prioriy = task_priority::normal_priority) -> decltype(func()) // noexcept
107         {
108                 if(is_current())  // Avoids potential deadlock.
109                         return func();
110                 
111                 return begin_invoke(std::forward<Func>(func), prioriy).get();
112         }
113
114         void yield(task_priority minimum_priority)
115         {
116                 if(!is_current())
117                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.")  << source_info(name_));
118
119                 std::function<void ()> func;
120
121                 while (execution_queue_.try_pop(func, minimum_priority))
122                         func();
123         }
124
125         void set_capacity(function_queue_t::size_type capacity)
126         {
127                 execution_queue_.set_capacity(capacity);
128         }
129
130         function_queue_t::size_type capacity() const
131         {
132                 return execution_queue_.capacity();
133         }
134
135         bool is_full() const
136         {
137                 return execution_queue_.space_available() == 0;
138         }
139         
140         void clear()
141         {               
142                 std::function<void ()> func;
143                 while(execution_queue_.try_pop(func));
144         }
145                                 
146         void stop()
147         {
148                 invoke([this]
149                 {
150                         is_running_ = false;
151                 });
152         }
153
154         void wait()
155         {
156                 invoke([]{}, task_priority::lowest_priority);
157         }
158                 
159         function_queue_t::size_type size() const 
160         {
161                 return execution_queue_.size(); 
162         }
163                 
164         bool is_running() const
165         {
166                 return is_running_; 
167         }       
168
169         bool is_current() const
170         {
171                 return boost::this_thread::get_id() == thread_.get_id();
172         }
173                 
174 private:        
175
176         std::wstring print() const
177         {
178                 return L"executor[" + name_ + L"]";
179         }
180         
181         template<typename Func>
182         auto internal_begin_invoke(
183                 Func&& func,
184                 task_priority priority = task_priority::normal_priority) -> std::future<decltype(func())> // noexcept
185         {                                       
186                 typedef typename std::remove_reference<Func>::type      function_type;
187                 typedef decltype(func())                                                        result_type;
188                 typedef std::packaged_task<result_type()>                       task_type;
189                                                                 
190                 std::shared_ptr<task_type> task;
191
192                 // Use pointers since the boost thread library doesn't fully support move semantics.
193
194                 auto raw_func2 = new function_type(std::forward<Func>(func));
195                 try
196                 {
197                         task.reset(new task_type([raw_func2]() -> result_type
198                         {
199                                 std::unique_ptr<function_type> func2(raw_func2);
200                                 return (*func2)();
201                         }));
202                 }
203                 catch(...)
204                 {
205                         delete raw_func2;
206                         throw;
207                 }
208                                 
209                 auto future = task->get_future().share();
210                 auto function = [task]
211                 {
212                         try
213                         {
214                                 (*task)();
215                         }
216                         catch(std::future_error&){}
217                 };
218
219                 if (!execution_queue_.try_push(priority, function))
220                 {
221                         CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
222                         execution_queue_.push(priority, function);
223                 }
224
225                 return std::async(std::launch::deferred, [=]() mutable -> result_type
226                 {
227                         if (!is_ready(future) && is_current()) // Avoids potential deadlock.
228                         {
229                                 function();
230                         }
231
232                         return future.get();
233                 });
234         }
235
236         void run() // noexcept
237         {
238                 win32_exception::install_handler();             
239                 while(is_running_)
240                 {
241                         try
242                         {
243                                 std::function<void ()> func;
244                                 execution_queue_.pop(func);
245                                 func();
246                         }
247                         catch(...)
248                         {
249                                 CASPAR_LOG_CURRENT_EXCEPTION();
250                         }
251                 }
252         }       
253 };
254
255 }