]> git.sesse.net Git - casparcg/blobdiff - common/reactive.h
set svn:eol-style native on .h and .cpp files
[casparcg] / common / reactive.h
index 439e2d814079ece41e5af5fd9c7689d0ad548409..e56f8968e990976002b5f22c695186545031fddf 100644 (file)
-#pragma once\r
-\r
-#include "spl/memory.h"\r
-\r
-#include <tbb/spin_rw_mutex.h>\r
-#include <tbb/cache_aligned_allocator.h>\r
-\r
-#include <algorithm>\r
-#include <functional>\r
-#include <memory>\r
-#include <vector>\r
-\r
-namespace caspar { namespace reactive {\r
-       \r
-namespace detail {\r
-\r
-// function_traits which works with MSVC2010 lambdas.\r
-       \r
-template<typename FPtr>\r
-struct function_traits_impl;\r
-\r
-template<typename R, typename A1>\r
-struct function_traits_impl<R (*)(A1)>\r
-{\r
-       typedef A1 arg1_type;\r
-};\r
-\r
-template<typename R, typename C, typename A1>\r
-struct function_traits_impl<R (C::*)(A1)>\r
-{\r
-       typedef A1 arg1_type;\r
-};\r
-\r
-template<typename R, typename C, typename A1>\r
-struct function_traits_impl<R (C::*)(A1) const>\r
-{\r
-       typedef A1 arg1_type;\r
-};\r
-\r
-template<typename T>\r
-typename function_traits_impl<T>::arg1_type arg1_type_helper(T);\r
-\r
-template<typename F>\r
-struct function_traits\r
-{\r
-       typedef decltype(detail::arg1_type_helper(&F::operator())) arg1_type;\r
-};\r
-\r
-}\r
-\r
-template<typename T>\r
-class observer\r
-{      \r
-       observer(const observer&);\r
-       observer& operator=(const observer&);\r
-public:\r
-       typedef T value_type;\r
-       \r
-       observer()\r
-       {\r
-       }\r
-\r
-       virtual ~observer()\r
-       {\r
-       }\r
-\r
-       virtual void on_next(const T&) = 0;\r
-};\r
-\r
-template<typename T>\r
-class observable\r
-{\r
-       observable(const observable&);\r
-       observable& operator=(const observable&);\r
-public:\r
-       typedef T                                               value_type;\r
-       typedef observer<T>                             observer;\r
-       typedef std::weak_ptr<observer> observer_ptr;\r
-\r
-       observable()\r
-       {\r
-       }\r
-\r
-       virtual ~observable()\r
-       {\r
-       }\r
-\r
-       virtual void subscribe(const observer_ptr&) = 0;\r
-       virtual void unsubscribe(const observer_ptr&) = 0;\r
-};\r
-\r
-template<typename I, typename O = I>\r
-struct subject : public observer<I>, public observable<O>\r
-{\r
-       typedef typename observable<O>::observer                observer;\r
-       typedef typename observable<O>::observer_ptr    observer_ptr;\r
-\r
-       virtual ~subject()\r
-       {\r
-       }\r
-};\r
-\r
-namespace detail {\r
-\r
-template<typename T>\r
-struct true_func\r
-{\r
-       bool operator()(T)\r
-       {\r
-               return true;\r
-       }\r
-};\r
-\r
-template<typename T>\r
-struct void_func\r
-{\r
-       void operator()(T)\r
-       {\r
-       }\r
-};\r
-\r
-template<typename I, typename O>\r
-struct forward_func\r
-{\r
-       forward_func(std::function<O(const I&)> func)\r
-               : func_(std::move(func))\r
-       {\r
-       }\r
-\r
-       O operator()(const I& value)\r
-       {\r
-               return func_(value);\r
-       }\r
-\r
-       std::function<O(const I&)> func_;\r
-};\r
-\r
-template<typename I>\r
-struct forward_func<I, I>\r
-{\r
-       const I& operator()(const I& value)\r
-       {\r
-               return value;\r
-       }\r
-};\r
-\r
-}\r
-\r
-template<typename T, typename C>\r
-class observer_function : public observer<T>\r
-{\r
-public:\r
-       observer_function()\r
-       {\r
-       }\r
-\r
-       observer_function(C func)\r
-               : func_(std::move(func))\r
-       {\r
-       }\r
-\r
-       observer_function(const observer_function& other)\r
-               : func_(other.func_)\r
-       {\r
-       }\r
-\r
-       observer_function(observer_function&& other)\r
-               : func_(std::move(other.func_))\r
-       {\r
-       }\r
-\r
-       observer_function& operator=(observer_function other)\r
-       {\r
-               other.swap(*this);\r
-       }\r
-\r
-       void swap(observer_function& other)\r
-       {\r
-               std::swap(func_, other.func_);\r
-               std::swap(filter_, other.filter_);\r
-       }\r
-               \r
-       virtual void on_next(const T& e) override\r
-       {\r
-               func_(e);\r
-       }\r
-private:\r
-       C func_;\r
-};\r
-\r
-template<typename T>\r
-class observer_function<T, detail::void_func<T>> : public observer<T>\r
-{\r
-public:                \r
-       virtual void on_next(const T& e) override\r
-       {\r
-       }\r
-};\r
-\r
-template<typename I, typename O = I>\r
-class basic_subject : public subject<I, O>\r
-{      \r
-    template <typename, typename> friend class basic_subject;\r
-\r
-       basic_subject(const basic_subject&);\r
-       basic_subject& operator=(const basic_subject&);\r
-public:        \r
-       typedef typename subject<I, O>::observer                observer;\r
-       typedef typename subject<I, O>::observer_ptr    observer_ptr;\r
-\r
-       basic_subject()\r
-       {\r
-       }\r
-                       \r
-       virtual ~basic_subject()\r
-       {\r
-       }\r
-               \r
-       basic_subject(basic_subject<typename observer::value_type, typename observable::value_type>&& other)\r
-               : observers_(std::move(other.observers_))\r
-       {\r
-       }\r
-       \r
-       basic_subject& operator=(basic_subject<typename observer::value_type, typename observable::value_type>&& other)\r
-       {\r
-               other.swap(*this);\r
-               return *this;\r
-       }\r
-       \r
-       void swap(basic_subject<typename observer::value_type, typename observable::value_type>& other)\r
-       {               \r
-               tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);\r
-               tbb::spin_rw_mutex::scoped_lock other_lock(other.mutex_, true);\r
-\r
-               std::swap(observers_, other.observers_);\r
-       }\r
-\r
-       virtual void clear()\r
-       {\r
-               tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);\r
-\r
-               observers_.clear();\r
-       }\r
-       \r
-       virtual void subscribe(const observer_ptr& o) override\r
-       {                               \r
-               tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);\r
-\r
-               auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);\r
-               if (it == std::end(observers_) || comp_(o, *it))\r
-               {\r
-                       lock.upgrade_to_writer();\r
-                       observers_.insert(it, o);\r
-               }               \r
-       }\r
-\r
-       virtual void unsubscribe(const observer_ptr& o) override\r
-       {\r
-               tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);\r
-               \r
-               auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);\r
-               if(it != std::end(observers_) && !comp_(o, *it))\r
-               {\r
-                       lock.upgrade_to_writer();\r
-                       observers_.erase(it);\r
-               }               \r
-       }\r
-       \r
-       virtual void on_next(const I& e) override\r
-       {                               \r
-               std::vector<spl::shared_ptr<observer>> observers;\r
-\r
-               {\r
-                       tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);\r
-               \r
-                       auto expired = std::end(observers_);\r
-\r
-                       for(auto it = std::begin(observers_); it != std::end(observers_); ++it)\r
-                       {\r
-                               auto o = it->lock();\r
-                               if(o)\r
-                                       observers.push_back(spl::make_shared_ptr(std::move(o)));\r
-                               else\r
-                                       expired = it;\r
-                       }\r
-\r
-                       if(expired != std::end(observers_))\r
-                       {               \r
-                               lock.upgrade_to_writer();\r
-                               observers_.erase(expired);\r
-                       }       \r
-               }\r
-               \r
-               for(auto it = std::begin(observers); it != std::end(observers); ++it)\r
-                       (*it)->on_next(e);\r
-       }\r
-private:\r
-       typedef tbb::cache_aligned_allocator<std::weak_ptr<observer>>   allocator;\r
-\r
-       std::owner_less<std::weak_ptr<observer>>                comp_;\r
-       std::vector<std::weak_ptr<observer>, allocator> observers_;\r
-       mutable tbb::spin_rw_mutex                                              mutex_;\r
-};\r
-\r
-template<typename F>\r
-spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>> \r
-make_observer(F func)\r
-{\r
-       return spl::make_shared<observer_function<std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>(std::move(func));\r
-}\r
-\r
-template<typename T>\r
-basic_subject<T>& operator<<(basic_subject<T>& s, const T& val)\r
-{\r
-       s.on_next(val);\r
-       return s;\r
-}\r
-\r
+#pragma once
+
+#include "memory.h"
+#include "lock.h"
+
+#include <tbb/spin_rw_mutex.h>
+#include <tbb/cache_aligned_allocator.h>
+
+#include <algorithm>
+#include <functional>
+#include <memory>
+#include <vector>
+
+namespace caspar { namespace reactive {
+       
+namespace detail {
+
+// function_traits which works with MSVC2010 lambdas.
+       
+template<typename FPtr>
+struct function_traits_impl;
+
+template<typename R, typename A1>
+struct function_traits_impl<R (*)(A1)>
+{
+       typedef A1 arg1_type;
+};
+
+template<typename R, typename C, typename A1>
+struct function_traits_impl<R (C::*)(A1)>
+{
+       typedef A1 arg1_type;
+};
+
+template<typename R, typename C, typename A1>
+struct function_traits_impl<R (C::*)(A1) const>
+{
+       typedef A1 arg1_type;
+};
+
+template<typename T>
+typename function_traits_impl<T>::arg1_type arg1_type_helper(T);
+
+template<typename F>
+struct function_traits
+{
+       typedef decltype(detail::arg1_type_helper(&F::operator())) arg1_type;
+};
+
+}
+
+template<typename T>
+class observer
+{      
+       observer(const observer&);
+       observer& operator=(const observer&);
+public:
+
+       // Static Members
+
+       typedef T value_type;
+       
+       // Constructors
+
+       observer()
+       {
+       }
+
+       virtual ~observer()
+       {
+       }
+
+       // Methods
+
+       virtual void on_next(const T&) = 0;
+
+       // Properties
+};
+
+template<typename T>
+class observable
+{
+       observable(const observable&);
+       observable& operator=(const observable&);
+public:
+
+       // Static Members
+
+       typedef T                                               value_type;
+       typedef observer<T>                             observer;
+       typedef std::weak_ptr<observer> observer_ptr;
+       
+       // Constructors
+
+       observable()
+       {
+       }
+
+       virtual ~observable()
+       {
+       }
+       
+       // Methods
+
+       virtual void subscribe(const observer_ptr&) = 0;
+       virtual void unsubscribe(const observer_ptr&) = 0;
+       
+       // Properties
+};
+
+template<typename I, typename O = I>
+class subject : public observer<I>, public observable<O>
+{
+public:
+       
+       // Static Members
+
+       typedef typename observable<O>::observer                observer;
+       typedef typename observable<O>::observer_ptr    observer_ptr;
+       
+       // Constructors
+
+       virtual ~subject()
+       {
+       }
+       
+       // Methods
+
+       // Properties
+};
+
+template<typename T, typename C>
+class observer_function : public observer<T>
+{
+public:
+       
+       // Static Members
+       
+       // Constructors
+
+       observer_function()
+       {
+       }
+
+       observer_function(C func)
+               : func_(std::move(func))
+       {
+       }
+
+       observer_function(const observer_function& other)
+               : func_(other.func_)
+       {
+       }
+
+       observer_function(observer_function&& other)
+               : func_(std::move(other.func_))
+       {
+       }
+
+       observer_function& operator=(observer_function other)
+       {
+               other.swap(*this);
+       }
+               
+       // Methods
+
+       void swap(observer_function& other)
+       {
+               std::swap(func_, other.func_);
+               std::swap(filter_, other.filter_);
+       }
+               
+       void on_next(const T& e) override
+       {
+               func_(e);
+       }
+
+       // Properties
+private:
+       C func_;
+};
+
+template<typename I, typename O = I>
+class basic_subject_impl sealed : public subject<I, O>
+{      
+    template <typename, typename> friend class basic_subject_impl;
+
+       basic_subject_impl(const basic_subject_impl&);
+       basic_subject_impl& operator=(const basic_subject_impl&);
+public:        
+       // Static Members
+
+       typedef typename subject<I, O>::observer                observer;
+       typedef typename subject<I, O>::observer_ptr    observer_ptr;
+
+       // Constructors
+
+       basic_subject_impl()
+       {
+       }
+                                       
+       basic_subject_impl(basic_subject_impl<I, O>&& other)
+               : observers_(std::move(other.observers_))
+       {
+       }
+       
+       basic_subject_impl& operator=(basic_subject_impl<I, O>&& other)
+       {
+               observers_ = std::move(observers_);
+               return *this;
+       }
+
+       // Methods
+
+       void clear()
+       {
+               tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);
+
+               observers_.clear();
+       }
+       
+       void subscribe(const observer_ptr& o) override
+       {                               
+               tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
+
+               auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
+               if (it == std::end(observers_) || comp_(o, *it))
+               {
+                       lock.upgrade_to_writer();
+                       observers_.insert(it, o);
+               }               
+       }
+
+       void unsubscribe(const observer_ptr& o) override
+       {
+               tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
+               
+               auto it = std::lower_bound(std::begin(observers_), std::end(observers_), o, comp_);
+               if(it != std::end(observers_) && !comp_(o, *it))
+               {
+                       lock.upgrade_to_writer();
+                       observers_.erase(it);
+               }               
+       }
+       
+       void on_next(const I& e) override
+       {                               
+               std::vector<spl::shared_ptr<observer>> observers;
+
+               {
+                       tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
+               
+                       auto expired = std::end(observers_);
+
+                       for(auto it = std::begin(observers_); it != std::end(observers_); ++it)
+                       {
+                               auto o = it->lock();
+                               if(o)
+                                       observers.push_back(spl::make_shared_ptr(std::move(o)));
+                               else
+                                       expired = it;
+                       }
+
+                       if(expired != std::end(observers_))
+                       {               
+                               lock.upgrade_to_writer();
+                               observers_.erase(expired);
+                       }       
+               }
+               
+               for(auto it = std::begin(observers); it != std::end(observers); ++it)
+                       (*it)->on_next(e);
+       }
+
+       // Properties
+
+private:
+       typedef tbb::cache_aligned_allocator<std::weak_ptr<observer>>   allocator;
+
+       std::owner_less<std::weak_ptr<observer>>                comp_;
+       std::vector<std::weak_ptr<observer>, allocator> observers_;
+       mutable tbb::spin_rw_mutex                                              mutex_;
+};
+
+template<typename I, typename O = I>
+class basic_subject sealed : public subject<I, O>
+{      
+    template <typename, typename> friend class basic_subject;
+
+       basic_subject(const basic_subject&);
+       basic_subject& operator=(const basic_subject&);
+
+       typedef basic_subject_impl<I, O> impl;
+public:        
+
+       // Static Members
+
+       typedef typename subject<I, O>::observer                observer;
+       typedef typename subject<I, O>::observer_ptr    observer_ptr;
+
+       // Constructors
+
+       basic_subject()
+               : impl_(std::make_shared<impl>())
+
+       {
+       }
+               
+       basic_subject(subject&& other)
+               : impl_(std::move(other.impl_))
+       {
+       }
+       
+       basic_subject& operator=(basic_subject&& other)
+       {
+               other.swap(*this);
+       }
+
+       // Methods
+
+       void swap(basic_subject& other)
+       {
+               impl_.swap(other.impl_);
+       }
+       
+       void subscribe(const observer_ptr& o) override
+       {                               
+               impl_->subscribe(o);
+       }
+
+       void unsubscribe(const observer_ptr& o) override
+       {
+               impl_->unsubscribe(o);
+       }
+                               
+       void on_next(const I& e) override
+       {                               
+               impl_->on_next(e);
+       }
+
+       operator std::weak_ptr<observer>()
+       {
+               return impl_;
+       }
+
+       // Properties
+
+private:
+       std::shared_ptr<impl> impl_;
+};
+
+template<typename F>
+spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>> 
+make_observer(F func)
+{
+       return spl::make_shared<observer_function<std::decay<typename detail::function_traits<F>::arg1_type>::type, F>>(std::move(func));
+}
+
+template<typename T>
+basic_subject<T>& operator<<(basic_subject<T>& s, const T& val)
+{
+       s.on_next(val);
+       return s;
+}
+
 }}
\ No newline at end of file