-#pragma once\r
-\r
-#include <common/reactive.h>\r
-#include <common/memory.h>\r
-\r
-#include <functional>\r
-#include <string>\r
-#include <vector>\r
-#include <ostream>\r
-#include <type_traits>\r
-\r
-#include <boost/variant.hpp>\r
-#include <boost/lexical_cast.hpp>\r
-#include <boost/chrono.hpp>\r
-\r
-#include <tbb/cache_aligned_allocator.h>\r
-#include <tbb/spin_mutex.h>\r
-\r
-#ifndef _SCL_SECURE_NO_WARNINGS\r
-#define _SCL_SECURE_NO_WARNINGS\r
-#endif\r
-\r
-namespace boost {\r
-namespace detail { namespace variant {\r
-\r
-template <>\r
-struct has_nothrow_move<std::string>\r
-: mpl::true_\r
-{\r
-};\r
- \r
-template <>\r
-struct has_nothrow_move<std::vector<int8_t>>\r
-: mpl::true_\r
-{\r
-};\r
-\r
-template <>\r
-struct has_nothrow_move<boost::chrono::duration<double, boost::ratio<1, 1>>>\r
-: mpl::true_\r
-{\r
-};\r
-\r
-}}}\r
-\r
-namespace caspar { namespace monitor {\r
- \r
-// path\r
-\r
-class path sealed\r
-{\r
-public: \r
-\r
- // Static Members\r
-\r
- // Constructors\r
-\r
- path(); \r
- path(const char* path);\r
- path(std::string path);\r
-\r
- path(const path& other); \r
- path(path&& other);\r
- \r
- // Methods\r
-\r
- path& operator=(path other);\r
- path& operator%=(path other);\r
-\r
- template<typename T>\r
- typename std::enable_if<!std::is_same<typename std::decay<T>::type, path>::value, path&>::type operator%=(T&& value)\r
- {\r
- auto str = boost::lexical_cast<std::string>(std::forward<T>(value));\r
-\r
- if(!str.empty())\r
- str_ += (str.front() != '/' ? "/" : "") + std::move(str);\r
-\r
- return *this;\r
- }\r
- \r
- path& operator%=(const char* value)\r
- {\r
- return *this %= std::string(value);\r
- }\r
-\r
- void swap(path& other);\r
-\r
- // Properties\r
-\r
- const std::string& str() const; \r
- bool empty() const;\r
-private:\r
- std::string str_;\r
-};\r
-\r
-template<typename T>\r
-path operator%(path path, T&& value)\r
-{ \r
- return std::move(path %= std::forward<T>(value));\r
-}\r
-\r
-std::ostream& operator<<(std::ostream& o, const path& p);\r
-\r
-// param\r
-\r
-typedef boost::chrono::duration<double, boost::ratio<1, 1>> duration;\r
-\r
-typedef boost::variant<bool, int32_t, int64_t, float, double, std::string, std::wstring, std::vector<int8_t>, duration> param;\r
-\r
-std::ostream& operator<<(std::ostream& o, const param& p);\r
-\r
-// event\r
-\r
-class event sealed\r
-{ \r
-public: \r
- \r
- // Static Members\r
-\r
- typedef std::vector<param, tbb::cache_aligned_allocator<param>> params_t;\r
-\r
- // Constructors\r
-\r
- event(path path); \r
- event(path path, params_t params); \r
- event(const event& other);\r
- event(event&& other);\r
-\r
- // Methods\r
-\r
- event& operator=(event other);\r
-\r
- void swap(event& other);\r
- \r
- template<typename T>\r
- event& operator%(T&& value)\r
- {\r
- params_.push_back(std::forward<T>(value));\r
- return *this;\r
- }\r
- \r
- event propagate(path path) const;\r
-\r
- // Properties\r
-\r
- const path& path() const;\r
- const params_t& params() const;\r
-private:\r
- monitor::path path_;\r
- params_t params_;\r
-};\r
-\r
-std::ostream& operator<<(std::ostream& o, const event& e);\r
-\r
-// reactive\r
-\r
-typedef reactive::observable<monitor::event> observable;\r
-typedef reactive::observer<monitor::event> observer;\r
-typedef reactive::subject<monitor::event> subject;\r
- \r
-class basic_subject sealed : public reactive::subject<monitor::event>\r
-{ \r
- basic_subject(const basic_subject&);\r
- basic_subject& operator=(const basic_subject&);\r
- \r
- class impl : public observer\r
- {\r
- public:\r
- impl(monitor::path path = monitor::path())\r
- : impl_()\r
- , path_(std::move(path))\r
- {\r
- }\r
-\r
- impl(impl&& other)\r
- : impl_(std::move(other.impl_))\r
- , path_(std::move(other.path_))\r
- { \r
- }\r
-\r
- impl& operator=(impl&& other)\r
- {\r
- impl_ = std::move(other.impl_); \r
- path_ = std::move(other.path_);\r
- }\r
- \r
- void on_next(const monitor::event& e) override\r
- { \r
- impl_.on_next(path_.empty() ? e : e.propagate(path_));\r
- }\r
-\r
- void subscribe(const observer_ptr& o)\r
- { \r
- impl_.subscribe(o);\r
- }\r
-\r
- void unsubscribe(const observer_ptr& o)\r
- {\r
- impl_.unsubscribe(o);\r
- }\r
- \r
- private:\r
- reactive::basic_subject_impl<monitor::event> impl_; \r
- monitor::path path_;\r
- };\r
-\r
-public: \r
-\r
- // Static Members\r
-\r
- // Constructors\r
-\r
- basic_subject(monitor::path path = monitor::path())\r
- : impl_(std::make_shared<impl>(std::move(path)))\r
-\r
- {\r
- }\r
- \r
- basic_subject(basic_subject&& other)\r
- : impl_(std::move(other.impl_))\r
- {\r
- }\r
- \r
- // Methods\r
-\r
- basic_subject& operator=(basic_subject&& other)\r
- {\r
- impl_ = std::move(other.impl_);\r
- }\r
-\r
- operator std::weak_ptr<observer>()\r
- {\r
- return impl_;\r
- }\r
-\r
- // observable\r
- \r
- virtual void subscribe(const observer_ptr& o) override\r
- { \r
- impl_->subscribe(o);\r
- }\r
-\r
- virtual void unsubscribe(const observer_ptr& o) override\r
- {\r
- impl_->unsubscribe(o);\r
- }\r
-\r
- // observer\r
- \r
- virtual void on_next(const monitor::event& e) override\r
- { \r
- impl_->on_next(e);\r
- }\r
-\r
- // Properties\r
-\r
-private:\r
- std::shared_ptr<impl> impl_;\r
-};\r
-\r
-inline subject& operator<<(subject& s, const event& e)\r
-{\r
- s.on_next(e);\r
- return s;\r
-}\r
-\r
-}}
\ No newline at end of file
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
+#pragma once
+
+#include <common/memory.h>
+#include <common/assert.h>
+
+#include <boost/variant.hpp>
+#include <boost/chrono/duration.hpp>
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+namespace caspar { namespace core { namespace monitor {
+
+typedef boost::variant<bool,
+ std::int32_t,
+ std::int64_t,
+ float,
+ double,
+ std::string,
+ std::wstring,
+ std::vector<std::int8_t>> data_t;
+
+class message
+{
+public:
+
+ message(std::string path, std::vector<data_t> data = std::vector<data_t>())
+ : path_(std::move(path))
+ , data_ptr_(std::make_shared<std::vector<data_t>>(std::move(data)))
+ {
+ CASPAR_ASSERT(path.empty() || path[0] == '/');
+ }
+
+ message(std::string path, spl::shared_ptr<std::vector<data_t>> data_ptr)
+ : path_(std::move(path))
+ , data_ptr_(std::move(data_ptr))
+ {
+ CASPAR_ASSERT(path.empty() || path[0] == '/');
+ }
+
+ const std::string& path() const
+ {
+ return path_;
+ }
+
+ const std::vector<data_t>& data() const
+ {
+ return *data_ptr_;
+ }
+
+ message propagate(const std::string& path) const
+ {
+ return message(path + path_, data_ptr_);
+ }
+
+ template<typename T>
+ message& operator%(T&& data)
+ {
+ data_ptr_->push_back(std::forward<T>(data));
+ return *this;
+ }
+
+private:
+ std::string path_;
+ spl::shared_ptr<std::vector<data_t>> data_ptr_;
+};
+
+struct sink
+{
+ virtual ~sink() { }
+
+ virtual void propagate(const message& msg) = 0;
+};
+
+class subject : public sink
+{
+private:
+ std::weak_ptr<sink> parent_;
+ const std::string path_;
+public:
+ subject(std::string path = "")
+ : path_(std::move(path))
+ {
+ CASPAR_ASSERT(path.empty() || path[0] == '/');
+ }
+
+ void attach_parent(spl::shared_ptr<sink> parent)
+ {
+ parent_ = std::move(parent);
+ }
+
+ void detach_parent()
+ {
+ parent_.reset();
+ }
+
+ subject& operator<<(const message& msg)
+ {
+ propagate(msg);
+
+ return *this;
+ }
+
+ virtual void propagate(const message& msg) override
+ {
+ auto parent = parent_.lock();
+
+ if (parent)
+ parent->propagate(msg.propagate(path_));
+ }
+};
+
+}}}