]> git.sesse.net Git - casparcg/blob - common/future.h
Manually merged a2aa960bd1f12dfa78fdd739b1ffa699e9e532a9 from master
[casparcg] / common / future.h
1 #pragma once
2
#include "enum_class.h"

#include <boost/function.hpp>
#include <boost/optional.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/future.hpp>
#include <boost/thread/thread.hpp>

#include <functional>
#include <stdexcept>
#include <type_traits>
#include <utility>
10
11 namespace caspar {
12         
/**
 * Holder for the launch policy flags understood by async().
 *
 * The enumerators occupy distinct bits so that several policies can be
 * combined into one value.
 */
struct launch_policy_def
{
	enum type
	{
		async    = 1 << 0, // == 1
		deferred = 1 << 1  // == 2
	};
};
21 typedef caspar::enum_class<launch_policy_def> launch;
22
23 namespace detail {
24         
25 template<typename R>
26 struct future_object_helper
27 {       
28         template<typename T, typename F>
29         static void nonlocking_invoke(T& future_object, F& f)
30         {                               
31         try
32         {
33                         future_object.mark_finished_with_result_internal(f());
34         }
35         catch(...)
36         {
37                         future_object.mark_exceptional_finish_internal(boost::current_exception());
38         }
39         }
40
41         template<typename T, typename F>
42         static void locking_invoke(T& future_object, F& f)
43         {                               
44         try
45         {
46                         future_object.mark_finished_with_result(f());
47         }
48         catch(...)
49         {
50                         future_object.mark_exceptional_finish();
51         }
52         }
53 };
54
55 template<>
56 struct future_object_helper<void>
57 {       
58         template<typename T, typename F>
59         static void nonlocking_invoke(T& future_object, F& f)
60         {                               
61         try
62         {
63                         f();
64                         future_object.mark_finished_with_result_internal();
65         }
66         catch(...)
67         {
68                         future_object.mark_exceptional_finish_internal(boost::current_exception());
69         }
70         }
71
72         template<typename T, typename F>
73         static void locking_invoke(T& future_object, F& f)
74         {                               
75         try
76         {
77                         f();
78                         future_object.mark_finished_with_result();
79         }
80         catch(...)
81         {
82                         future_object.mark_exceptional_finish();
83         }
84         }
85 };
86
87 template<typename R, typename F>
88 struct deferred_future_object : public boost::detail::future_object<R>
89 {       
90         F f;
91         bool done;
92
93         template<typename F2>
94         deferred_future_object(F2&& f)
95                 : f(std::forward<F2>(f))
96                 , done(false)
97         {
98                 set_wait_callback(std::mem_fn(&detail::deferred_future_object<R, F>::operator()), this);
99         }
100
101         ~deferred_future_object()
102         {
103         }
104                 
105         void operator()()
106         {               
107                 boost::lock_guard<boost::mutex> lock2(mutex);
108
109                 if(done)
110                         return;
111
112                 future_object_helper<R>::nonlocking_invoke(*this, f);
113
114                 done = true;
115         }
116 };
117
118 template<typename R, typename F>
119 struct async_future_object : public boost::detail::future_object<R>
120 {       
121         F f;
122         boost::thread thread;
123
124         template<typename F2>
125         async_future_object(F2&& f)
126                 : f(std::forward<F2>(f))
127                 , thread([this]{run();})
128         {
129         }
130
131         ~async_future_object()
132         {
133                 thread.join();
134         }
135
136         void run()
137         {
138                 future_object_helper<R>::locking_invoke(*this, f);
139         }
140 };
141
142 }
143         
144 template<typename F>
145 auto async(launch policy, F&& f) -> boost::unique_future<decltype(f())>
146 {               
147         typedef decltype(f())                                                           result_type;    
148         typedef boost::detail::future_object<result_type>       future_object_type;
149
150         boost::shared_ptr<future_object_type> future_object;
151
152         // HACK: This solution is a hack to avoid modifying boost code.
153
154         if((policy & launch::async) != 0)
155                 future_object.reset(new detail::async_future_object<result_type, F>(std::forward<F>(f)), [](future_object_type* p){delete reinterpret_cast<detail::async_future_object<result_type, F>*>(p);});
156         else if((policy & launch::deferred) != 0)
157                 future_object.reset(new detail::deferred_future_object<result_type, F>(std::forward<F>(f)), [](future_object_type* p){delete reinterpret_cast<detail::deferred_future_object<result_type, F>*>(p);});
158         else
159                 throw std::invalid_argument("policy");
160         
161         boost::unique_future<result_type> future;
162
163         static_assert(sizeof(future) == sizeof(future_object), "");
164
165         reinterpret_cast<boost::shared_ptr<future_object_type>&>(future) = std::move(future_object); // Get around the "private" encapsulation.
166         return std::move(future);
167 }
168         
169 template<typename F>
170 auto async(F&& f) -> boost::unique_future<decltype(f())>
171 {       
172         return async(launch::async | launch::deferred, std::forward<F>(f));
173 }
174
175 template<typename T>
176 auto make_shared(boost::unique_future<T>&& f) -> boost::shared_future<T>
177 {       
178         return boost::shared_future<T>(std::move(f));
179 }
180
181 template<typename T>
182 auto flatten(boost::unique_future<T>&& f) -> boost::unique_future<decltype(f.get().get())>
183 {
184         auto shared_f = make_shared(std::move(f));
185         return async(launch::deferred, [=]() mutable
186         {
187                 return shared_f.get().get();
188         });
189 }
190
191 /**
192  * A utility that helps the producer side of a future when the task is not
193  * able to complete immediately but there are known retry points in the code.
194  */
195 template<class R>
196 class retry_task
197 {
198 public:
199         typedef boost::function<boost::optional<R> ()> func_type;
200         
201         retry_task() : done_(false) {}
202
203         /**
204          * Reset the state with a new task. If the previous task has not completed
205          * the old one will be discarded.
206          *
207          * @param func The function that tries to calculate future result. If the
208          *             optional return value is set the future is marked as ready.
209          */
210         void set_task(const func_type& func)
211         {
212                 boost::mutex::scoped_lock lock(mutex_);
213
214                 func_ = func;
215                 done_ = false;
216                 promise_ = boost::promise<R>();
217         }
218
219         /**
220          * Take ownership of the future for the current task. Cannot only be called
221          * once for each task.
222          *
223          * @return the future.
224          */
225         boost::unique_future<R> get_future()
226         {
227                 boost::mutex::scoped_lock lock(mutex_);
228
229                 return promise_.get_future();
230         }
231
232         /**
233          * Call this when it is guaranteed or probable that the task will be able
234          * to complete.
235          *
236          * @return true if the task completed (the future will have a result).
237          */
238         bool try_completion()
239         {
240                 boost::mutex::scoped_lock lock(mutex_);
241
242                 return try_completion_internal();
243         }
244
245         /**
246          * Call this when it is certain that the result should be ready, and if not
247          * it should be regarded as an unrecoverable error (retrying again would
248          * be useless), so the future will be marked as failed.
249          *
250          * @param exception The exception to mark the future with *if* the task
251          *                  completion fails.
252          */
253         void try_or_fail(const std::exception& exception)
254         {
255                 boost::mutex::scoped_lock lock(mutex_);
256
257                 if (!try_completion_internal())
258                 {
259                         try
260                         {
261                                 throw exception;
262                         }
263                         catch (...)
264                         {
265                                 CASPAR_LOG_CURRENT_EXCEPTION();
266                                 promise_.set_exception(boost::current_exception());
267                                 done_ = true;
268                         }
269                 }
270         }
271 private:
272         bool try_completion_internal()
273         {
274                 if (!func_)
275                         return false;
276
277                 if (done_)
278                         return true;
279
280                 boost::optional<R> result;
281
282                 try
283                 {
284                         result = func_();
285                 }
286                 catch (...)
287                 {
288                         CASPAR_LOG_CURRENT_EXCEPTION();
289                         promise_.set_exception(boost::current_exception());
290                         done_ = true;
291
292                         return true;
293                 }
294
295                 if (result)
296                 {
297                         promise_.set_value(*result);
298                         done_ = true;
299                 }
300
301                 return done_;
302         }
303 private:
304         boost::mutex mutex_;
305         func_type func_;
306         boost::promise<R> promise_;
307         bool done_;
308 };
309
310 /**
311  * Wrap a value in a future with an already known result.
312  * <p>
313  * Useful when the result of an operation is already known at the time of
314  * calling.
315  *
316  * @param value The r-value to wrap.
317  *
318  * @return The future with the result set.
319  */
320 template<class R>
321 boost::unique_future<R> wrap_as_future(R&& value)
322 {
323         boost::promise<R> p;
324
325         p.set_value(value);
326
327         return p.get_future();
328 }
329
330 }