3 #include <common/reactive.h>
\r
4 #include <common/memory.h>
\r
6 #include <functional>
\r
10 #include <type_traits>
\r
12 #include <boost/variant.hpp>
\r
13 #include <boost/lexical_cast.hpp>
\r
14 #include <boost/chrono.hpp>
\r
16 #include <tbb/cache_aligned_allocator.h>
\r
17 #include <tbb/spin_mutex.h>
\r
19 #ifndef _SCL_SECURE_NO_WARNINGS
\r
20 #define _SCL_SECURE_NO_WARNINGS
\r
24 namespace detail { namespace variant {
\r
27 struct has_nothrow_move<std::string>
\r
33 struct has_nothrow_move<std::vector<int8_t>>
\r
39 struct has_nothrow_move<boost::chrono::duration<double, boost::ratio<1, 1>>>
\r
46 namespace caspar { namespace monitor {
\r
59 path(const char* path);
\r
60 path(std::string path);
\r
62 path(const path& other);
\r
67 path& operator=(path other);
\r
68 path& operator%=(path other);
\r
70 template<typename T>
\r
71 typename std::enable_if<!std::is_same<typename std::decay<T>::type, path>::value, path&>::type operator%=(T&& value)
\r
73 auto str = boost::lexical_cast<std::string>(std::forward<T>(value));
\r
76 str_ += (str.front() != '/' ? "/" : "") + std::move(str);
\r
81 path& operator%=(const char* value)
\r
83 return *this %= std::string(value);
\r
86 void swap(path& other);
\r
90 const std::string& str() const;
\r
96 template<typename T>
\r
97 path operator%(path path, T&& value)
\r
99 return std::move(path %= std::forward<T>(value));
\r
102 std::ostream& operator<<(std::ostream& o, const path& p);
\r
106 typedef boost::chrono::duration<double, boost::ratio<1, 1>> duration;
\r
108 typedef boost::variant<bool, int32_t, int64_t, float, double, std::string, std::wstring, std::vector<int8_t>, duration> param;
\r
110 std::ostream& operator<<(std::ostream& o, const param& p);
\r
120 typedef std::vector<param, tbb::cache_aligned_allocator<param>> params_t;
\r
125 event(path path, params_t params);
\r
126 event(const event& other);
\r
127 event(event&& other);
\r
131 event& operator=(event other);
\r
133 void swap(event& other);
\r
135 template<typename T>
\r
136 event& operator%(T&& value)
\r
138 params_.push_back(std::forward<T>(value));
\r
142 event propagate(path path) const;
\r
146 const path& path() const;
\r
147 const params_t& params() const;
\r
149 monitor::path path_;
\r
153 std::ostream& operator<<(std::ostream& o, const event& e);
\r
157 typedef reactive::observable<monitor::event> observable;
\r
158 typedef reactive::observer<monitor::event> observer;
\r
159 typedef reactive::subject<monitor::event> subject;
\r
161 class basic_subject sealed : public reactive::subject<monitor::event>
\r
163 basic_subject(const basic_subject&);
\r
164 basic_subject& operator=(const basic_subject&);
\r
166 class impl : public observer
\r
169 impl(monitor::path path = monitor::path())
\r
171 , path_(std::move(path))
\r
176 : impl_(std::move(other.impl_))
\r
177 , path_(std::move(other.path_))
\r
181 impl& operator=(impl&& other)
\r
183 impl_ = std::move(other.impl_);
\r
184 path_ = std::move(other.path_);
\r
187 void on_next(const monitor::event& e) override
\r
189 impl_.on_next(path_.empty() ? e : e.propagate(path_));
\r
192 void subscribe(const observer_ptr& o)
\r
194 impl_.subscribe(o);
\r
197 void unsubscribe(const observer_ptr& o)
\r
199 impl_.unsubscribe(o);
\r
203 reactive::basic_subject_impl<monitor::event> impl_;
\r
204 monitor::path path_;
\r
213 basic_subject(monitor::path path = monitor::path())
\r
214 : impl_(std::make_shared<impl>(std::move(path)))
\r
219 basic_subject(basic_subject&& other)
\r
220 : impl_(std::move(other.impl_))
\r
226 basic_subject& operator=(basic_subject&& other)
\r
228 impl_ = std::move(other.impl_);
\r
231 operator std::weak_ptr<observer>()
\r
238 void subscribe(const observer_ptr& o) override
\r
240 impl_->subscribe(o);
\r
243 void unsubscribe(const observer_ptr& o) override
\r
245 impl_->unsubscribe(o);
\r
250 void on_next(const monitor::event& e) override
\r
258 std::shared_ptr<impl> impl_;
\r
261 inline subject& operator<<(subject& s, const event& e)
\r