6 #include <tbb/spin_rw_mutex.h>
7 #include <tbb/cache_aligned_allocator.h>
14 namespace caspar { namespace reactive {
18 // function_traits which works with MSVC2010 lambdas.
20 template<typename FPtr>
21 struct function_traits_impl;
23 template<typename R, typename A1>
24 struct function_traits_impl<R (*)(A1)>
29 template<typename R, typename C, typename A1>
30 struct function_traits_impl<R (C::*)(A1)>
35 template<typename R, typename C, typename A1>
36 struct function_traits_impl<R (C::*)(A1) const>
42 typename function_traits_impl<T>::arg1_type arg1_type_helper(T);
45 struct function_traits
47 typedef decltype(detail::arg1_type_helper(&F::operator())) arg1_type;
55 observer(const observer&);
56 observer& operator=(const observer&);
75 virtual void on_next(const T&) = 0;
83 observable(const observable&);
84 observable& operator=(const observable&);
90 typedef observer<T> observer;
91 typedef std::weak_ptr<observer> observer_ptr;
105 virtual void subscribe(const observer_ptr&) = 0;
106 virtual void unsubscribe(const observer_ptr&) = 0;
111 template<typename I, typename O = I>
112 class subject : public observer<I>, public observable<O>
118 typedef typename observable<O>::observer observer;
119 typedef typename observable<O>::observer_ptr observer_ptr;
132 template<typename T, typename C>
133 class observer_function : public observer<T>
145 observer_function(C func)
146 : func_(std::move(func))
150 observer_function(const observer_function& other)
155 observer_function(observer_function&& other)
156 : func_(std::move(other.func_))
160 observer_function& operator=(observer_function other)
167 void swap(observer_function& other)
169 std::swap(func_, other.func_);
170 std::swap(filter_, other.filter_);
173 void on_next(const T& e) override
183 template<typename I, typename O = I>
184 class basic_subject_impl /* final */ : public subject<I, O>
186 template <typename, typename> friend class basic_subject_impl;
188 basic_subject_impl(const basic_subject_impl&);
189 basic_subject_impl& operator=(const basic_subject_impl&);
193 typedef typename subject<I, O>::observer observer;
194 typedef typename subject<I, O>::observer_ptr observer_ptr;
202 basic_subject_impl(basic_subject_impl<I, O>&& other)
203 : observers_(std::move(other.observers_))
207 basic_subject_impl& operator=(basic_subject_impl<I, O>&& other)
209 observers_ = std::move(observers_);
217 tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);
222 void subscribe(const observer_ptr& o) override
224 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
226 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
227 if (it == std::end(observers_) || comp_(o, *it))
229 lock.upgrade_to_writer();
230 observers_.insert(it, o);
234 void unsubscribe(const observer_ptr& o) override
236 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
238 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
239 if(it != std::end(observers_) && !comp_(o, *it))
241 lock.upgrade_to_writer();
242 observers_.erase(it);
246 void on_next(const I& e) override
248 std::vector<spl::shared_ptr<observer>> observers;
251 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
253 auto expired = std::end(observers_);
255 for(auto it = std::begin(observers_); it != std::end(observers_); ++it)
259 observers.push_back(spl::make_shared_ptr(std::move(o)));
264 if(expired != std::end(observers_))
266 lock.upgrade_to_writer();
267 observers_.erase(expired);
271 for(auto it = std::begin(observers); it != std::end(observers); ++it)
278 typedef tbb::cache_aligned_allocator<std::weak_ptr<observer>> allocator;
280 std::owner_less<std::weak_ptr<observer>> comp_;
281 std::vector<std::weak_ptr<observer>, allocator> observers_;
282 mutable tbb::spin_rw_mutex mutex_;
285 template<typename I, typename O = I>
286 class basic_subject : public subject<I, O>
288 template <typename, typename> friend class basic_subject;
290 basic_subject(const basic_subject&);
291 basic_subject& operator=(const basic_subject&);
293 typedef basic_subject_impl<I, O> impl;
298 typedef typename subject<I, O>::observer observer;
299 typedef typename subject<I, O>::observer_ptr observer_ptr;
304 : impl_(std::make_shared<impl>())
309 basic_subject(basic_subject&& other)
310 : impl_(std::move(other.impl_))
314 basic_subject& operator=(basic_subject&& other)
321 void swap(basic_subject& other)
323 impl_.swap(other.impl_);
326 void subscribe(const observer_ptr& o) override
331 void unsubscribe(const observer_ptr& o) override
333 impl_->unsubscribe(o);
336 void on_next(const I& e) override
341 operator std::weak_ptr<observer>()
349 std::shared_ptr<impl> impl_;
353 spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>
354 make_observer(F func)
356 return spl::make_shared<observer_function<std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>(std::move(func));
360 basic_subject<T>& operator<<(basic_subject<T>& s, const T& val)