]> git.sesse.net Git - casparcg/blob - common/reactive.h
set svn:eol-style native on .h and .cpp files
[casparcg] / common / reactive.h
1 #pragma once
2
3 #include "memory.h"
4 #include "lock.h"
5
6 #include <tbb/spin_rw_mutex.h>
7 #include <tbb/cache_aligned_allocator.h>
8
9 #include <algorithm>
10 #include <functional>
11 #include <memory>
12 #include <vector>
13
14 namespace caspar { namespace reactive {
15         
16 namespace detail {
17
18 // function_traits which works with MSVC2010 lambdas.
19         
20 template<typename FPtr>
21 struct function_traits_impl;
22
23 template<typename R, typename A1>
24 struct function_traits_impl<R (*)(A1)>
25 {
26         typedef A1 arg1_type;
27 };
28
29 template<typename R, typename C, typename A1>
30 struct function_traits_impl<R (C::*)(A1)>
31 {
32         typedef A1 arg1_type;
33 };
34
35 template<typename R, typename C, typename A1>
36 struct function_traits_impl<R (C::*)(A1) const>
37 {
38         typedef A1 arg1_type;
39 };
40
41 template<typename T>
42 typename function_traits_impl<T>::arg1_type arg1_type_helper(T);
43
44 template<typename F>
45 struct function_traits
46 {
47         typedef decltype(detail::arg1_type_helper(&F::operator())) arg1_type;
48 };
49
50 }
51
52 template<typename T>
53 class observer
54 {       
55         observer(const observer&);
56         observer& operator=(const observer&);
57 public:
58
59         // Static Members
60
61         typedef T value_type;
62         
63         // Constructors
64
65         observer()
66         {
67         }
68
69         virtual ~observer()
70         {
71         }
72
73         // Methods
74
75         virtual void on_next(const T&) = 0;
76
77         // Properties
78 };
79
80 template<typename T>
81 class observable
82 {
83         observable(const observable&);
84         observable& operator=(const observable&);
85 public:
86
87         // Static Members
88
89         typedef T                                               value_type;
90         typedef observer<T>                             observer;
91         typedef std::weak_ptr<observer> observer_ptr;
92         
93         // Constructors
94
95         observable()
96         {
97         }
98
99         virtual ~observable()
100         {
101         }
102         
103         // Methods
104
105         virtual void subscribe(const observer_ptr&) = 0;
106         virtual void unsubscribe(const observer_ptr&) = 0;
107         
108         // Properties
109 };
110
111 template<typename I, typename O = I>
112 class subject : public observer<I>, public observable<O>
113 {
114 public:
115         
116         // Static Members
117
118         typedef typename observable<O>::observer                observer;
119         typedef typename observable<O>::observer_ptr    observer_ptr;
120         
121         // Constructors
122
123         virtual ~subject()
124         {
125         }
126         
127         // Methods
128
129         // Properties
130 };
131
132 template<typename T, typename C>
133 class observer_function : public observer<T>
134 {
135 public:
136         
137         // Static Members
138         
139         // Constructors
140
141         observer_function()
142         {
143         }
144
145         observer_function(C func)
146                 : func_(std::move(func))
147         {
148         }
149
150         observer_function(const observer_function& other)
151                 : func_(other.func_)
152         {
153         }
154
155         observer_function(observer_function&& other)
156                 : func_(std::move(other.func_))
157         {
158         }
159
160         observer_function& operator=(observer_function other)
161         {
162                 other.swap(*this);
163         }
164                 
165         // Methods
166
167         void swap(observer_function& other)
168         {
169                 std::swap(func_, other.func_);
170                 std::swap(filter_, other.filter_);
171         }
172                 
173         void on_next(const T& e) override
174         {
175                 func_(e);
176         }
177
178         // Properties
179 private:
180         C func_;
181 };
182
183 template<typename I, typename O = I>
184 class basic_subject_impl sealed : public subject<I, O>
185 {       
186     template <typename, typename> friend class basic_subject_impl;
187
188         basic_subject_impl(const basic_subject_impl&);
189         basic_subject_impl& operator=(const basic_subject_impl&);
190 public: 
191         // Static Members
192
193         typedef typename subject<I, O>::observer                observer;
194         typedef typename subject<I, O>::observer_ptr    observer_ptr;
195
196         // Constructors
197
198         basic_subject_impl()
199         {
200         }
201                                         
202         basic_subject_impl(basic_subject_impl<I, O>&& other)
203                 : observers_(std::move(other.observers_))
204         {
205         }
206         
207         basic_subject_impl& operator=(basic_subject_impl<I, O>&& other)
208         {
209                 observers_ = std::move(observers_);
210                 return *this;
211         }
212
213         // Methods
214
215         void clear()
216         {
217                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);
218
219                 observers_.clear();
220         }
221         
222         void subscribe(const observer_ptr& o) override
223         {                               
224                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
225
226                 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
227                 if (it == std::end(observers_) || comp_(o, *it))
228                 {
229                         lock.upgrade_to_writer();
230                         observers_.insert(it, o);
231                 }               
232         }
233
234         void unsubscribe(const observer_ptr& o) override
235         {
236                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
237                 
238                 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
239                 if(it != std::end(observers_) && !comp_(o, *it))
240                 {
241                         lock.upgrade_to_writer();
242                         observers_.erase(it);
243                 }               
244         }
245         
246         void on_next(const I& e) override
247         {                               
248                 std::vector<spl::shared_ptr<observer>> observers;
249
250                 {
251                         tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
252                 
253                         auto expired = std::end(observers_);
254
255                         for(auto it = std::begin(observers_); it != std::end(observers_); ++it)
256                         {
257                                 auto o = it->lock();
258                                 if(o)
259                                         observers.push_back(spl::make_shared_ptr(std::move(o)));
260                                 else
261                                         expired = it;
262                         }
263
264                         if(expired != std::end(observers_))
265                         {               
266                                 lock.upgrade_to_writer();
267                                 observers_.erase(expired);
268                         }       
269                 }
270                 
271                 for(auto it = std::begin(observers); it != std::end(observers); ++it)
272                         (*it)->on_next(e);
273         }
274
275         // Properties
276
277 private:
278         typedef tbb::cache_aligned_allocator<std::weak_ptr<observer>>   allocator;
279
280         std::owner_less<std::weak_ptr<observer>>                comp_;
281         std::vector<std::weak_ptr<observer>, allocator> observers_;
282         mutable tbb::spin_rw_mutex                                              mutex_;
283 };
284
285 template<typename I, typename O = I>
286 class basic_subject sealed : public subject<I, O>
287 {       
288     template <typename, typename> friend class basic_subject;
289
290         basic_subject(const basic_subject&);
291         basic_subject& operator=(const basic_subject&);
292
293         typedef basic_subject_impl<I, O> impl;
294 public: 
295
296         // Static Members
297
298         typedef typename subject<I, O>::observer                observer;
299         typedef typename subject<I, O>::observer_ptr    observer_ptr;
300
301         // Constructors
302
303         basic_subject()
304                 : impl_(std::make_shared<impl>())
305
306         {
307         }
308                 
309         basic_subject(subject&& other)
310                 : impl_(std::move(other.impl_))
311         {
312         }
313         
314         basic_subject& operator=(basic_subject&& other)
315         {
316                 other.swap(*this);
317         }
318
319         // Methods
320
321         void swap(basic_subject& other)
322         {
323                 impl_.swap(other.impl_);
324         }
325         
326         void subscribe(const observer_ptr& o) override
327         {                               
328                 impl_->subscribe(o);
329         }
330
331         void unsubscribe(const observer_ptr& o) override
332         {
333                 impl_->unsubscribe(o);
334         }
335                                 
336         void on_next(const I& e) override
337         {                               
338                 impl_->on_next(e);
339         }
340
341         operator std::weak_ptr<observer>()
342         {
343                 return impl_;
344         }
345
346         // Properties
347
348 private:
349         std::shared_ptr<impl> impl_;
350 };
351
352 template<typename F>
353 spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>> 
354 make_observer(F func)
355 {
356         return spl::make_shared<observer_function<std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>(std::move(func));
357 }
358
359 template<typename T>
360 basic_subject<T>& operator<<(basic_subject<T>& s, const T& val)
361 {
362         s.on_next(val);
363         return s;
364 }
365
366 }}