]> git.sesse.net Git - casparcg/blob - common/reactive.h
- Removed need of non-deterministic sleeps during server shutdown.
[casparcg] / common / reactive.h
1 #pragma once
2
3 #include "memory.h"
4 #include "lock.h"
5
6 #include <tbb/spin_rw_mutex.h>
7 #include <tbb/cache_aligned_allocator.h>
8
9 #include <algorithm>
10 #include <functional>
11 #include <memory>
12 #include <vector>
13
14 namespace caspar { namespace reactive {
15         
16 namespace detail {
17
18 // function_traits which works with MSVC2010 lambdas.
19         
20 template<typename FPtr>
21 struct function_traits_impl;
22
23 template<typename R, typename A1>
24 struct function_traits_impl<R (*)(A1)>
25 {
26         typedef A1 arg1_type;
27 };
28
29 template<typename R, typename C, typename A1>
30 struct function_traits_impl<R (C::*)(A1)>
31 {
32         typedef A1 arg1_type;
33 };
34
35 template<typename R, typename C, typename A1>
36 struct function_traits_impl<R (C::*)(A1) const>
37 {
38         typedef A1 arg1_type;
39 };
40
41 template<typename T>
42 typename function_traits_impl<T>::arg1_type arg1_type_helper(T);
43
44 template<typename F>
45 struct function_traits
46 {
47         typedef decltype(detail::arg1_type_helper(&F::operator())) arg1_type;
48 };
49
50 }
51
52 template<typename T>
53 class observer
54 {       
55         observer(const observer&);
56         observer& operator=(const observer&);
57 public:
58
59         // Static Members
60
61         typedef T value_type;
62         
63         // Constructors
64
65         observer()
66         {
67         }
68
69         virtual ~observer()
70         {
71         }
72
73         // Methods
74
75         virtual void on_next(const T&) = 0;
76
77         // Properties
78 };
79
80 template<typename T>
81 class observable
82 {
83         observable(const observable&);
84         observable& operator=(const observable&);
85 public:
86
87         // Static Members
88
89         typedef T                                                                               value_type;
90         typedef reactive::observer<T>                                   observer;
91         typedef std::weak_ptr<reactive::observer<T>>    observer_ptr;
92         
93         // Constructors
94
95         observable()
96         {
97         }
98
99         virtual ~observable()
100         {
101         }
102         
103         // Methods
104
105         virtual void subscribe(const observer_ptr&) = 0;
106         virtual void unsubscribe(const observer_ptr&) = 0;
107         
108         // Properties
109 };
110
111 template<typename I, typename O = I>
112 class subject : public observer<I>, public observable<O>
113 {
114 public:
115         
116         // Static Members
117
118         typedef typename observable<O>::observer                observer;
119         typedef typename observable<O>::observer_ptr    observer_ptr;
120         
121         // Constructors
122
123         virtual ~subject()
124         {
125         }
126         
127         // Methods
128
129         // Properties
130 };
131
132 template<typename T, typename C>
133 class observer_function : public observer<T>
134 {
135 public:
136         
137         // Static Members
138         
139         // Constructors
140
141         observer_function()
142         {
143         }
144
145         observer_function(C func)
146                 : func_(std::move(func))
147         {
148         }
149
150         observer_function(const observer_function& other)
151                 : func_(other.func_)
152         {
153         }
154
155         observer_function(observer_function&& other)
156                 : func_(std::move(other.func_))
157         {
158         }
159
160         observer_function& operator=(observer_function other)
161         {
162                 other.swap(*this);
163         }
164                 
165         // Methods
166
167         void swap(observer_function& other)
168         {
169                 std::swap(func_, other.func_);
170         }
171                 
172         void on_next(const T& e) override
173         {
174                 func_(e);
175         }
176
177         // Properties
178 private:
179         C func_;
180 };
181
182 template<typename I, typename O = I>
183 class basic_subject_impl final : public subject<I, O>
184 {       
185     template <typename, typename> friend class basic_subject_impl;
186
187         basic_subject_impl(const basic_subject_impl&);
188         basic_subject_impl& operator=(const basic_subject_impl&);
189 public: 
190         // Static Members
191
192         typedef typename subject<I, O>::observer                observer;
193         typedef typename subject<I, O>::observer_ptr    observer_ptr;
194
195         // Constructors
196
197         basic_subject_impl()
198         {
199         }
200                                         
201         basic_subject_impl(basic_subject_impl<I, O>&& other)
202                 : observers_(std::move(other.observers_))
203         {
204         }
205         
206         basic_subject_impl& operator=(basic_subject_impl<I, O>&& other)
207         {
208                 observers_ = std::move(observers_);
209                 return *this;
210         }
211
212         // Methods
213
214         void clear()
215         {
216                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);
217
218                 observers_.clear();
219         }
220         
221         void subscribe(const observer_ptr& o) override
222         {                               
223                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
224
225                 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
226                 if (it == std::end(observers_) || comp_(o, *it))
227                 {
228                         lock.upgrade_to_writer();
229                         observers_.insert(it, o);
230                 }               
231         }
232
233         void unsubscribe(const observer_ptr& o) override
234         {
235                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
236                 
237                 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
238                 if(it != std::end(observers_) && !comp_(o, *it))
239                 {
240                         lock.upgrade_to_writer();
241                         observers_.erase(it);
242                 }               
243         }
244         
245         void on_next(const I& e) override
246         {                               
247                 std::vector<spl::shared_ptr<observer>> observers;
248
249                 {
250                         tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
251                 
252                         auto expired = std::end(observers_);
253
254                         for(auto it = std::begin(observers_); it != std::end(observers_); ++it)
255                         {
256                                 auto o = it->lock();
257                                 if(o)
258                                         observers.push_back(spl::make_shared_ptr(std::move(o)));
259                                 else
260                                         expired = it;
261                         }
262
263                         if(expired != std::end(observers_))
264                         {               
265                                 lock.upgrade_to_writer();
266                                 observers_.erase(expired);
267                         }       
268                 }
269                 
270                 for(auto it = std::begin(observers); it != std::end(observers); ++it)
271                         (*it)->on_next(e);
272         }
273
274         // Properties
275
276 private:
277         typedef tbb::cache_aligned_allocator<std::weak_ptr<observer>>   allocator;
278
279         std::owner_less<std::weak_ptr<observer>>                comp_;
280         std::vector<std::weak_ptr<observer>, allocator> observers_;
281         mutable tbb::spin_rw_mutex                                              mutex_;
282 };
283
284 template<typename I, typename O = I>
285 class basic_subject : public subject<I, O>
286 {       
287     template <typename, typename> friend class basic_subject;
288
289         basic_subject(const basic_subject&);
290         basic_subject& operator=(const basic_subject&);
291
292         typedef basic_subject_impl<I, O> impl;
293 public: 
294
295         // Static Members
296
297         typedef typename subject<I, O>::observer                observer;
298         typedef typename subject<I, O>::observer_ptr    observer_ptr;
299
300         // Constructors
301
302         basic_subject()
303                 : impl_(std::make_shared<impl>())
304
305         {
306         }
307                 
308         basic_subject(basic_subject&& other)
309                 : impl_(std::move(other.impl_))
310         {
311         }
312         
313         basic_subject& operator=(basic_subject&& other)
314         {
315                 other.swap(*this);
316         }
317
318         // Methods
319
320         void swap(basic_subject& other)
321         {
322                 impl_.swap(other.impl_);
323         }
324         
325         void subscribe(const observer_ptr& o) override
326         {                               
327                 impl_->subscribe(o);
328         }
329
330         void unsubscribe(const observer_ptr& o) override
331         {
332                 impl_->unsubscribe(o);
333         }
334                                 
335         void on_next(const I& e) override
336         {                               
337                 impl_->on_next(e);
338         }
339
340         operator std::weak_ptr<observer>()
341         {
342                 return impl_;
343         }
344
345         // Properties
346
347 private:
348         std::shared_ptr<impl> impl_;
349 };
350
351 template<typename F>
352 spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>> 
353 make_observer(F func)
354 {
355         return spl::make_shared<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>(std::move(func));
356 }
357
358 template<typename T>
359 basic_subject<T>& operator<<(basic_subject<T>& s, const T& val)
360 {
361         s.on_next(val);
362         return s;
363 }
364
365 }}