+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
#pragma once
#include <common/memory.h>
#include <string>
#include <vector>
-#include <agents.h>
-
-namespace caspar { namespace monitor {
-
+namespace caspar { namespace core { namespace monitor {
+
typedef boost::variant<bool,
std::int32_t,
std::int64_t,
{
CASPAR_ASSERT(path.empty() || path[0] == '/');
}
-
+
message(std::string path, spl::shared_ptr<std::vector<data_t>> data_ptr)
: path_(std::move(path))
, data_ptr_(std::move(data_ptr))
spl::shared_ptr<std::vector<data_t>> data_ptr_;
};
-class subject : public Concurrency::transformer<monitor::message, monitor::message>
+struct sink
{
+ virtual ~sink() { }
+
+ virtual void propagate(const message& msg) = 0;
+};
+
+class subject : public sink
+{
+private:
+ std::weak_ptr<sink> parent_;
+ const std::string path_;
public:
subject(std::string path = "")
- : Concurrency::transformer<monitor::message, monitor::message>([=](const message& msg)
- {
- return msg.propagate(path);
- })
+ : path_(std::move(path))
{
CASPAR_ASSERT(path.empty() || path[0] == '/');
}
- template<typename T>
- subject& operator<<(T&& msg)
+ void attach_parent(spl::shared_ptr<sink> parent)
{
- Concurrency::send(*this, std::forward<T>(msg));
+ parent_ = std::move(parent);
+ }
+
+ void detach_parent()
+ {
+ parent_.reset();
+ }
+
+ subject& operator<<(const message& msg)
+ {
+ propagate(msg);
+
return *this;
}
-};
-typedef Concurrency::ISource<monitor::message> source;
+ virtual void propagate(const message& msg) override
+ {
+ auto parent = parent_.lock();
+ if (parent)
+ parent->propagate(msg.propagate(path_));
+ }
+};
-}}
\ No newline at end of file
+}}}