// git.sesse.net Git - casparcg/blob - common/reactive.h
// blob e0e2d4aae1bd49b76c3b3b7bfa12716ec7afaaa7
// [casparcg] / common / reactive.h
1 #pragma once\r
2 \r
3 #include "memory.h"\r
4 #include "lock.h"\r
5 \r
6 #include <tbb/spin_rw_mutex.h>\r
7 #include <tbb/cache_aligned_allocator.h>\r
8 \r
9 #include <algorithm>\r
10 #include <functional>\r
11 #include <memory>\r
12 #include <vector>\r
13 \r
14 namespace caspar { namespace reactive {\r
15         \r
namespace detail {

// Callable introspection that also works with MSVC2010 lambdas: the type of
// the single argument is recovered from a pointer to the callable's
// operator().

// Primary template. Declared only; the partial specialisations below supply
// the actual definitions for the supported pointer kinds.
template<typename Callable>
struct function_traits_impl;

// Pointer to a free function taking exactly one argument.
template<typename Ret, typename Arg>
struct function_traits_impl<Ret (*)(Arg)>
{
	typedef Arg arg1_type;
};

// Pointer to a non-const member function taking exactly one argument.
template<typename Ret, typename Class, typename Arg>
struct function_traits_impl<Ret (Class::*)(Arg)>
{
	typedef Arg arg1_type;
};

// Pointer to a const member function taking exactly one argument
// (this is the form a non-mutable lambda's operator() has).
template<typename Ret, typename Class, typename Arg>
struct function_traits_impl<Ret (Class::*)(Arg) const>
{
	typedef Arg arg1_type;
};

// Declaration only: never called, used purely inside decltype so that
// template argument deduction selects the matching specialisation above.
template<typename Callable>
typename function_traits_impl<Callable>::arg1_type arg1_type_helper(Callable);

// Exposes, as arg1_type, the argument type of Functor::operator().
// Functor must have a single, non-overloaded, one-argument operator().
template<typename Functor>
struct function_traits
{
	typedef decltype(detail::arg1_type_helper(&Functor::operator())) arg1_type;
};

}
51 \r
// Abstract receiver of values of type T.
template<typename T>
class observer
{
public:

	// Static Members

	// The type of value passed to on_next().
	typedef T value_type;

	// Constructors

	observer() {}

	// Virtual so that implementations can be destroyed through an observer<T>.
	virtual ~observer() {}

	// Methods

	// Invoked with each value delivered to this observer.
	virtual void on_next(const T& value) = 0;

	// Properties

private:
	// Non-copyable: the copy operations are declared private and are not
	// defined in this header.
	observer(const observer&);
	observer& operator=(const observer&);
};
79 \r
80 template<typename T>\r
81 class observable\r
82 {\r
83         observable(const observable&);\r
84         observable& operator=(const observable&);\r
85 public:\r
86 \r
87         // Static Members\r
88 \r
89         typedef T                                               value_type;\r
90         typedef observer<T>                             observer;\r
91         typedef std::weak_ptr<observer> observer_ptr;\r
92         \r
93         // Constructors\r
94 \r
95         observable()\r
96         {\r
97         }\r
98 \r
99         virtual ~observable()\r
100         {\r
101         }\r
102         \r
103         // Methods\r
104 \r
105         virtual void subscribe(const observer_ptr&) = 0;\r
106         virtual void unsubscribe(const observer_ptr&) = 0;\r
107         \r
108         // Properties\r
109 };\r
110 \r
111 template<typename I, typename O = I>\r
112 class subject : public observer<I>, public observable<O>\r
113 {\r
114 public:\r
115         \r
116         // Static Members\r
117 \r
118         typedef typename observable<O>::observer                observer;\r
119         typedef typename observable<O>::observer_ptr    observer_ptr;\r
120         \r
121         // Constructors\r
122 \r
123         virtual ~subject()\r
124         {\r
125         }\r
126         \r
127         // Methods\r
128 \r
129         // Properties\r
130 };\r
131 \r
132 template<typename T, typename C>\r
133 class observer_function : public observer<T>\r
134 {\r
135 public:\r
136         \r
137         // Static Members\r
138         \r
139         // Constructors\r
140 \r
141         observer_function()\r
142         {\r
143         }\r
144 \r
145         observer_function(C func)\r
146                 : func_(std::move(func))\r
147         {\r
148         }\r
149 \r
150         observer_function(const observer_function& other)\r
151                 : func_(other.func_)\r
152         {\r
153         }\r
154 \r
155         observer_function(observer_function&& other)\r
156                 : func_(std::move(other.func_))\r
157         {\r
158         }\r
159 \r
160         observer_function& operator=(observer_function other)\r
161         {\r
162                 other.swap(*this);\r
163         }\r
164                 \r
165         // Methods\r
166 \r
167         void swap(observer_function& other)\r
168         {\r
169                 std::swap(func_, other.func_);\r
170                 std::swap(filter_, other.filter_);\r
171         }\r
172                 \r
173         void on_next(const T& e) override\r
174         {\r
175                 func_(e);\r
176         }\r
177 \r
178         // Properties\r
179 private:\r
180         C func_;\r
181 };\r
182 \r
183 template<typename I, typename O = I>\r
184 class basic_subject_impl sealed : public subject<I, O>\r
185 {       \r
186     template <typename, typename> friend class basic_subject_impl;\r
187 \r
188         basic_subject_impl(const basic_subject_impl&);\r
189         basic_subject_impl& operator=(const basic_subject_impl&);\r
190 public: \r
191         // Static Members\r
192 \r
193         typedef typename subject<I, O>::observer                observer;\r
194         typedef typename subject<I, O>::observer_ptr    observer_ptr;\r
195 \r
196         // Constructors\r
197 \r
198         basic_subject_impl()\r
199         {\r
200         }\r
201                                         \r
202         basic_subject_impl(basic_subject_impl<I, O>&& other)\r
203                 : observers_(std::move(other.observers_))\r
204         {\r
205         }\r
206         \r
207         basic_subject_impl& operator=(basic_subject_impl<I, O>&& other)\r
208         {\r
209                 observers_ = std::move(observers_);\r
210                 return *this;\r
211         }\r
212 \r
213         // Methods\r
214 \r
215         void clear()\r
216         {\r
217                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);\r
218 \r
219                 observers_.clear();\r
220         }\r
221         \r
222         void subscribe(const observer_ptr& o) override\r
223         {                               \r
224                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);\r
225 \r
226                 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);\r
227                 if (it == std::end(observers_) || comp_(o, *it))\r
228                 {\r
229                         lock.upgrade_to_writer();\r
230                         observers_.insert(it, o);\r
231                 }               \r
232         }\r
233 \r
234         void unsubscribe(const observer_ptr& o) override\r
235         {\r
236                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);\r
237                 \r
238                 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);\r
239                 if(it != std::end(observers_) && !comp_(o, *it))\r
240                 {\r
241                         lock.upgrade_to_writer();\r
242                         observers_.erase(it);\r
243                 }               \r
244         }\r
245         \r
246         void on_next(const I& e) override\r
247         {                               \r
248                 std::vector<spl::shared_ptr<observer>> observers;\r
249 \r
250                 {\r
251                         tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);\r
252                 \r
253                         auto expired = std::end(observers_);\r
254 \r
255                         for(auto it = std::begin(observers_); it != std::end(observers_); ++it)\r
256                         {\r
257                                 auto o = it->lock();\r
258                                 if(o)\r
259                                         observers.push_back(spl::make_shared_ptr(std::move(o)));\r
260                                 else\r
261                                         expired = it;\r
262                         }\r
263 \r
264                         if(expired != std::end(observers_))\r
265                         {               \r
266                                 lock.upgrade_to_writer();\r
267                                 observers_.erase(expired);\r
268                         }       \r
269                 }\r
270                 \r
271                 for(auto it = std::begin(observers); it != std::end(observers); ++it)\r
272                         (*it)->on_next(e);\r
273         }\r
274 \r
275         // Properties\r
276 \r
277 private:\r
278         typedef tbb::cache_aligned_allocator<std::weak_ptr<observer>>   allocator;\r
279 \r
280         std::owner_less<std::weak_ptr<observer>>                comp_;\r
281         std::vector<std::weak_ptr<observer>, allocator> observers_;\r
282         mutable tbb::spin_rw_mutex                                              mutex_;\r
283 };\r
284 \r
285 template<typename I, typename O = I>\r
286 class basic_subject sealed : public subject<I, O>\r
287 {       \r
288     template <typename, typename> friend class basic_subject;\r
289 \r
290         basic_subject(const basic_subject&);\r
291         basic_subject& operator=(const basic_subject&);\r
292 \r
293         typedef basic_subject_impl<I, O> impl;\r
294 public: \r
295 \r
296         // Static Members\r
297 \r
298         typedef typename subject<I, O>::observer                observer;\r
299         typedef typename subject<I, O>::observer_ptr    observer_ptr;\r
300 \r
301         // Constructors\r
302 \r
303         basic_subject()\r
304                 : impl_(std::make_shared<impl>())\r
305 \r
306         {\r
307         }\r
308                 \r
309         basic_subject(subject&& other)\r
310                 : impl_(std::move(other.impl_))\r
311         {\r
312         }\r
313         \r
314         basic_subject& operator=(basic_subject&& other)\r
315         {\r
316                 other.swap(*this);\r
317         }\r
318 \r
319         // Methods\r
320 \r
321         void swap(basic_subject& other)\r
322         {\r
323                 impl_.swap(other.impl_);\r
324         }\r
325         \r
326         void subscribe(const observer_ptr& o) override\r
327         {                               \r
328                 impl_->subscribe(o);\r
329         }\r
330 \r
331         void unsubscribe(const observer_ptr& o) override\r
332         {\r
333                 impl_->unsubscribe(o);\r
334         }\r
335                                 \r
336         void on_next(const I& e) override\r
337         {                               \r
338                 impl_->on_next(e);\r
339         }\r
340 \r
341         operator std::weak_ptr<observer>()\r
342         {\r
343                 return impl_;\r
344         }\r
345 \r
346         // Properties\r
347 \r
348 private:\r
349         std::shared_ptr<impl> impl_;\r
350 };\r
351 \r
352 template<typename F>\r
353 spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>> \r
354 make_observer(F func)\r
355 {\r
356         return spl::make_shared<observer_function<std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>(std::move(func));\r
357 }\r
358 \r
359 template<typename T>\r
360 basic_subject<T>& operator<<(basic_subject<T>& s, const T& val)\r
361 {\r
362         s.on_next(val);\r
363         return s;\r
364 }\r
365 \r
366 }}