3 #include <common/reactive.h>
\r
5 #include <functional>
\r
9 #include <type_traits>
\r
12 #include <boost/variant.hpp>
\r
13 #include <boost/lexical_cast.hpp>
\r
14 #include <boost/chrono.hpp>
\r
16 #include <tbb/cache_aligned_allocator.h>
\r
19 namespace detail { namespace variant {
\r
22 struct has_nothrow_move<std::string>
\r
28 struct has_nothrow_move<std::vector<int8_t>>
\r
34 struct has_nothrow_move<boost::chrono::duration<double, boost::ratio<1, 1>>>
\r
41 namespace caspar { namespace monitor {
\r
54 path(const char* path);
\r
55 path(std::string path);
\r
57 path(const path& other);
\r
62 path& operator=(path other);
\r
63 path& operator%=(path other);
\r
65 template<typename T>
\r
66 typename std::enable_if<!std::is_same<typename std::decay<T>::type, path>::value, path&>::type operator%=(T&& value)
\r
68 auto str = boost::lexical_cast<std::string>(std::forward<T>(value));
\r
71 str_ += (str.front() != '/' ? "/" : "") + std::move(str);
\r
76 path& operator%=(const char* value)
\r
78 return *this %= std::string(value);
\r
81 void swap(path& other);
\r
85 const std::string& str() const;
\r
91 template<typename T>
\r
92 path operator%(path path, T&& value)
\r
94 return std::move(path %= std::forward<T>(value));
\r
97 std::ostream& operator<<(std::ostream& o, const path& p);
\r
101 typedef boost::chrono::duration<double, boost::ratio<1, 1>> duration;
\r
103 typedef boost::variant<bool, int32_t, int64_t, float, double, std::string, std::wstring, std::vector<int8_t>, duration> param;
\r
105 std::ostream& operator<<(std::ostream& o, const param& p);
\r
115 typedef std::vector<param, tbb::cache_aligned_allocator<param>> params_t;
\r
120 event(path path, params_t params);
\r
121 event(const event& other);
\r
122 event(event&& other);
\r
126 event& operator=(event other);
\r
128 void swap(event& other);
\r
130 template<typename T>
\r
131 event& operator%(T&& value)
\r
133 params_.push_back(std::forward<T>(value));
\r
137 event propagate(path path) const;
\r
141 const path& path() const;
\r
142 const params_t& params() const;
\r
144 monitor::path path_;
\r
148 std::ostream& operator<<(std::ostream& o, const event& e);
\r
152 typedef reactive::observable<monitor::event> observable;
\r
153 typedef reactive::observer<monitor::event> observer;
\r
154 typedef reactive::subject<monitor::event> subject;
\r
156 class basic_subject sealed : public reactive::subject<monitor::event>
\r
158 basic_subject(const basic_subject&);
\r
159 basic_subject& operator=(const basic_subject&);
\r
161 class impl : public observer
\r
164 impl(monitor::path path = monitor::path())
\r
166 , path_(std::move(path))
\r
171 : impl_(std::move(other.impl_))
\r
172 , path_(std::move(other.path_))
\r
176 impl& operator=(impl&& other)
\r
178 impl_ = std::move(other.impl_);
\r
179 path_ = std::move(other.path_);
\r
182 void on_next(const monitor::event& e) override
\r
184 impl_.on_next(path_.empty() ? e : e.propagate(path_));
\r
187 void subscribe(const observer_ptr& o)
\r
189 impl_.subscribe(o);
\r
192 void unsubscribe(const observer_ptr& o)
\r
194 impl_.unsubscribe(o);
\r
198 reactive::basic_subject_impl<monitor::event> impl_;
\r
199 monitor::path path_;
\r
208 basic_subject(monitor::path path = monitor::path())
\r
209 : impl_(std::make_shared<impl>(std::move(path)))
\r
214 basic_subject(basic_subject&& other)
\r
215 : impl_(std::move(other.impl_))
\r
221 basic_subject& operator=(basic_subject&& other)
\r
223 impl_ = std::move(other.impl_);
\r
226 operator std::weak_ptr<observer>()
\r
233 void subscribe(const observer_ptr& o) override
\r
235 impl_->subscribe(o);
\r
238 void unsubscribe(const observer_ptr& o) override
\r
240 impl_->unsubscribe(o);
\r
245 void on_next(const monitor::event& e) override
\r
253 std::shared_ptr<impl> impl_;
\r
256 inline subject& operator<<(subject& s, const event& e)
\r