]> git.sesse.net Git - casparcg/blob - common/reactive.h
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / common / reactive.h
1 #pragma once\r
2 \r
3 #include "spl/memory.h"\r
4 #include "concurrency/lock.h"\r
5 \r
6 #include <tbb/spin_rw_mutex.h>\r
7 #include <tbb/cache_aligned_allocator.h>\r
8 \r
9 #include <algorithm>\r
10 #include <functional>\r
11 #include <memory>\r
12 #include <vector>\r
13 \r
14 namespace caspar { namespace reactive {\r
15         \r
namespace detail {

// Small callable-introspection helpers: given a functor type, expose the type
// of its single call parameter as `arg1_type`.
// Written so that it works with MSVC2010 lambdas.

// Primary template. Intentionally left undefined; only the single-argument
// signatures specialized below are supported.
template<typename Signature>
struct function_traits_impl;

// Pointer to a free function taking one argument.
template<typename Ret, typename Arg>
struct function_traits_impl<Ret (*)(Arg)>
{
	typedef Arg arg1_type;
};

// Pointer to a non-const member function taking one argument.
template<typename Ret, typename Class, typename Arg>
struct function_traits_impl<Ret (Class::*)(Arg)>
{
	typedef Arg arg1_type;
};

// Pointer to a const member function taking one argument
// (this is the form a non-mutable lambda's operator() has).
template<typename Ret, typename Class, typename Arg>
struct function_traits_impl<Ret (Class::*)(Arg) const>
{
	typedef Arg arg1_type;
};

// Declaration only: it is used exclusively inside decltype below, so its
// return type is all that matters.
template<typename Callable>
typename function_traits_impl<Callable>::arg1_type arg1_type_helper(Callable);

// Entry point: `function_traits<F>::arg1_type` is the parameter type of
// F::operator(). F must have exactly one, non-overloaded, non-template
// operator() for `&F::operator()` to be well-formed.
template<typename Functor>
struct function_traits
{
	typedef decltype(detail::arg1_type_helper(&Functor::operator())) arg1_type;
};

}
51 \r
// Receiving end of the reactive pipeline: implementations are handed values
// of type T through on_next().
//
// Instances cannot be copied: the copy constructor and copy assignment are
// declared private and are given no definition in this header.
template<typename T>
class observer
{
public:
	typedef T value_type;

	observer() {}

	// Virtual so that implementations can be destroyed through observer<T>.
	virtual ~observer() {}

	// Invoked with each value delivered to this observer.
	virtual void on_next(const T&) = 0;

private:
	observer(const observer&);
	observer& operator=(const observer&);
};
70 \r
71 template<typename T>\r
72 class observable\r
73 {\r
74         observable(const observable&);\r
75         observable& operator=(const observable&);\r
76 public:\r
77         typedef T                                               value_type;\r
78         typedef observer<T>                             observer;\r
79         typedef std::weak_ptr<observer> observer_ptr;\r
80 \r
81         observable()\r
82         {\r
83         }\r
84 \r
85         virtual ~observable()\r
86         {\r
87         }\r
88 \r
89         virtual void subscribe(const observer_ptr&) = 0;\r
90         virtual void unsubscribe(const observer_ptr&) = 0;\r
91 };\r
92 \r
93 template<typename I, typename O = I>\r
94 struct subject : public observer<I>, public observable<O>\r
95 {\r
96         typedef typename observable<O>::observer                observer;\r
97         typedef typename observable<O>::observer_ptr    observer_ptr;\r
98 \r
99         virtual ~subject()\r
100         {\r
101         }\r
102 };\r
103 \r
104 template<typename T, typename C>\r
105 class observer_function : public observer<T>\r
106 {\r
107 public:\r
108         observer_function()\r
109         {\r
110         }\r
111 \r
112         observer_function(C func)\r
113                 : func_(std::move(func))\r
114         {\r
115         }\r
116 \r
117         observer_function(const observer_function& other)\r
118                 : func_(other.func_)\r
119         {\r
120         }\r
121 \r
122         observer_function(observer_function&& other)\r
123                 : func_(std::move(other.func_))\r
124         {\r
125         }\r
126 \r
127         observer_function& operator=(observer_function other)\r
128         {\r
129                 other.swap(*this);\r
130         }\r
131 \r
132         void swap(observer_function& other)\r
133         {\r
134                 std::swap(func_, other.func_);\r
135                 std::swap(filter_, other.filter_);\r
136         }\r
137                 \r
138         virtual void on_next(const T& e) override\r
139         {\r
140                 func_(e);\r
141         }\r
142 private:\r
143         C func_;\r
144 };\r
145 \r
146 template<typename I, typename O = I>\r
147 class basic_subject_impl sealed : public subject<I, O>\r
148 {       \r
149     template <typename, typename> friend class basic_subject_impl;\r
150 \r
151         basic_subject_impl(const basic_subject_impl&);\r
152         basic_subject_impl& operator=(const basic_subject_impl&);\r
153 public: \r
154         typedef typename subject<I, O>::observer                observer;\r
155         typedef typename subject<I, O>::observer_ptr    observer_ptr;\r
156 \r
157         basic_subject_impl()\r
158         {\r
159         }\r
160                                         \r
161         basic_subject_impl(basic_subject_impl<I, O>&& other)\r
162                 : observers_(std::move(other.observers_))\r
163         {\r
164         }\r
165         \r
166         basic_subject_impl& operator=(basic_subject_impl<I, O>&& other)\r
167         {\r
168                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);\r
169                 observers_ = std::move(observers_);\r
170                 return *this;\r
171         }\r
172 \r
173         void clear()\r
174         {\r
175                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);\r
176 \r
177                 observers_.clear();\r
178         }\r
179         \r
180         void subscribe(const observer_ptr& o) override\r
181         {                               \r
182                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);\r
183 \r
184                 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);\r
185                 if (it == std::end(observers_) || comp_(o, *it))\r
186                 {\r
187                         lock.upgrade_to_writer();\r
188                         observers_.insert(it, o);\r
189                 }               \r
190         }\r
191 \r
192         void unsubscribe(const observer_ptr& o) override\r
193         {\r
194                 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);\r
195                 \r
196                 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);\r
197                 if(it != std::end(observers_) && !comp_(o, *it))\r
198                 {\r
199                         lock.upgrade_to_writer();\r
200                         observers_.erase(it);\r
201                 }               \r
202         }\r
203         \r
204         void on_next(const I& e) override\r
205         {                               \r
206                 std::vector<spl::shared_ptr<observer>> observers;\r
207 \r
208                 {\r
209                         tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);\r
210                 \r
211                         auto expired = std::end(observers_);\r
212 \r
213                         for(auto it = std::begin(observers_); it != std::end(observers_); ++it)\r
214                         {\r
215                                 auto o = it->lock();\r
216                                 if(o)\r
217                                         observers.push_back(spl::make_shared_ptr(std::move(o)));\r
218                                 else\r
219                                         expired = it;\r
220                         }\r
221 \r
222                         if(expired != std::end(observers_))\r
223                         {               \r
224                                 lock.upgrade_to_writer();\r
225                                 observers_.erase(expired);\r
226                         }       \r
227                 }\r
228                 \r
229                 for(auto it = std::begin(observers); it != std::end(observers); ++it)\r
230                         (*it)->on_next(e);\r
231         }\r
232 private:\r
233         typedef tbb::cache_aligned_allocator<std::weak_ptr<observer>>   allocator;\r
234 \r
235         std::owner_less<std::weak_ptr<observer>>                comp_;\r
236         std::vector<std::weak_ptr<observer>, allocator> observers_;\r
237         mutable tbb::spin_rw_mutex                                              mutex_;\r
238 };\r
239 \r
240 template<typename I, typename O = I>\r
241 class basic_subject : public subject<I, O>\r
242 {       \r
243     template <typename, typename> friend class basic_subject;\r
244 \r
245         basic_subject(const basic_subject&);\r
246         basic_subject& operator=(const basic_subject&);\r
247 \r
248         typedef basic_subject_impl<I, O> impl;\r
249 public: \r
250         typedef typename subject<I, O>::observer                observer;\r
251         typedef typename subject<I, O>::observer_ptr    observer_ptr;\r
252 \r
253         basic_subject()\r
254                 : impl_(std::make_shared<impl>())\r
255 \r
256         {\r
257         }\r
258                 \r
259         basic_subject(subject&& other)\r
260                 : impl_(std::move(other.impl_))\r
261         {\r
262         }\r
263 \r
264         virtual ~basic_subject()\r
265         {\r
266         }\r
267 \r
268         basic_subject& operator=(basic_subject&& other)\r
269         {\r
270                 other.swap(*this);\r
271         }\r
272 \r
273         void swap(basic_subject& other)\r
274         {\r
275                 impl_.swap(other.impl_);\r
276         }\r
277         \r
278         virtual void subscribe(const observer_ptr& o) override\r
279         {                               \r
280                 impl_->subscribe(o);\r
281         }\r
282 \r
283         virtual void unsubscribe(const observer_ptr& o) override\r
284         {\r
285                 impl_->unsubscribe(o);\r
286         }\r
287                                 \r
288         virtual void on_next(const I& e) override\r
289         {                               \r
290                 impl_->on_next(e);\r
291         }\r
292 \r
293         operator std::weak_ptr<observer>()\r
294         {\r
295                 return impl_;\r
296         }\r
297 private:\r
298         std::shared_ptr<impl> impl_;\r
299 };\r
300 \r
301 template<typename F>\r
302 spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>> \r
303 make_observer(F func)\r
304 {\r
305         return spl::make_shared<observer_function<std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>(std::move(func));\r
306 }\r
307 \r
308 template<typename T>\r
309 basic_subject<T>& operator<<(basic_subject<T>& s, const T& val)\r
310 {\r
311         s.on_next(val);\r
312         return s;\r
313 }\r
314 \r
315 }}