3 #include <common/reactive.h>
12 #include <boost/variant.hpp>
13 #include <boost/lexical_cast.hpp>
14 #include <boost/chrono.hpp>
16 #include <tbb/cache_aligned_allocator.h>
19 namespace detail { namespace variant {
22 struct has_nothrow_move<std::string>
28 struct has_nothrow_move<std::vector<int8_t>>
34 struct has_nothrow_move<boost::chrono::duration<double, boost::ratio<1, 1>>>
41 namespace caspar { namespace monitor {
// Constructs a path from a C string.
54 path(const char* path);
// Constructs a path from a std::string taken by value.
55 path(std::string path);
// Copy constructor.
57 path(const path& other);
// Assignment; the right-hand side is received by value.
62 path& operator=(path other);
// Compound '%' with another path, received by value; returns a path reference.
// NOTE(review): presumably appends `other` to this path and returns *this —
// the definition is not visible here, confirm against the implementation.
63 path& operator%=(path other);
66 typename std::enable_if<!std::is_same<typename std::decay<T>::type, path>::value, path&>::type operator%=(T&& value)
68 auto str = boost::lexical_cast<std::string>(std::forward<T>(value));
71 str_ += (str.front() != '/' ? "/" : "") + std::move(str);
76 path& operator%=(const char* value)
78 return *this %= std::string(value);
// Swap with another path instance.
// NOTE(review): presumably exchanges the full state of both objects — definition not visible here.
81 void swap(path& other);
// Read-only accessor returning a std::string by const reference.
// NOTE(review): presumably the stored path text (str_) — confirm against the definition.
85 const std::string& str() const;
92 path operator%(path path, T&& value)
94 return std::move(path %= std::forward<T>(value));
// Stream-insertion overload for path. Declaration only; the output format is defined elsewhere.
97 std::ostream& operator<<(std::ostream& o, const path& p);
// Duration with a double tick count and a period of ratio<1, 1>, i.e. one tick per second.
101 typedef boost::chrono::duration<double, boost::ratio<1, 1>> duration;
// A single parameter value: a variant over bool, 32- and 64-bit signed integers,
// float, double, narrow and wide strings, a vector of int8_t, and duration.
103 typedef boost::variant<bool, int32_t, int64_t, float, double, std::string, std::wstring, std::vector<int8_t>, duration> param;
// Stream-insertion overload for param. Declaration only; the output format is defined elsewhere.
105 std::ostream& operator<<(std::ostream& o, const param& p);
// Sequence of param values: a std::vector that allocates through tbb::cache_aligned_allocator.
115 typedef std::vector<param, tbb::cache_aligned_allocator<param>> params_t;
// Constructs an event from a path and a parameter list, both received by value.
120 event(path path, params_t params);
// Copy constructor.
121 event(const event& other);
// Move constructor.
122 event(event&& other);
// Assignment; the right-hand side is received by value.
126 event& operator=(event other);
// Swap with another event instance.
// NOTE(review): presumably exchanges both path and params — definition not visible here.
128 void swap(event& other);
131 event& operator%(T&& value)
133 params_.push_back(std::forward<T>(value));
// Const member that takes a path by value and returns a new event by value.
// NOTE(review): presumably the result carries this event's params under a path
// combined with the argument — confirm against the definition.
137 event propagate(path path) const;
// Read-only accessors; both return const references.
141 const path& path() const;
142 const params_t& params() const;
// Stream-insertion overload for event. Declaration only; the output format is defined elsewhere.
148 std::ostream& operator<<(std::ostream& o, const event& e);
// Shorthand names for the reactive:: observable/observer/subject templates
// instantiated with monitor::event as the value type.
152 typedef reactive::observable<monitor::event> observable;
153 typedef reactive::observer<monitor::event> observer;
154 typedef reactive::subject<monitor::event> subject;
156 class basic_subject sealed : public reactive::subject<monitor::event>
158 basic_subject(const basic_subject&);
159 basic_subject& operator=(const basic_subject&);
161 class impl : public observer
164 impl(monitor::path path = monitor::path())
166 , path_(std::move(path))
171 : impl_(std::move(other.impl_))
172 , path_(std::move(other.path_))
176 impl& operator=(impl&& other)
178 impl_ = std::move(other.impl_);
179 path_ = std::move(other.path_);
182 void on_next(const monitor::event& e) override
184 impl_.on_next(path_.empty() ? e : e.propagate(path_));
187 void subscribe(const observer_ptr& o)
192 void unsubscribe(const observer_ptr& o)
194 impl_.unsubscribe(o);
198 reactive::basic_subject_impl<monitor::event> impl_;
208 basic_subject(monitor::path path = monitor::path())
209 : impl_(std::make_shared<impl>(std::move(path)))
214 basic_subject(basic_subject&& other)
215 : impl_(std::move(other.impl_))
221 basic_subject& operator=(basic_subject&& other)
223 impl_ = std::move(other.impl_);
226 operator std::weak_ptr<observer>()
233 void subscribe(const observer_ptr& o) override
238 void unsubscribe(const observer_ptr& o) override
240 impl_->unsubscribe(o);
245 void on_next(const monitor::event& e) override
253 std::shared_ptr<impl> impl_;
256 inline subject& operator<<(subject& s, const event& e)