6 #include <tbb/spin_rw_mutex.h>
\r
7 #include <tbb/cache_aligned_allocator.h>
\r
10 #include <functional>
\r
14 namespace caspar { namespace reactive {
\r
18 // function_traits which works with MSVC2010 lambdas.
\r
// Maps a callable *pointer type* to the type of its single argument.
// The primary template is only declared here, so only the one-argument
// shapes specialised below provide arg1_type.
20 template<typename FPtr>
\r
21 struct function_traits_impl;
\r
// Specialisation for a free-function pointer taking one argument: R (*)(A1).
23 template<typename R, typename A1>
\r
24 struct function_traits_impl<R (*)(A1)>
\r
26 typedef A1 arg1_type;
\r
// Specialisation for a non-const member-function pointer taking one
// argument: R (C::*)(A1).
29 template<typename R, typename C, typename A1>
\r
30 struct function_traits_impl<R (C::*)(A1)>
\r
32 typedef A1 arg1_type;
\r
// Specialisation for a const member-function pointer taking one argument:
// R (C::*)(A1) const.
35 template<typename R, typename C, typename A1>
\r
36 struct function_traits_impl<R (C::*)(A1) const>
\r
38 typedef A1 arg1_type;
\r
// Deduction helper: takes a callable pointer of type T by value and is
// declared to return function_traits_impl<T>::arg1_type. Only a declaration
// is visible in this listing.
41 template<typename T>
\r
42 typename function_traits_impl<T>::arg1_type arg1_type_helper(T);
\r
// Exposes arg1_type for a functor type F (e.g. a lambda's closure type):
// the first-argument type of F::operator().
44 template<typename F>
\r
45 struct function_traits
\r
// The type is obtained without calling anything: &F::operator() is passed to
// detail::arg1_type_helper inside decltype. Taking &F::operator() needs that
// operator to be a single, non-template overload.
47 typedef decltype(detail::arg1_type_helper(&F::operator())) arg1_type;
\r
// Interface for a receiver of values of type T.
52 template<typename T>
\r
// Copy construction and copy assignment are declared without a body here.
// NOTE(review): presumably the pre-C++11 non-copyable idiom (private and
// never defined); the access specifier is not visible in this listing.
55 observer(const observer&);
\r
56 observer& operator=(const observer&);
\r
// The type of value this observer accepts.
61 typedef T value_type;
\r
// Pure virtual: implementations receive each value by const reference.
75 virtual void on_next(const T&) = 0;
\r
// Interface for a source of values of type T that observers can subscribe to.
80 template<typename T>
\r
// Copy construction and copy assignment are declared without a body here.
// NOTE(review): presumably the pre-C++11 non-copyable idiom (private and
// never defined); the access specifier is not visible in this listing.
83 observable(const observable&);
\r
84 observable& operator=(const observable&);
\r
// The type of value this observable produces.
89 typedef T value_type;
// NOTE(review): this typedef gives the member the same name as the observer
// class template it refers to; some compilers reject a declaration that
// changes the meaning of a name already used in the class - confirm on the
// toolchains this header must build with.
\r
90 typedef observer<T> observer;
\r
// Observers are referred to through weak_ptr, i.e. without owning them.
91 typedef std::weak_ptr<observer> observer_ptr;
\r
// Virtual destructor (body not visible in this listing).
99 virtual ~observable()
\r
// Pure virtual registration / de-registration of an observer.
105 virtual void subscribe(const observer_ptr&) = 0;
\r
106 virtual void unsubscribe(const observer_ptr&) = 0;
\r
// A subject is both a sink and a source: it derives from observer<I> (it
// accepts values of type I) and from observable<O> (observers of O can
// subscribe to it). O defaults to I.
111 template<typename I, typename O = I>
\r
112 class subject : public observer<I>, public observable<O>
\r
// Re-export the observer typedefs of the observable<O> base; `typename` is
// needed because that base depends on a template parameter.
118 typedef typename observable<O>::observer observer;
\r
119 typedef typename observable<O>::observer_ptr observer_ptr;
\r
// Adapts a callable of type C to the observer<T> interface.
132 template<typename T, typename C>
\r
133 class observer_function : public observer<T>
\r
// Default constructor (body not visible in this listing).
141 observer_function()
\r
// Takes the callable by value and moves it into func_.
145 observer_function(C func)
\r
146 : func_(std::move(func))
\r
// Copy constructor: copies the wrapped callable.
150 observer_function(const observer_function& other)
\r
151 : func_(other.func_)
\r
// Move constructor: moves the wrapped callable out of other.
155 observer_function(observer_function&& other)
\r
156 : func_(std::move(other.func_))
\r
// Assignment takes its argument by value, so it serves both copy and move
// assignment. Body not visible in this listing.
// NOTE(review): presumably implemented via swap(); confirm it returns *this.
160 observer_function& operator=(observer_function other)
\r
// Exchanges state with other, member by member.
167 void swap(observer_function& other)
\r
169 std::swap(func_, other.func_);
// NOTE(review): filter_ is not initialised by any of the constructors above,
// which only touch func_; confirm that a filter_ member actually exists.
\r
170 std::swap(filter_, other.filter_);
\r
// observer<T> override (body not visible in this listing).
// NOTE(review): presumably forwards e to func_ - confirm.
173 void on_next(const T& e) override
\r
// Subject implementation that stores its observers as weak_ptr in a vector
// kept ordered by std::owner_less (comp_) and guarded by a
// tbb::spin_rw_mutex (mutex_).
183 template<typename I, typename O = I>
\r
184 class basic_subject_impl sealed : public subject<I, O>
\r
// Every instantiation of this template is a friend of every other.
186 template <typename, typename> friend class basic_subject_impl;
\r
// Copy construction and copy assignment are declared without a body here.
// NOTE(review): presumably the pre-C++11 non-copyable idiom; the access
// specifier is not visible in this listing.
188 basic_subject_impl(const basic_subject_impl&);
\r
189 basic_subject_impl& operator=(const basic_subject_impl&);
\r
// Re-export the observer typedefs of the dependent base subject<I, O>.
193 typedef typename subject<I, O>::observer observer;
\r
194 typedef typename subject<I, O>::observer_ptr observer_ptr;
\r
// Default constructor (body not visible in this listing).
198 basic_subject_impl()
\r
// Move constructor: takes over other's observer list.
202 basic_subject_impl(basic_subject_impl<I, O>&& other)
\r
203 : observers_(std::move(other.observers_))
\r
// Move assignment: takes over other's observer list. The source must be
// other.observers_; moving observers_ into itself would be a self-move that
// never transfers anything from other.
207 basic_subject_impl& operator=(basic_subject_impl<I, O>&& other)
\r
209 observers_ = std::move(other.observers_);
\r
// Writer lock (second argument true) is held while the observer list is
// emptied. The signature of the enclosing member is not visible in this
// listing.
217 tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);
\r
219 observers_.clear();
\r
// Registers o unless an owner-equivalent entry is already present.
222 void subscribe(const observer_ptr& o) override
\r
// Start as a reader (second argument false); only upgrade if a change is
// needed.
224 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
\r
// Binary search for o's position in owner order.
226 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
\r
// Not found: either past the end, or the element at `it` orders after o.
227 if (it == std::end(observers_) || comp_(o, *it))
\r
// NOTE(review): TBB documents that upgrade_to_writer() may release and
// re-acquire the lock (returning false in that case). `it` was computed under
// the reader lock and the return value is ignored here - confirm the iterator
// cannot be invalidated by a concurrent writer in that window.
229 lock.upgrade_to_writer();
\r
// Inserting at the lower_bound position keeps the vector ordered.
230 observers_.insert(it, o);
\r
// Removes the entry owner-equivalent to o, if there is one.
234 void unsubscribe(const observer_ptr& o) override
\r
236 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
\r
238 auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
\r
// Found: `it` is in range and o does not order before *it.
239 if(it != std::end(observers_) && !comp_(o, *it))
\r
// NOTE(review): same upgrade_to_writer() caveat as in subscribe().
241 lock.upgrade_to_writer();
\r
242 observers_.erase(it);
\r
// Delivers e to the observers. Strong references are first gathered into a
// local vector while the lock is held; the final loop below walks that local
// vector, not observers_.
246 void on_next(const I& e) override
\r
248 std::vector<spl::shared_ptr<observer>> observers;
\r
251 tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
\r
// `expired` starts at end(), meaning "nothing to remove".
// NOTE(review): the statement that moves it off end() is not visible in this
// listing; presumably it is set when it->lock() below yields an empty pointer.
253 auto expired = std::end(observers_);
\r
255 for(auto it = std::begin(observers_); it != std::end(observers_); ++it)
\r
// Try to obtain a strong reference from the stored weak_ptr.
257 auto o = it->lock();
\r
259 observers.push_back(spl::make_shared_ptr(std::move(o)));
\r
// A single iterator is erased here, so at most one entry is removed per call.
264 if(expired != std::end(observers_))
\r
// NOTE(review): same upgrade_to_writer() caveat as in subscribe().
266 lock.upgrade_to_writer();
\r
267 observers_.erase(expired);
\r
// Walk the local snapshot (loop body not visible in this listing).
// NOTE(review): presumably calls on_next(e) on each element - confirm.
271 for(auto it = std::begin(observers); it != std::end(observers); ++it)
\r
// Storage for the weak observer references uses TBB's cache-aligned allocator.
278 typedef tbb::cache_aligned_allocator<std::weak_ptr<observer>> allocator;
\r
// Ordering used for lookup, insertion and removal in observers_.
280 std::owner_less<std::weak_ptr<observer>> comp_;
\r
281 std::vector<std::weak_ptr<observer>, allocator> observers_;
\r
// mutable, so it can also be locked from const members.
282 mutable tbb::spin_rw_mutex mutex_;
\r
// Public subject type: a thin handle that forwards to a shared
// basic_subject_impl<I, O> instance held in impl_.
285 template<typename I, typename O = I>
\r
286 class basic_subject sealed : public subject<I, O>
\r
// Every instantiation of this template is a friend of every other.
288 template <typename, typename> friend class basic_subject;
\r
// Copy construction and copy assignment are declared without a body here.
// NOTE(review): presumably the pre-C++11 non-copyable idiom; the access
// specifier is not visible in this listing.
290 basic_subject(const basic_subject&);
\r
291 basic_subject& operator=(const basic_subject&);
\r
// The implementation type all operations are forwarded to.
293 typedef basic_subject_impl<I, O> impl;
\r
// Re-export the observer typedefs of the dependent base subject<I, O>.
298 typedef typename subject<I, O>::observer observer;
\r
299 typedef typename subject<I, O>::observer_ptr observer_ptr;
\r
// Initialiser of a constructor whose signature is not visible in this
// listing: creates a fresh implementation object.
304 : impl_(std::make_shared<impl>())
\r
// Move constructor: takes over other's implementation pointer. The parameter
// is basic_subject&& (as in the move assignment below) because impl_ is a
// member of basic_subject, not of the subject base.
309 basic_subject(basic_subject&& other)
\r
310 : impl_(std::move(other.impl_))
\r
// Move assignment (body not visible in this listing).
314 basic_subject& operator=(basic_subject&& other)
\r
// Exchanges the implementation pointers of the two handles.
321 void swap(basic_subject& other)
\r
323 impl_.swap(other.impl_);
\r
// observable<O> override: forwarded to the implementation.
326 void subscribe(const observer_ptr& o) override
\r
328 impl_->subscribe(o);
\r
// observable<O> override: forwarded to the implementation.
331 void unsubscribe(const observer_ptr& o) override
\r
333 impl_->unsubscribe(o);
\r
// observer<I> override (body not visible in this listing).
// NOTE(review): presumably forwards e to impl_->on_next - confirm.
336 void on_next(const I& e) override
\r
// Conversion to a weak observer reference (body not visible in this listing).
// NOTE(review): presumably derived from impl_ - confirm.
341 operator std::weak_ptr<observer>()
\r
// Shared ownership of the implementation object.
349 std::shared_ptr<impl> impl_;
\r
// Wraps the callable func in an observer_function and returns it as an
// spl::shared_ptr. The observer's value type is the decayed type of the first
// parameter of F::operator(), as reported by detail::function_traits<F>.
// func is taken by value and moved into the new object.
352 template<typename F>
\r
353 spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>
\r
354 make_observer(F func)
\r
// std::decay<...>::type depends on F, so it needs the `typename`
// disambiguator here exactly as in the return type above.
356 return spl::make_shared<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>(std::move(func));
\r
// Stream-style operator: takes a basic_subject<T> and a value of type T, and
// returns a reference to a basic_subject<T>. Body not visible in this listing.
// NOTE(review): presumably calls s.on_next(val) and returns s - confirm.
359 template<typename T>
\r
360 basic_subject<T>& operator<<(basic_subject<T>& s, const T& val)
\r