6 #include <tbb/spin_rw_mutex.h>
7 #include <tbb/cache_aligned_allocator.h>
14 namespace caspar { namespace reactive {
18 // function_traits which works with MSVC2010 lambdas.
20 template<typename FPtr>
21 struct function_traits_impl;
23 template<typename R, typename A1>
24 struct function_traits_impl<R (*)(A1)>
29 template<typename R, typename C, typename A1>
30 struct function_traits_impl<R (C::*)(A1)>
35 template<typename R, typename C, typename A1>
36 struct function_traits_impl<R (C::*)(A1) const>
42 typename function_traits_impl<T>::arg1_type arg1_type_helper(T);
45 struct function_traits
47 typedef decltype(detail::arg1_type_helper(&F::operator())) arg1_type;
55 observer(const observer&);
56 observer& operator=(const observer&);
75 virtual void on_next(const T&) = 0;
83 observable(const observable&);
84 observable& operator=(const observable&);
90 typedef reactive::observer<T> observer;
91 typedef std::weak_ptr<reactive::observer<T>> observer_ptr;
105 virtual void subscribe(const observer_ptr&) = 0;
106 virtual void unsubscribe(const observer_ptr&) = 0;
111 template<typename I, typename O = I>
112 class subject : public observer<I>, public observable<O>
118 typedef typename observable<O>::observer observer;
119 typedef typename observable<O>::observer_ptr observer_ptr;
132 template<typename T, typename C>
133 class observer_function : public observer<T>
145 observer_function(C func)
146 : func_(std::move(func))
150 observer_function(const observer_function& other)
155 observer_function(observer_function&& other)
156 : func_(std::move(other.func_))
160 observer_function& operator=(observer_function other)
167 void swap(observer_function& other)
169 std::swap(func_, other.func_);
172 void on_next(const T& e) override
182 template<typename I, typename O = I>
183 class basic_subject_impl final : public subject<I, O>
185 template <typename, typename> friend class basic_subject_impl;
187 basic_subject_impl(const basic_subject_impl&);
188 basic_subject_impl& operator=(const basic_subject_impl&);
192 typedef typename subject<I, O>::observer observer;
193 typedef typename subject<I, O>::observer_ptr observer_ptr;
201 basic_subject_impl(basic_subject_impl<I, O>&& other)
202 : observers_(std::move(other.observers_))
206 basic_subject_impl& operator=(basic_subject_impl<I, O>&& other)
208 observers_ = std::move(other.observers_); // take over the observer list of `other` (the right-hand side), not our own
216 tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);
221 void subscribe(const observer_ptr& o) override
223 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
225 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
226 if (it == std::end(observers_) || comp_(o, *it))
228 if(!lock.upgrade_to_writer()) it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_); // upgrade returned false: lock was released and reacquired, so the earlier iterator may be stale - search again
229 if(it == std::end(observers_) || comp_(o, *it)) observers_.insert(it, o); // re-check under the writer lock so a concurrent subscribe cannot produce a duplicate; insertion keeps the list sorted by comp_
233 void unsubscribe(const observer_ptr& o) override
235 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
237 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
238 if(it != std::end(observers_) && !comp_(o, *it))
240 if(!lock.upgrade_to_writer()) it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_); // upgrade returned false: lock was released and reacquired, so the earlier iterator may be stale - search again
241 if(it != std::end(observers_) && !comp_(o, *it)) observers_.erase(it); // re-check under the writer lock: the entry may already have been removed by another thread
245 void on_next(const I& e) override
247 std::vector<spl::shared_ptr<observer>> observers;
250 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
252 auto expired = std::end(observers_);
254 for(auto it = std::begin(observers_); it != std::end(observers_); ++it)
258 observers.push_back(spl::make_shared_ptr(std::move(o)));
263 if(expired != std::end(observers_))
265 lock.upgrade_to_writer();
266 observers_.erase(expired);
270 for(auto it = std::begin(observers); it != std::end(observers); ++it)
277 typedef tbb::cache_aligned_allocator<std::weak_ptr<observer>> allocator;
279 std::owner_less<std::weak_ptr<observer>> comp_;
280 std::vector<std::weak_ptr<observer>, allocator> observers_;
281 mutable tbb::spin_rw_mutex mutex_;
284 template<typename I, typename O = I>
285 class basic_subject : public subject<I, O>
287 template <typename, typename> friend class basic_subject;
289 basic_subject(const basic_subject&);
290 basic_subject& operator=(const basic_subject&);
292 typedef basic_subject_impl<I, O> impl;
297 typedef typename subject<I, O>::observer observer;
298 typedef typename subject<I, O>::observer_ptr observer_ptr;
303 : impl_(std::make_shared<impl>())
308 basic_subject(basic_subject&& other)
309 : impl_(std::move(other.impl_))
313 basic_subject& operator=(basic_subject&& other)
320 void swap(basic_subject& other)
322 impl_.swap(other.impl_);
325 void subscribe(const observer_ptr& o) override
330 void unsubscribe(const observer_ptr& o) override
332 impl_->unsubscribe(o);
335 void on_next(const I& e) override
340 operator std::weak_ptr<observer>()
348 std::shared_ptr<impl> impl_;
352 spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>
353 make_observer(F func)
355 return spl::make_shared<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>(std::move(func));
359 basic_subject<T>& operator<<(basic_subject<T>& s, const T& val)