}\r
};\r
\r
-namespace detail {\r
-\r
-template<typename T>\r
-struct true_func\r
-{\r
- bool operator()(T)\r
- {\r
- return true;\r
- }\r
-};\r
-\r
-template<typename T>\r
-struct void_func\r
-{\r
- void operator()(T)\r
- {\r
- }\r
-};\r
-\r
-template<typename I, typename O>\r
-struct forward_func\r
-{\r
- forward_func(std::function<O(const I&)> func)\r
- : func_(std::move(func))\r
- {\r
- }\r
-\r
- O operator()(const I& value)\r
- {\r
- return func_(value);\r
- }\r
-\r
- std::function<O(const I&)> func_;\r
-};\r
-\r
-template<typename I>\r
-struct forward_func<I, I>\r
-{\r
- const I& operator()(const I& value)\r
- {\r
- return value;\r
- }\r
-};\r
-\r
-}\r
-\r
template<typename T, typename C>\r
class observer_function : public observer<T>\r
{\r
C func_;\r
};\r
\r
-template<typename T>\r
-class observer_function<T, detail::void_func<T>> : public observer<T>\r
-{\r
-public: \r
- virtual void on_next(const T& e) override\r
- {\r
- }\r
-};\r
-\r
template<typename I, typename O = I>\r
-class basic_subject : public subject<I, O>\r
+class basic_subject_impl : public subject<I, O>\r
{ \r
- template <typename, typename> friend class basic_subject;\r
+ template <typename, typename> friend class basic_subject_impl;\r
\r
- basic_subject(const basic_subject&);\r
- basic_subject& operator=(const basic_subject&);\r
+ basic_subject_impl(const basic_subject_impl&);\r
+ basic_subject_impl& operator=(const basic_subject_impl&);\r
public: \r
typedef typename subject<I, O>::observer observer;\r
typedef typename subject<I, O>::observer_ptr observer_ptr;\r
\r
- basic_subject()\r
+ basic_subject_impl()\r
{\r
}\r
\r
- virtual ~basic_subject()\r
+ virtual ~basic_subject_impl()\r
{\r
}\r
\r
- basic_subject(basic_subject<typename observer::value_type, typename observable::value_type>&& other)\r
+ basic_subject_impl(basic_subject_impl<typename observer::value_type, typename observable::value_type>&& other)\r
: observers_(std::move(other.observers_))\r
{\r
}\r
\r
- basic_subject& operator=(basic_subject<typename observer::value_type, typename observable::value_type>&& other)\r
+ basic_subject_impl& operator=(basic_subject_impl<typename observer::value_type, typename observable::value_type>&& other)\r
{\r
other.swap(*this);\r
return *this;\r
}\r
\r
- void swap(basic_subject<typename observer::value_type, typename observable::value_type>& other)\r
+ void swap(basic_subject_impl<typename observer::value_type, typename observable::value_type>& other)\r
{ \r
tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);\r
tbb::spin_rw_mutex::scoped_lock other_lock(other.mutex_, true);\r
mutable tbb::spin_rw_mutex mutex_;\r
};\r
\r
+template<typename I, typename O = I>\r
+class basic_subject : public subject<I, O>\r
+{ \r
+ template <typename, typename> friend class basic_subject;\r
+\r
+ basic_subject(const basic_subject&);\r
+ basic_subject& operator=(const basic_subject&);\r
+\r
+ typedef basic_subject_impl<I, O> impl;\r
+public: \r
+ typedef typename subject<I, O>::observer observer;\r
+ typedef typename subject<I, O>::observer_ptr observer_ptr;\r
+\r
+ basic_subject()\r
+ : impl_(std::make_shared<impl>())\r
+\r
+ {\r
+ }\r
+ \r
+ basic_subject(subject&& other)\r
+ : impl_(std::move(other.impl_))\r
+ {\r
+ }\r
+\r
+ virtual ~basic_subject()\r
+ {\r
+ }\r
+\r
+ basic_subject& operator=(basic_subject&& other)\r
+ {\r
+ other.swap(*this);\r
+ }\r
+\r
+ void swap(basic_subject& other)\r
+ {\r
+ impl_.swap(other.impl_);\r
+ }\r
+ \r
+ virtual void subscribe(const observer_ptr& o) override\r
+ { \r
+ impl_->subscribe(o);\r
+ }\r
+\r
+ virtual void unsubscribe(const observer_ptr& o) override\r
+ {\r
+ impl_->unsubscribe(o);\r
+ }\r
+ \r
+ virtual void on_next(const I& e) override\r
+ { \r
+ impl_->on_next(e);\r
+ }\r
+\r
+ operator std::weak_ptr<observer>()\r
+ {\r
+ return impl_;\r
+ }\r
+private:\r
+ std::shared_ptr<impl> impl_;\r
+};\r
+\r
template<typename F>\r
spl::shared_ptr<observer_function<typename std::decay<typename detail::function_traits<F>::arg1_type>::type, F>> \r
make_observer(F func)\r
{ \r
subject(const subject&);\r
subject& operator=(const subject&);\r
+\r
+ typedef reactive::basic_subject_impl<monitor::event> impl_base;\r
\r
- typedef reactive::basic_subject<monitor::event> impl;\r
+ class impl : public impl_base\r
+ {\r
+ public:\r
+ impl(monitor::path path = monitor::path())\r
+ : path_(std::move(path))\r
+ {\r
+ }\r
+ \r
+ virtual void on_next(const monitor::event& e) override\r
+ { \r
+ impl_base::on_next(path_.empty() ? e : e.propagate(path_));\r
+ }\r
+ private:\r
+ monitor::path path_;\r
+ };\r
+\r
public: \r
subject(monitor::path path = monitor::path())\r
- : path_(std::move(path))\r
- , impl_(std::make_shared<impl>())\r
+ : impl_(std::make_shared<impl>(std::move(path)))\r
\r
{\r
}\r
\r
subject(subject&& other)\r
- : path_(std::move(other.path_))\r
- , impl_(std::move(other.impl_))\r
+ : impl_(std::move(other.impl_))\r
{\r
}\r
\r
\r
void swap(subject& other)\r
{\r
- std::swap(path_, other.path_);\r
- std::swap(impl_, other.impl_);\r
+ impl_.swap(other.impl_);\r
}\r
\r
virtual void subscribe(const observer_ptr& o) override\r
\r
virtual void on_next(const monitor::event& e) override\r
{ \r
- impl_->on_next(path_.empty() ? e : e.propagate(path_));\r
+ impl_->on_next(e);\r
}\r
\r
operator std::weak_ptr<observer>()\r
return impl_;\r
}\r
private:\r
- monitor::path path_;\r
std::shared_ptr<impl> impl_;\r
};\r
\r