2 * Copyright 2013 Sveriges Television AB http://casparcg.com/
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Robert Nagy, ronag89@gmail.com
22 #include "../stdafx.h"
26 #include "oscpack/oscOutboundPacketStream.h"
28 #include <common/utility/string.h>
33 #include <boost/asio.hpp>
34 #include <boost/foreach.hpp>
35 #include <boost/bind.hpp>
37 #include <tbb/spin_mutex.h>
39 using namespace boost::asio::ip;
41 namespace caspar { namespace protocol { namespace osc {
// Visitor over the parameter values of a monitor message; write_osc_event
// applies it to every element of the message data via boost::apply_visitor.
// Each overload streams its argument into `o`.
// NOTE(review): the declaration of `o` is not visible here - presumably it is
// a reference to the OSC outbound packet stream handed in by the caller; confirm.
44 struct param_visitor : public boost::static_visitor<void>
// bool is streamed unchanged.
53 void operator()(const bool value) {o << value;}
// Every integer width is converted to int64_t before it is streamed, so all
// integers reach `o` with the same type.
54 void operator()(const int32_t value) {o << static_cast<int64_t>(value);}
55 void operator()(const uint32_t value) {o << static_cast<int64_t>(value);}
56 void operator()(const int64_t value) {o << static_cast<int64_t>(value);}
// NOTE(review): uint64_t values above INT64_MAX are not representable in
// int64_t, so they will not arrive as the original value.
57 void operator()(const uint64_t value) {o << static_cast<int64_t>(value);}
// float is streamed unchanged.
58 void operator()(const float value) {o << value;}
// double is narrowed to float before streaming; precision beyond float is lost.
59 void operator()(const double value) {o << static_cast<float>(value);}
// Narrow strings are streamed as C strings.
60 void operator()(const std::string& value) {o << value.c_str();}
// Wide strings are converted with narrow() first, then streamed as C strings.
61 void operator()(const std::wstring& value) {o << narrow(value).c_str();}
// Byte vectors are streamed as an ::osc::Blob built from pointer and size.
62 void operator()(const std::vector<int8_t>& value) {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
65 std::vector<char> write_osc_event(const core::monitor::message& e)
67 std::array<char, 4096> buffer;
68 ::osc::OutboundPacketStream o(buffer.data(), static_cast<unsigned long>(buffer.size()));
70 o << ::osc::BeginMessage(e.path().c_str());
72 param_visitor<decltype(o)> pd_visitor(o);
73 BOOST_FOREACH(auto data, e.data())
74 boost::apply_visitor(pd_visitor, data);
76 o << ::osc::EndMessage;
78 return std::vector<char>(o.Data(), o.Data() + o.Size());
// Private implementation of client.
// Keeps a reference count per subscribed UDP endpoint and, for every monitor
// message delivered through on_next_, sends the OSC encoded message to each
// endpoint that is currently present in the map.
// Derives from enable_shared_from_this because get_subscription_token() hands
// a weak reference to itself to the returned token.
81 struct client::impl : public std::enable_shared_from_this<client::impl>
// Locked around every access to reference_counts_by_endpoint_ in this struct.
83 tbb::spin_mutex endpoints_mutex_;
// Number of outstanding subscription tokens per endpoint; an endpoint is
// erased again once its count drops to zero.
84 std::map<udp::endpoint, int> reference_counts_by_endpoint_;
// Message target linked to the message source in the constructor; its
// callback forwards each message to on_next().
87 Concurrency::call<core::monitor::message> on_next_;
// Constructor: initializes socket_ with the io_service and the IPv4 UDP
// protocol, then links on_next_ to `source` so messages start arriving.
// NOTE(review): the declaration of socket_ is not visible here - presumably a
// udp::socket; confirm.
// NOTE(review): the on_next_ callback captures `this` by pointer.
91 boost::asio::io_service& service,
92 Concurrency::ISource<core::monitor::message>& source)
93 : socket_(service, udp::v4())
94 , on_next_([this](const core::monitor::message& msg) { on_next(msg); })
96 source.link_target(&on_next_);
// Registers one more subscription for `endpoint` and returns a token for it.
// The token is a shared_ptr<void> that owns nothing; its deleter gives the
// subscription back: it decrements the endpoint's count and erases the entry
// when the count reaches zero.
// Uses shared_from_this(), so this object must be owned by a shared_ptr.
99 std::shared_ptr<void> get_subscription_token(
100 const boost::asio::ip::udp::endpoint& endpoint)
102 tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
// operator[] creates the entry with a count of 0 on first use.
104 ++reference_counts_by_endpoint_[endpoint];
// Only a weak reference goes into the deleter, so a token on its own does not
// keep this impl alive.
106 std::weak_ptr<impl> weak_self = shared_from_this();
108 return std::shared_ptr<void>(nullptr, [weak_self, endpoint] (void*)
// NOTE(review): no null check of `strong` is visible before it is
// dereferenced below; confirm one exists for the case where impl is gone.
110 auto strong = weak_self.lock();
115 auto& self = *strong;
117 tbb::spin_mutex::scoped_lock lock(self.endpoints_mutex_);
119 int reference_count_after =
120 --self.reference_counts_by_endpoint_[endpoint];
// Last token for this endpoint released: stop sending to it.
122 if (reference_count_after == 0)
123 self.reference_counts_by_endpoint_.erase(endpoint);
// Called for every monitor message: encodes it once, then starts one
// asynchronous send per endpoint in the map.
127 void on_next(const core::monitor::message& msg)
// The encoded bytes are held through a shared pointer so the same buffer can
// back every send started below.
129 auto data_ptr = make_safe<std::vector<char>>(write_osc_event(msg));
// NOTE(review): the spin mutex appears to stay locked for the whole send
// loop; confirm the scope and whether that is acceptable under load.
131 tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
133 BOOST_FOREACH(auto& elem, reference_counts_by_endpoint_)
135 auto& endpoint = elem.first;
137 // TODO: We seem to be lucky here, because according to asio
138 // documentation only one async operation can be "in flight"
139 // at any given point in time for a socket. This somehow seems
140 // to work though in the case of UDP and Windows.
141 socket_.async_send_to(
142 boost::asio::buffer(*data_ptr),
145 &impl::handle_send_to,
147 data_ptr, // The data_ptr needs to live
148 boost::asio::placeholders::error,
149 boost::asio::placeholders::bytes_transferred));
// Parameters of the send completion handler (handle_send_to, see the bind
// above). All three are unnamed, i.e. unused: neither the error code nor the
// byte count is inspected. The buffer parameter presumably exists only to
// keep the sent data alive until the send has completed.
154 const safe_ptr<std::vector<char>>& /* sent_buffer */,
155 const boost::system::error_code& /*error*/,
156 size_t /*bytes_sent*/)
// Constructs a client by creating its impl from the given io_service and
// message source.
// NOTE(review): impl calls shared_from_this(), so impl_ presumably is a
// std::shared_ptr<impl>; confirm in the header.
162 boost::asio::io_service& service,
163 Concurrency::ISource<core::monitor::message>& source)
164 : impl_(new impl(service, source))
// Move constructor: takes over the implementation of `other`, leaving
// other.impl_ in its moved-from state.
168 client::client(client&& other)
169 : impl_(std::move(other.impl_))
// Move assignment: replaces this client's implementation with the one taken
// from `other`; other.impl_ is left in its moved-from state.
173 client& client::operator=(client&& other)
175 impl_ = std::move(other.impl_);
// Forwards to impl::get_subscription_token() and returns its token.
// NOTE(review): impl_ is dereferenced without a check, so calling this on a
// moved-from client is presumably invalid; confirm.
183 std::shared_ptr<void> client::get_subscription_token(
184 const boost::asio::ip::udp::endpoint& endpoint)
186 return impl_->get_subscription_token(endpoint);