-#pragma once\r
-\r
-#include "spl/memory.h"\r
-\r
-#include <tbb/spin_rw_mutex.h>\r
-#include <tbb/cache_aligned_allocator.h>\r
-\r
-#include <algorithm>\r
-#include <functional>\r
-#include <memory>\r
-#include <vector>\r
-\r
-namespace caspar { namespace reactive {\r
- \r
-namespace detail {\r
-\r
-// function_traits which works with MSVC2010 lambdas.\r
- \r
-template<typename FPtr>\r
-struct function_traits_impl;\r
-\r
-template<typename R, typename A1>\r
-struct function_traits_impl<R (*)(A1)>\r
-{\r
- typedef A1 arg1_type;\r
-};\r
-\r
-template<typename R, typename C, typename A1>\r
-struct function_traits_impl<R (C::*)(A1)>\r
-{\r
- typedef A1 arg1_type;\r
-};\r
-\r
-template<typename R, typename C, typename A1>\r
-struct function_traits_impl<R (C::*)(A1) const>\r
-{\r
- typedef A1 arg1_type;\r
-};\r
-\r
-template<typename T>\r
-typename function_traits_impl<T>::arg1_type arg1_type_helper(T);\r
-\r
-template<typename F>\r
-struct function_traits\r
-{\r
- typedef decltype(detail::arg1_type_helper(&F::operator())) arg1_type;\r
-};\r
-\r
-}\r
-\r
-template<typename T>\r
-class observer\r
-{ \r
- observer(const observer&);\r
- observer& operator=(const observer&);\r
-public:\r
- typedef T value_type;\r
- \r
- observer()\r
- {\r
- }\r
-\r
- virtual ~observer()\r
- {\r
- }\r
-\r
- virtual void on_next(const T&) = 0;\r
-};\r
-\r
-template<typename T>\r
-class observable\r
-{\r
- observable(const observable&);\r
- observable& operator=(const observable&);\r
-public:\r
- typedef T value_type;\r
- typedef observer<T> observer;\r
- typedef std::weak_ptr<observer> observer_ptr;\r
-\r
- observable()\r
- {\r
- }\r
-\r
- virtual ~observable()\r
- {\r
- }\r
-\r
- virtual void subscribe(const observer_ptr&) = 0;\r
- virtual void unsubscribe(const observer_ptr&) = 0;\r
-};\r
-\r
-template<typename I, typename O = I>\r
-struct subject : public observer<I>, public observable<O>\r
-{\r
- typedef typename observable<O>::observer observer;\r
- typedef typename observable<O>::observer_ptr observer_ptr;\r
-\r
- virtual ~subject()\r
- {\r
- }\r
-};\r
-\r
-namespace detail {\r
-\r
-template<typename T>\r
-struct true_func\r
-{\r
- bool operator()(T)\r
- {\r
- return true;\r
- }\r
-};\r
-\r
-template<typename T>\r
-struct void_func\r
-{\r
- void operator()(T)\r
- {\r
- }\r
-};\r
-\r
-template<typename I, typename O>\r
-struct forward_func\r
-{\r
- forward_func(std::function<O(const I&)> func)\r
- : func_(std::move(func))\r
- {\r
- }\r
-\r
- O operator()(const I& value)\r
- {\r
- return func_(value);\r
- }\r
-\r
- std::function<O(const I&)> func_;\r
-};\r
-\r
-template<typename I>\r
-struct forward_func<I, I>\r
-{\r
- const I& operator()(const I& value)\r
- {\r
- return value;\r
- }\r
-};\r
-\r
-}\r
-\r
-template<typename T, typename C>\r
-class observer_function : public observer<T>\r
-{\r
-public:\r
- observer_function()\r
- {\r
- }\r
-\r
- observer_function(C func)\r
- : func_(std::move(func))\r
- {\r
- }\r
-\r
- observer_function(const observer_function& other)\r
- : func_(other.func_)\r
- {\r
- }\r
-\r
- observer_function(observer_function&& other)\r
- : func_(std::move(other.func_))\r
- {\r
- }\r
-\r
- observer_function& operator=(observer_function other)\r
- {\r
- other.swap(*this);\r
- }\r
-\r
- void swap(observer_function& other)\r
- {\r
- std::swap(func_, other.func_);\r
- std::swap(filter_, other.filter_);\r
- }\r
- \r
- virtual void on_next(const T& e) override\r
- {\r
- func_(e);\r
- }\r
-private:\r
- C func_;\r
-};\r
-\r
-template<typename T>\r
-class observer_function<T, detail::void_func<T>> : public observer<T>\r
-{\r
-public: \r
- virtual void on_next(const T& e) override\r
- {\r
- }\r
-};\r
-\r
-template<typename I, typename O = I>\r
-class basic_subject : public subject<I, O>\r
-{ \r
- template <typename, typename> friend class basic_subject;\r
-\r
- basic_subject(const basic_subject&);\r
- basic_subject& operator=(const basic_subject&);\r
-public: \r
- typedef typename subject<I, O>::observer observer;\r
- typedef typename subject<I, O>::observer_ptr observer_ptr;\r
-\r
- basic_subject()\r
- {\r
- }\r
- \r
- virtual ~basic_subject()\r
- {\r
- }\r
- \r
- basic_subject(basic_subject<typename observer::value_type, typename observable::value_type>&& other)\r
- : observers_(std::move(other.observers_))\r
- {\r
- }\r
- \r
- basic_subject& operator=(basic_subject<typename observer::value_type, typename observable::value_type>&& other)\r
- {\r
- other.swap(*this);\r
- return *this;\r
- }\r
- \r
- void swap(basic_subject<typename observer::value_type, typename observable::value_type>& other)\r
- { \r
- tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);\r
- tbb::spin_rw_mutex::scoped_lock other_lock(other.mutex_, true);\r
-\r
- std::swap(observers_, other.observers_);\r
- }\r
-\r
- virtual void clear()\r
- {\r
- tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);\r
-\r
- observers_.clear();\r
- }\r
- \r
- virtual void subscribe(const observer_ptr& o) override\r
- { \r
- tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);\r
-\r
- auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);\r
- if (it == std::end(observers_) || comp_(o, *it))\r
- {\r
- lock.upgrade_to_writer();\r
- observers_.insert(it, o);\r
- } \r
- }\r
-\r
- virtual void unsubscribe(const observer_ptr& o) override\r
- {\r
- tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);\r
- \r
- auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);\r
- if(it != std::end(observers_) && !comp_(o, *it))\r
- {\r
- lock.upgrade_to_writer();\r
- observers_.erase(it);\r
- } \r
- }\r
- \r
- virtual void on_next(const I& e) override\r
- { \r
- std::vector<spl::shared_ptr<observer>> observers;\r
-\r
- {\r
- tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);\r
- \r
- auto expired = std::end(observers_);\r
-\r
- for(auto it = std::begin(observers_); it != std::end(observers_); ++it)\r
- {\r
- auto o = it->lock();\r
- if(o)\r
- observers.push_back(spl::make_shared_ptr(std::move(o)));\r
- else\r
- expired = it;\r
- }\r
-\r
- if(expired != std::end(observers_))\r
- { \r
- lock.upgrade_to_writer();\r
- observers_.erase(expired);\r
- } \r
- }\r
- \r
- for(auto it = std::begin(observers); it != std::end(observers); ++it)\r
- (*it)->on_next(e);\r
- }\r
-private:\r
- typedef tbb::cache_aligned_allocator<std::weak_ptr<observer>> allocator;\r
-\r
- std::owner_less<std::weak_ptr<observer>> comp_;\r
- std::vector<std::weak_ptr<observer>, allocator> observers_;\r
- mutable tbb::spin_rw_mutex mutex_;\r
-};\r
-\r
-template<typename F>\r
-spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>> \r
-make_observer(F func)\r
-{\r
- return spl::make_shared<observer_function<std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>(std::move(func));\r
-}\r
-\r
-template<typename T>\r
-basic_subject<T>& operator<<(basic_subject<T>& s, const T& val)\r
-{\r
- s.on_next(val);\r
- return s;\r
-}\r
-\r
+#pragma once
+
+#include "memory.h"
+#include "lock.h"
+
+#include <tbb/spin_rw_mutex.h>
+#include <tbb/cache_aligned_allocator.h>
+
+#include <algorithm>
+#include <functional>
+#include <memory>
+#include <vector>
+
+namespace caspar { namespace reactive {
+
+namespace detail {
+
+// function_traits which works with MSVC2010 lambdas.
+
+template<typename FPtr>
+struct function_traits_impl;
+
+template<typename R, typename A1>
+struct function_traits_impl<R (*)(A1)>
+{
+ typedef A1 arg1_type;
+};
+
+template<typename R, typename C, typename A1>
+struct function_traits_impl<R (C::*)(A1)>
+{
+ typedef A1 arg1_type;
+};
+
+template<typename R, typename C, typename A1>
+struct function_traits_impl<R (C::*)(A1) const>
+{
+ typedef A1 arg1_type;
+};
+
+template<typename T>
+typename function_traits_impl<T>::arg1_type arg1_type_helper(T);
+
+template<typename F>
+struct function_traits
+{
+ typedef decltype(detail::arg1_type_helper(&F::operator())) arg1_type;
+};
+
+}
+
+template<typename T>
+class observer
+{
+ observer(const observer&);
+ observer& operator=(const observer&);
+public:
+
+ // Static Members
+
+ typedef T value_type;
+
+ // Constructors
+
+ observer()
+ {
+ }
+
+ virtual ~observer()
+ {
+ }
+
+ // Methods
+
+ virtual void on_next(const T&) = 0;
+
+ // Properties
+};
+
+template<typename T>
+class observable
+{
+ observable(const observable&);
+ observable& operator=(const observable&);
+public:
+
+ // Static Members
+
+ typedef T value_type;
+ typedef observer<T> observer;
+ typedef std::weak_ptr<observer> observer_ptr;
+
+ // Constructors
+
+ observable()
+ {
+ }
+
+ virtual ~observable()
+ {
+ }
+
+ // Methods
+
+ virtual void subscribe(const observer_ptr&) = 0;
+ virtual void unsubscribe(const observer_ptr&) = 0;
+
+ // Properties
+};
+
+template<typename I, typename O = I>
+class subject : public observer<I>, public observable<O>
+{
+public:
+
+ // Static Members
+
+ typedef typename observable<O>::observer observer;
+ typedef typename observable<O>::observer_ptr observer_ptr;
+
+ // Constructors
+
+ virtual ~subject()
+ {
+ }
+
+ // Methods
+
+ // Properties
+};
+
+template<typename T, typename C>
+class observer_function : public observer<T>
+{
+public:
+
+ // Static Members
+
+ // Constructors
+
+ observer_function()
+ {
+ }
+
+ observer_function(C func)
+ : func_(std::move(func))
+ {
+ }
+
+ observer_function(const observer_function& other)
+ : func_(other.func_)
+ {
+ }
+
+ observer_function(observer_function&& other)
+ : func_(std::move(other.func_))
+ {
+ }
+
+ observer_function& operator=(observer_function other)
+ {
+ other.swap(*this);
+ }
+
+ // Methods
+
+ void swap(observer_function& other)
+ {
+ std::swap(func_, other.func_);
+ std::swap(filter_, other.filter_);
+ }
+
+ void on_next(const T& e) override
+ {
+ func_(e);
+ }
+
+ // Properties
+private:
+ C func_;
+};
+
+template<typename I, typename O = I>
+class basic_subject_impl sealed : public subject<I, O>
+{
+ template <typename, typename> friend class basic_subject_impl;
+
+ basic_subject_impl(const basic_subject_impl&);
+ basic_subject_impl& operator=(const basic_subject_impl&);
+public:
+ // Static Members
+
+ typedef typename subject<I, O>::observer observer;
+ typedef typename subject<I, O>::observer_ptr observer_ptr;
+
+ // Constructors
+
+ basic_subject_impl()
+ {
+ }
+
+ basic_subject_impl(basic_subject_impl<I, O>&& other)
+ : observers_(std::move(other.observers_))
+ {
+ }
+
+ basic_subject_impl& operator=(basic_subject_impl<I, O>&& other)
+ {
+ observers_ = std::move(observers_);
+ return *this;
+ }
+
+ // Methods
+
+ void clear()
+ {
+ tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);
+
+ observers_.clear();
+ }
+
+ void subscribe(const observer_ptr& o) override
+ {
+ tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
+
+ auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
+ if (it == std::end(observers_) || comp_(o, *it))
+ {
+ lock.upgrade_to_writer();
+ observers_.insert(it, o);
+ }
+ }
+
+ void unsubscribe(const observer_ptr& o) override
+ {
+ tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
+
+ auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
+ if(it != std::end(observers_) && !comp_(o, *it))
+ {
+ lock.upgrade_to_writer();
+ observers_.erase(it);
+ }
+ }
+
+ void on_next(const I& e) override
+ {
+ std::vector<spl::shared_ptr<observer>> observers;
+
+ {
+ tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
+
+ auto expired = std::end(observers_);
+
+ for(auto it = std::begin(observers_); it != std::end(observers_); ++it)
+ {
+ auto o = it->lock();
+ if(o)
+ observers.push_back(spl::make_shared_ptr(std::move(o)));
+ else
+ expired = it;
+ }
+
+ if(expired != std::end(observers_))
+ {
+ lock.upgrade_to_writer();
+ observers_.erase(expired);
+ }
+ }
+
+ for(auto it = std::begin(observers); it != std::end(observers); ++it)
+ (*it)->on_next(e);
+ }
+
+ // Properties
+
+private:
+ typedef tbb::cache_aligned_allocator<std::weak_ptr<observer>> allocator;
+
+ std::owner_less<std::weak_ptr<observer>> comp_;
+ std::vector<std::weak_ptr<observer>, allocator> observers_;
+ mutable tbb::spin_rw_mutex mutex_;
+};
+
+template<typename I, typename O = I>
+class basic_subject sealed : public subject<I, O>
+{
+ template <typename, typename> friend class basic_subject;
+
+ basic_subject(const basic_subject&);
+ basic_subject& operator=(const basic_subject&);
+
+ typedef basic_subject_impl<I, O> impl;
+public:
+
+ // Static Members
+
+ typedef typename subject<I, O>::observer observer;
+ typedef typename subject<I, O>::observer_ptr observer_ptr;
+
+ // Constructors
+
+ basic_subject()
+ : impl_(std::make_shared<impl>())
+
+ {
+ }
+
+ basic_subject(subject&& other)
+ : impl_(std::move(other.impl_))
+ {
+ }
+
+ basic_subject& operator=(basic_subject&& other)
+ {
+ other.swap(*this);
+ }
+
+ // Methods
+
+ void swap(basic_subject& other)
+ {
+ impl_.swap(other.impl_);
+ }
+
+ void subscribe(const observer_ptr& o) override
+ {
+ impl_->subscribe(o);
+ }
+
+ void unsubscribe(const observer_ptr& o) override
+ {
+ impl_->unsubscribe(o);
+ }
+
+ void on_next(const I& e) override
+ {
+ impl_->on_next(e);
+ }
+
+ operator std::weak_ptr<observer>()
+ {
+ return impl_;
+ }
+
+ // Properties
+
+private:
+ std::shared_ptr<impl> impl_;
+};
+
+template<typename F>
+spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>
+make_observer(F func)
+{
+ return spl::make_shared<observer_function<std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>(std::move(func));
+}
+
+template<typename T>
+basic_subject<T>& operator<<(basic_subject<T>& s, const T& val)
+{
+ s.on_next(val);
+ return s;
+}
+
}}
\ No newline at end of file