2 * Copyright 2013 Sveriges Television AB http://casparcg.com/
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Robert Nagy, ronag89@gmail.com
20 * Author: Helge Norberg, helge.norberg@svt.se
23 #include "../StdAfx.h"
27 #include "oscpack/OscOutboundPacketStream.h"
28 #include "oscpack/OscHostEndianness.h"
30 #include <common/utf.h>
31 #include <common/except.h>
32 #include <common/endian.h>
33 #include <common/cache_aligned_vector.h>
34 #include <common/os/general_protection_fault.h>
36 #include <core/monitor/monitor.h>
40 #include <unordered_map>
42 #include <boost/asio.hpp>
43 #include <boost/bind.hpp>
44 #include <boost/thread.hpp>
46 #include <tbb/spin_mutex.h>
48 using namespace boost::asio::ip;
50 namespace caspar { namespace protocol { namespace osc {
59 static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size");
60 static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment");
64 typedef cache_aligned_vector<no_init_proxy<char>> byte_vector;
67 struct param_visitor : public boost::static_visitor<void>
76 void operator()(const bool value) {o << value;}
77 void operator()(const int32_t value) {o << static_cast<int64_t>(value);}
78 void operator()(const uint32_t value) {o << static_cast<int64_t>(value);}
79 void operator()(const int64_t value) {o << static_cast<int64_t>(value);}
80 void operator()(const uint64_t value) {o << static_cast<int64_t>(value);}
81 void operator()(const float value) {o << value;}
82 void operator()(const double value) {o << static_cast<float>(value);}
83 void operator()(const std::string& value) {o << value.c_str();}
84 void operator()(const std::wstring& value) {o << u8(value).c_str();}
85 void operator()(const std::vector<int8_t>& value) {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
88 void write_osc_event(byte_vector& destination, const core::monitor::message& message, int retry_allocation_attempt = 0)
90 static std::size_t max_size = 128;
92 destination.resize(max_size);
94 ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
98 o << ::osc::BeginMessage(message.path().c_str());
100 param_visitor<decltype(o)> param_visitor(o);
101 for (const auto& data : message.data())
102 boost::apply_visitor(param_visitor, data);
104 o << ::osc::EndMessage;
106 catch (const ::osc::OutOfBufferMemoryException& e)
108 if (retry_allocation_attempt > message.data().size())
111 max_size = e.required;
112 CASPAR_LOG(trace) << L"[osc] Too small buffer for osc message. Increasing to " << max_size;
113 return write_osc_event(destination, message, retry_allocation_attempt + 1);
116 destination.resize(o.Size());
119 byte_vector write_osc_bundle_start()
121 byte_vector destination;
122 destination.resize(16);
124 ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
125 o << ::osc::BeginBundle();
127 destination.resize(o.Size());
132 void write_osc_bundle_element_start(byte_vector& destination, const byte_vector& message)
134 destination.resize(4);
136 int32_t* bundle_element_size = reinterpret_cast<int32_t*>(destination.data());
138 #ifdef OSC_HOST_LITTLE_ENDIAN
139 *bundle_element_size = swap_byte_order(static_cast<int32_t>(message.size()));
141 *bundle_element_size = static_cast<int32_t>(bundle.size());
145 struct client::impl : public spl::enable_shared_from_this<client::impl>, core::monitor::sink
147 std::shared_ptr<boost::asio::io_service> service_;
149 tbb::spin_mutex endpoints_mutex_;
150 std::map<udp::endpoint, int> reference_counts_by_endpoint_;
152 std::unordered_map<std::string, byte_vector> updates_;
153 boost::mutex updates_mutex_;
154 boost::condition_variable updates_cond_;
156 tbb::atomic<bool> is_running_;
158 boost::thread thread_;
161 impl(std::shared_ptr<boost::asio::io_service> service)
162 : service_(std::move(service))
163 , socket_(*service_, udp::v4())
164 , thread_(boost::bind(&impl::run, this))
172 updates_cond_.notify_one();
177 std::shared_ptr<void> get_subscription_token(
178 const boost::asio::ip::udp::endpoint& endpoint)
180 tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
182 ++reference_counts_by_endpoint_[endpoint];
184 std::weak_ptr<impl> weak_self = shared_from_this();
186 return std::shared_ptr<void>(nullptr, [weak_self, endpoint] (void*)
188 auto strong = weak_self.lock();
193 auto& self = *strong;
195 tbb::spin_mutex::scoped_lock lock(self.endpoints_mutex_);
197 int reference_count_after =
198 --self.reference_counts_by_endpoint_[endpoint];
200 if (reference_count_after == 0)
201 self.reference_counts_by_endpoint_.erase(endpoint);
205 void propagate(const core::monitor::message& msg)
207 boost::lock_guard<boost::mutex> lock(updates_mutex_);
211 write_osc_event(updates_[msg.path()], msg);
215 CASPAR_LOG_CURRENT_EXCEPTION();
216 updates_.erase(msg.path());
219 updates_cond_.notify_one();
224 const T& buffers, const std::vector<udp::endpoint>& destinations)
226 boost::system::error_code ec;
228 for (const auto& endpoint : destinations)
229 socket_.send_to(buffers, endpoint, 0, ec);
234 // http://stackoverflow.com/questions/14993000/the-most-reliable-and-efficient-udp-packet-size
235 const int SAFE_DATAGRAM_SIZE = 508;
237 ensure_gpf_handler_installed_for_thread("osc-sender-thread");
243 std::unordered_map<std::string, byte_vector> updates;
244 std::vector<udp::endpoint> destinations;
245 const byte_vector bundle_header = write_osc_bundle_start();
246 std::vector<byte_vector> element_headers;
251 destinations.clear();
254 boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
259 if (updates_.empty())
260 updates_cond_.wait(cond_lock);
262 std::swap(updates, updates_);
266 tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
268 for (const auto& endpoint : reference_counts_by_endpoint_)
269 destinations.push_back(endpoint.first);
272 if (destinations.empty())
275 std::vector<boost::asio::const_buffers_1> buffers;
276 element_headers.resize(
277 std::max(element_headers.size(), updates.size()));
280 auto datagram_size = bundle_header.size();
281 buffers.push_back(boost::asio::buffer(bundle_header));
283 for (const auto& slot : updates)
285 write_osc_bundle_element_start(element_headers[i], slot.second);
286 const auto& headers = element_headers;
288 auto size_of_element = headers[i].size() + slot.second.size();
290 if (datagram_size + size_of_element >= SAFE_DATAGRAM_SIZE)
292 do_send(buffers, destinations);
294 buffers.push_back(boost::asio::buffer(bundle_header));
295 datagram_size = bundle_header.size();
298 buffers.push_back(boost::asio::buffer(headers[i]));
299 buffers.push_back(boost::asio::buffer(slot.second));
301 datagram_size += size_of_element;
305 if (!buffers.empty())
306 do_send(buffers, destinations);
311 CASPAR_LOG_CURRENT_EXCEPTION();
316 client::client(std::shared_ptr<boost::asio::io_service> service)
317 : impl_(new impl(std::move(service)))
321 client::client(client&& other)
322 : impl_(std::move(other.impl_))
326 client& client::operator=(client&& other)
328 impl_ = std::move(other.impl_);
336 std::shared_ptr<void> client::get_subscription_token(
337 const boost::asio::ip::udp::endpoint& endpoint)
339 return impl_->get_subscription_token(endpoint);
342 spl::shared_ptr<core::monitor::sink> client::sink()