3 #include "spl/memory.h"
\r
4 #include "concurrency/lock.h"
\r
6 #include <tbb/spin_rw_mutex.h>
\r
7 #include <tbb/cache_aligned_allocator.h>
\r
10 #include <functional>
\r
14 namespace caspar { namespace reactive {
\r
18 // function_traits which works with MSVC2010 lambdas.
\r
// Primary template: declared but never defined, so only the callable shapes
// specialized below (exactly one parameter) can be queried.
template<typename Callable>
struct function_traits_impl;

// Pointer to a free function taking a single argument.
template<typename Ret, typename Arg>
struct function_traits_impl<Ret (*)(Arg)>
{
	typedef Arg arg1_type;
};

// Pointer to a non-const member function taking a single argument.
template<typename Ret, typename Owner, typename Arg>
struct function_traits_impl<Ret (Owner::*)(Arg)>
{
	typedef Arg arg1_type;
};

// Pointer to a const member function taking a single argument
// (the shape of a non-mutable lambda's operator()).
template<typename Ret, typename Owner, typename Arg>
struct function_traits_impl<Ret (Owner::*)(Arg) const>
{
	typedef Arg arg1_type;
};
\r
41 template<typename T>
\r
42 typename function_traits_impl<T>::arg1_type arg1_type_helper(T);
\r
44 template<typename F>
\r
45 struct function_traits
\r
47 typedef decltype(detail::arg1_type_helper(&F::operator())) arg1_type;
\r
// Observer half of the reactive interface: receives values of type T through on_next().
52 template<typename T>
\r
// Copy constructor and copy assignment are only declared here.
// NOTE(review): neither a definition nor the access specifier is visible in this view;
// presumably this is the declare-but-do-not-define idiom making the type non-copyable - confirm.
55 observer(const observer&);
\r
56 observer& operator=(const observer&);
\r
// Type of the values delivered to this observer.
58 typedef T value_type;
\r
// Pure virtual: concrete observers override this to receive each value.
68 virtual void on_next(const T&) = 0;
\r
// Observable half of the reactive interface: observers can be subscribed to and
// unsubscribed from it.
71 template<typename T>
\r
// Copy constructor and copy assignment are only declared here.
// NOTE(review): no definition or access specifier is visible in this view;
// presumably the type is meant to be non-copyable - confirm.
74 observable(const observable&);
\r
75 observable& operator=(const observable&);
\r
// Type of the values associated with this observable.
77 typedef T value_type;
\r
// Inside this class the name `observer` refers to observer<T>.
// NOTE(review): this re-uses the name of the `observer` template from the enclosing
// scope; some compilers diagnose a name changing meaning within a class - confirm
// for non-MSVC toolchains.
78 typedef observer<T> observer;
\r
// Observers are referred to through weak pointers.
79 typedef std::weak_ptr<observer> observer_ptr;
\r
// Virtual destructor (body not visible in this view).
85 virtual ~observable()
\r
// Pure virtual registration hooks, implemented by concrete observables.
89 virtual void subscribe(const observer_ptr&) = 0;
\r
90 virtual void unsubscribe(const observer_ptr&) = 0;
\r
// A subject is both an observer of I values and an observable of O values.
// The output type O defaults to the input type I.
93 template<typename I, typename O = I>
\r
94 struct subject : public observer<I>, public observable<O>
\r
// Re-declare the observable side's typedefs in this class so that `observer`
// and `observer_ptr` here mean observable<O>::observer / observer_ptr.
96 typedef typename observable<O>::observer observer;
\r
97 typedef typename observable<O>::observer_ptr observer_ptr;
\r
// observer<T> implementation that stores a callable of type C in func_.
104 template<typename T, typename C>
\r
105 class observer_function : public observer<T>
\r
// Default constructor (body not visible in this view).
108 observer_function()
\r
// Takes the callable by value and moves it into func_.
112 observer_function(C func)
\r
113 : func_(std::move(func))
\r
// Copy constructor: copies the other instance's callable.
117 observer_function(const observer_function& other)
\r
118 : func_(other.func_)
\r
// Move constructor: moves the other instance's callable into this one.
122 observer_function(observer_function&& other)
\r
123 : func_(std::move(other.func_))
\r
// Assignment takes its argument by value.
// NOTE(review): the body is not visible in this view; the by-value parameter together
// with the swap() member suggests copy-and-swap - confirm, and confirm it returns *this.
127 observer_function& operator=(observer_function other)
\r
132 void swap(observer_function& other)
\r
134 std::swap(func_, other.func_);
\r
135 std::swap(filter_, other.filter_);
\r
// Override of observer<T>::on_next for value `e`.
// NOTE(review): the body is not visible in this view; presumably it forwards `e`
// to the stored callable func_ - confirm.
138 virtual void on_next(const T& e) override
\r
// Concrete subject<I, O> that keeps its subscribed observers in observers_,
// guarded by a tbb::spin_rw_mutex (mutex_).
146 template<typename I, typename O = I>
\r
// NOTE(review): `sealed` is not standard C++ (it is accepted by MSVC); the standard
// spelling would be `final` - confirm the supported toolchains before changing.
147 class basic_subject_impl sealed : public subject<I, O>
\r
// Every specialization of this template is a friend of every other one.
149 template <typename, typename> friend class basic_subject_impl;
\r
// Copy constructor and copy assignment are only declared here.
// NOTE(review): no definition is visible in this view; presumably the type is
// intentionally non-copyable - confirm.
151 basic_subject_impl(const basic_subject_impl&);
\r
152 basic_subject_impl& operator=(const basic_subject_impl&);
\r
// Observer typedefs taken from the subject<I, O> base.
154 typedef typename subject<I, O>::observer observer;
\r
155 typedef typename subject<I, O>::observer_ptr observer_ptr;
\r
// Default constructor (body not visible in this view).
157 basic_subject_impl()
\r
// Move constructor: takes over the observer list of `other`.
// The initializer list shown here does not lock other's mutex.
161 basic_subject_impl(basic_subject_impl<I, O>&& other)
\r
162 : observers_(std::move(other.observers_))
\r
166 basic_subject_impl& operator=(basic_subject_impl<I, O>&& other)
\r
168 tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);
\r
169 observers_ = std::move(observers_);
\r
// NOTE(review): the signature of the function these two statements belong to is
// not visible in this view.
// Acquires mutex_ as a writer (second constructor argument `true`) and then
// removes every entry from the observer list.
175 tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);
\r
177 observers_.clear();
\r
180 void subscribe(const observer_ptr& o) override
\r
182 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
\r
184 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
\r
185 if (it == std::end(observers_) || comp_(o, *it))
\r
187 lock.upgrade_to_writer();
\r
188 observers_.insert(it, o);
\r
192 void unsubscribe(const observer_ptr& o) override
\r
194 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
\r
196 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
\r
197 if(it != std::end(observers_) && !comp_(o, *it))
\r
199 lock.upgrade_to_writer();
\r
200 observers_.erase(it);
\r
// Handles an incoming value `e` for this subject. Several statements of this
// function are not visible in this view; the notes below only describe what is shown.
204 void on_next(const I& e) override
\r
// Local list of strong references, filled from observers_ below.
206 std::vector<spl::shared_ptr<observer>> observers;
\r
// Reader lock while observers_ is scanned.
209 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
\r
// `expired` starts out as end(), which the check further down treats as "nothing to erase".
211 auto expired = std::end(observers_);
\r
213 for(auto it = std::begin(observers_); it != std::end(observers_); ++it)
\r
// Try to obtain a strong reference from the stored weak pointer.
215 auto o = it->lock();
\r
// NOTE(review): the condition guarding this push_back and the statement that assigns
// `expired` are not visible here; presumably only non-null `o` is stored and
// `expired` is pointed at an entry whose lock() failed - confirm.
217 observers.push_back(spl::make_shared_ptr(std::move(o)));
\r
// At most one element is erased per call, since `expired` is a single iterator.
222 if(expired != std::end(observers_))
\r
// NOTE(review): the result of upgrade_to_writer() is ignored. TBB documents that it
// returns false when the lock was released and re-acquired, in which case `expired`
// may no longer be valid - confirm and re-validate the iterator if so.
224 lock.upgrade_to_writer();
\r
225 observers_.erase(expired);
\r
// Walks the local list of strong references.
// NOTE(review): the loop body is not visible here; presumably it calls on_next(e)
// on each collected observer - confirm.
229 for(auto it = std::begin(observers); it != std::end(observers); ++it)
\r
// Allocator used for the observer list: TBB's cache-aligned allocator.
233 typedef tbb::cache_aligned_allocator<std::weak_ptr<observer>> allocator;
\r
// Owner-based ordering of weak pointers; passed to std::lower_bound and used
// for the equivalence checks in subscribe()/unsubscribe().
235 std::owner_less<std::weak_ptr<observer>> comp_;
\r
// Subscribed observers, held weakly. subscribe() inserts at the lower_bound
// position, so the list stays ordered under comp_.
236 std::vector<std::weak_ptr<observer>, allocator> observers_;
\r
// Reader/writer spin lock taken around accesses to observers_; declared mutable.
237 mutable tbb::spin_rw_mutex mutex_;
\r
// subject<I, O> that delegates to a basic_subject_impl<I, O> held through a
// std::shared_ptr (impl_).
240 template<typename I, typename O = I>
\r
241 class basic_subject : public subject<I, O>
\r
// Every specialization of this template is a friend of every other one.
243 template <typename, typename> friend class basic_subject;
\r
// Copy constructor and copy assignment are only declared here.
// NOTE(review): no definition is visible in this view; presumably the type is
// intentionally non-copyable - confirm.
245 basic_subject(const basic_subject&);
\r
246 basic_subject& operator=(const basic_subject&);
\r
// The implementation type all calls are forwarded to.
248 typedef basic_subject_impl<I, O> impl;
\r
// Observer typedefs taken from the subject<I, O> base.
250 typedef typename subject<I, O>::observer observer;
\r
251 typedef typename subject<I, O>::observer_ptr observer_ptr;
\r
// Initializer list of a constructor whose signature is not visible in this view:
// creates a fresh implementation object.
254 : impl_(std::make_shared<impl>())
\r
259 basic_subject(subject&& other)
\r
260 : impl_(std::move(other.impl_))
\r
// Virtual destructor (body not visible in this view).
264 virtual ~basic_subject()
\r
// Move assignment.
// NOTE(review): the body is not visible in this view; presumably it is implemented
// in terms of swap() - confirm, and confirm it returns *this.
268 basic_subject& operator=(basic_subject&& other)
\r
// Exchanges the implementation pointers of the two subjects.
273 void swap(basic_subject& other)
\r
275 impl_.swap(other.impl_);
\r
// Forwards the subscription request to the implementation object.
278 virtual void subscribe(const observer_ptr& o) override
\r
280 impl_->subscribe(o);
\r
// Forwards the unsubscription request to the implementation object.
283 virtual void unsubscribe(const observer_ptr& o) override
\r
285 impl_->unsubscribe(o);
\r
// NOTE(review): body not visible in this view; presumably forwards `e` to impl_ - confirm.
288 virtual void on_next(const I& e) override
\r
// Implicit conversion to a weak observer pointer.
// NOTE(review): body not visible in this view; presumably yields a weak reference
// to impl_ - confirm.
293 operator std::weak_ptr<observer>()
\r
// Shared implementation object; the forwarding members above dereference it.
298 std::shared_ptr<impl> impl_;
\r
301 template<typename F>
\r
302 spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>
\r
303 make_observer(F func)
\r
305 return spl::make_shared<observer_function<std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>(std::move(func));
\r
308 template<typename T>
\r
309 basic_subject<T>& operator<<(basic_subject<T>& s, const T& val)
\r