X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=common%2Freactive.h;h=c4f6930335d740eb5113b73ae54a7a3cf6468974;hb=1f2344fe8705342b0503af4609064267e9ae42f4;hp=9d0efeed9c5d32d1ea45a806fc008f1617737c45;hpb=a9bb7ddbd5ce4774a1d29fdd5f4e7a22e4bfed40;p=casparcg diff --git a/common/reactive.h b/common/reactive.h index 9d0efeed9..c4f693033 100644 --- a/common/reactive.h +++ b/common/reactive.h @@ -1,366 +1,366 @@ -#pragma once - -#include "memory.h" -#include "lock.h" - -#include -#include - -#include -#include -#include -#include - -namespace caspar { namespace reactive { - -namespace detail { - -// function_traits which works with MSVC2010 lambdas. - -template -struct function_traits_impl; - -template -struct function_traits_impl -{ - typedef A1 arg1_type; -}; - -template -struct function_traits_impl -{ - typedef A1 arg1_type; -}; - -template -struct function_traits_impl -{ - typedef A1 arg1_type; -}; - -template -typename function_traits_impl::arg1_type arg1_type_helper(T); - -template -struct function_traits -{ - typedef decltype(detail::arg1_type_helper(&F::operator())) arg1_type; -}; - -} - -template -class observer -{ - observer(const observer&); - observer& operator=(const observer&); -public: - - // Static Members - - typedef T value_type; - - // Constructors - - observer() - { - } - - virtual ~observer() - { - } - - // Methods - - virtual void on_next(const T&) = 0; - - // Properties -}; - -template -class observable -{ - observable(const observable&); - observable& operator=(const observable&); -public: - - // Static Members - - typedef T value_type; - typedef observer observer; - typedef std::weak_ptr observer_ptr; - - // Constructors - - observable() - { - } - - virtual ~observable() - { - } - - // Methods - - virtual void subscribe(const observer_ptr&) = 0; - virtual void unsubscribe(const observer_ptr&) = 0; - - // Properties -}; - -template -class subject : public observer, public observable -{ -public: - - // Static Members - - typedef typename observable::observer observer; - typedef typename observable::observer_ptr observer_ptr; - - // Constructors - - virtual ~subject() - { - } - - // Methods - - // Properties -}; - -template -class observer_function : public observer -{ -public: - - // Static Members - - // Constructors - - observer_function() - { - } - - observer_function(C func) - : func_(std::move(func)) - { - } - - observer_function(const observer_function& other) - : func_(other.func_) - { - } - - observer_function(observer_function&& other) - : func_(std::move(other.func_)) - { - } - - observer_function& operator=(observer_function other) - { - other.swap(*this); - } - - // Methods - - void swap(observer_function& other) - { - std::swap(func_, other.func_); - std::swap(filter_, other.filter_); - } - - virtual void on_next(const T& e) override - { - func_(e); - } - - // Properties -private: - C func_; -}; - -template -class basic_subject_impl sealed : public subject -{ - template friend class basic_subject_impl; - - basic_subject_impl(const basic_subject_impl&); - basic_subject_impl& operator=(const basic_subject_impl&); -public: - // Static Members - - typedef typename subject::observer observer; - typedef typename subject::observer_ptr observer_ptr; - - // Constructors - - basic_subject_impl() - { - } - - basic_subject_impl(basic_subject_impl&& other) - : observers_(std::move(other.observers_)) - { - } - - basic_subject_impl& operator=(basic_subject_impl&& other) - { - observers_ = std::move(observers_); - return *this; - } - - // Methods - - void clear() - { - tbb::spin_rw_mutex::scoped_lock lock(mutex_, true); - - observers_.clear(); - } - - void subscribe(const observer_ptr& o) override - { - tbb::spin_rw_mutex::scoped_lock lock(mutex_, false); - - auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_); - if (it == std::end(observers_) || comp_(o, *it)) - { - lock.upgrade_to_writer(); - observers_.insert(it, o); - } - } - - void unsubscribe(const observer_ptr& o) override - { - tbb::spin_rw_mutex::scoped_lock lock(mutex_, false); - - auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_); - if(it != std::end(observers_) && !comp_(o, *it)) - { - lock.upgrade_to_writer(); - observers_.erase(it); - } - } - - void on_next(const I& e) override - { - std::vector> observers; - - { - tbb::spin_rw_mutex::scoped_lock lock(mutex_, false); - - auto expired = std::end(observers_); - - for(auto it = std::begin(observers_); it != std::end(observers_); ++it) - { - auto o = it->lock(); - if(o) - observers.push_back(spl::make_shared_ptr(std::move(o))); - else - expired = it; - } - - if(expired != std::end(observers_)) - { - lock.upgrade_to_writer(); - observers_.erase(expired); - } - } - - for(auto it = std::begin(observers); it != std::end(observers); ++it) - (*it)->on_next(e); - } - - // Properties - -private: - typedef tbb::cache_aligned_allocator> allocator; - - std::owner_less> comp_; - std::vector, allocator> observers_; - mutable tbb::spin_rw_mutex mutex_; -}; - -template -class basic_subject sealed : public subject -{ - template friend class basic_subject; - - basic_subject(const basic_subject&); - basic_subject& operator=(const basic_subject&); - - typedef basic_subject_impl impl; -public: - - // Static Members - - typedef typename subject::observer observer; - typedef typename subject::observer_ptr observer_ptr; - - // Constructors - - basic_subject() - : impl_(std::make_shared()) - - { - } - - basic_subject(subject&& other) - : impl_(std::move(other.impl_)) - { - } - - basic_subject& operator=(basic_subject&& other) - { - other.swap(*this); - } - - // Methods - - void swap(basic_subject& other) - { - impl_.swap(other.impl_); - } - - virtual void subscribe(const observer_ptr& o) override - { - impl_->subscribe(o); - } - - virtual void unsubscribe(const observer_ptr& o) override - { - impl_->unsubscribe(o); - } - - virtual void on_next(const I& e) override - { - impl_->on_next(e); - } - - operator std::weak_ptr() - { - return impl_; - } - - // Properties - -private: - std::shared_ptr impl_; -}; - -template -spl::shared_ptr::arg1_type>::type, F>> -make_observer(F func) -{ - return spl::make_shared::arg1_type>::type, F>>(std::move(func)); -} - -template -basic_subject& operator<<(basic_subject& s, const T& val) -{ - s.on_next(val); - return s; -} - +#pragma once + +#include "memory.h" +#include "lock.h" + +#include +#include + +#include +#include +#include +#include + +namespace caspar { namespace reactive { + +namespace detail { + +// function_traits which works with MSVC2010 lambdas. + +template +struct function_traits_impl; + +template +struct function_traits_impl +{ + typedef A1 arg1_type; +}; + +template +struct function_traits_impl +{ + typedef A1 arg1_type; +}; + +template +struct function_traits_impl +{ + typedef A1 arg1_type; +}; + +template +typename function_traits_impl::arg1_type arg1_type_helper(T); + +template +struct function_traits +{ + typedef decltype(detail::arg1_type_helper(&F::operator())) arg1_type; +}; + +} + +template +class observer +{ + observer(const observer&); + observer& operator=(const observer&); +public: + + // Static Members + + typedef T value_type; + + // Constructors + + observer() + { + } + + virtual ~observer() + { + } + + // Methods + + virtual void on_next(const T&) = 0; + + // Properties +}; + +template +class observable +{ + observable(const observable&); + observable& operator=(const observable&); +public: + + // Static Members + + typedef T value_type; + typedef observer observer; + typedef std::weak_ptr observer_ptr; + + // Constructors + + observable() + { + } + + virtual ~observable() + { + } + + // Methods + + virtual void subscribe(const observer_ptr&) = 0; + virtual void unsubscribe(const observer_ptr&) = 0; + + // Properties +}; + +template +class subject : public observer, public observable +{ +public: + + // Static Members + + typedef typename observable::observer observer; + typedef typename observable::observer_ptr observer_ptr; + + // Constructors + + virtual ~subject() + { + } + + // Methods + + // Properties +}; + +template +class observer_function : public observer +{ +public: + + // Static Members + + // Constructors + + observer_function() + { + } + + observer_function(C func) + : func_(std::move(func)) + { + } + + observer_function(const observer_function& other) + : func_(other.func_) + { + } + + observer_function(observer_function&& other) + : func_(std::move(other.func_)) + { + } + + observer_function& operator=(observer_function other) + { + other.swap(*this); + } + + // Methods + + void swap(observer_function& other) + { + std::swap(func_, other.func_); + std::swap(filter_, other.filter_); + } + + void on_next(const T& e) override + { + func_(e); + } + + // Properties +private: + C func_; +}; + +template +class basic_subject_impl /* final */ : public subject +{ + template friend class basic_subject_impl; + + basic_subject_impl(const basic_subject_impl&); + basic_subject_impl& operator=(const basic_subject_impl&); +public: + // Static Members + + typedef typename subject::observer observer; + typedef typename subject::observer_ptr observer_ptr; + + // Constructors + + basic_subject_impl() + { + } + + basic_subject_impl(basic_subject_impl&& other) + : observers_(std::move(other.observers_)) + { + } + + basic_subject_impl& operator=(basic_subject_impl&& other) + { + observers_ = std::move(observers_); + return *this; + } + + // Methods + + void clear() + { + tbb::spin_rw_mutex::scoped_lock lock(mutex_, true); + + observers_.clear(); + } + + void subscribe(const observer_ptr& o) override + { + tbb::spin_rw_mutex::scoped_lock lock(mutex_, false); + + auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_); + if (it == std::end(observers_) || comp_(o, *it)) + { + lock.upgrade_to_writer(); + observers_.insert(it, o); + } + } + + void unsubscribe(const observer_ptr& o) override + { + tbb::spin_rw_mutex::scoped_lock lock(mutex_, false); + + auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_); + if(it != std::end(observers_) && !comp_(o, *it)) + { + lock.upgrade_to_writer(); + observers_.erase(it); + } + } + + void on_next(const I& e) override + { + std::vector> observers; + + { + tbb::spin_rw_mutex::scoped_lock lock(mutex_, false); + + auto expired = std::end(observers_); + + for(auto it = std::begin(observers_); it != std::end(observers_); ++it) + { + auto o = it->lock(); + if(o) + observers.push_back(spl::make_shared_ptr(std::move(o))); + else + expired = it; + } + + if(expired != std::end(observers_)) + { + lock.upgrade_to_writer(); + observers_.erase(expired); + } + } + + for(auto it = std::begin(observers); it != std::end(observers); ++it) + (*it)->on_next(e); + } + + // Properties + +private: + typedef tbb::cache_aligned_allocator> allocator; + + std::owner_less> comp_; + std::vector, allocator> observers_; + mutable tbb::spin_rw_mutex mutex_; +}; + +template +class basic_subject : public subject +{ + template friend class basic_subject; + + basic_subject(const basic_subject&); + basic_subject& operator=(const basic_subject&); + + typedef basic_subject_impl impl; +public: + + // Static Members + + typedef typename subject::observer observer; + typedef typename subject::observer_ptr observer_ptr; + + // Constructors + + basic_subject() + : impl_(std::make_shared()) + + { + } + + basic_subject(basic_subject&& other) + : impl_(std::move(other.impl_)) + { + } + + basic_subject& operator=(basic_subject&& other) + { + other.swap(*this); + } + + // Methods + + void swap(basic_subject& other) + { + impl_.swap(other.impl_); + } + + void subscribe(const observer_ptr& o) override + { + impl_->subscribe(o); + } + + void unsubscribe(const observer_ptr& o) override + { + impl_->unsubscribe(o); + } + + void on_next(const I& e) override + { + impl_->on_next(e); + } + + operator std::weak_ptr() + { + return impl_; + } + + // Properties + +private: + std::shared_ptr impl_; +}; + +template +spl::shared_ptr::arg1_type>::type, F>> +make_observer(F func) +{ + return spl::make_shared::arg1_type>::type, F>>(std::move(func)); +} + +template +basic_subject& operator<<(basic_subject& s, const T& val) +{ + s.on_next(val); + return s; +} + }} \ No newline at end of file