X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=protocol%2Fosc%2Fclient.cpp;h=6c99cf4ada55b7a6dcf2d3e4f1d6b72844d3a996;hb=c123aae818083df7bbbf22002eef71b1bc727b14;hp=5ea6d9a18827493adc3b67c14c183c89cb879ee0;hpb=b0a6986ce56a18a56e67be266d6d253af7cdcbb5;p=casparcg diff --git a/protocol/osc/client.cpp b/protocol/osc/client.cpp index 5ea6d9a18..6c99cf4ad 100644 --- a/protocol/osc/client.cpp +++ b/protocol/osc/client.cpp @@ -20,7 +20,7 @@ * Author: Helge Norberg, helge.norberg@svt.se */ -#include "../stdafx.h" +#include "../StdAfx.h" #include "client.h" @@ -30,6 +30,9 @@ #include #include #include +#include +#include +#include #include @@ -38,30 +41,16 @@ #include #include -#include #include #include #include -#include using namespace boost::asio::ip; namespace caspar { namespace protocol { namespace osc { -template -struct no_init_proxy -{ - T value; - - no_init_proxy() - { - static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size"); - static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment"); - } -}; - -typedef std::vector, tbb::cache_aligned_allocator>> byte_vector; +typedef cache_aligned_vector> byte_vector; template struct param_visitor : public boost::static_visitor @@ -85,19 +74,34 @@ struct param_visitor : public boost::static_visitor void operator()(const std::vector& value) {o << ::osc::Blob(value.data(), static_cast(value.size()));} }; -void write_osc_event(byte_vector& destination, const core::monitor::message& e) -{ - destination.resize(4096); +void write_osc_event(byte_vector& destination, const core::monitor::message& message, int retry_allocation_attempt = 0) +{ + static std::size_t max_size = 128; + + destination.resize(max_size); ::osc::OutboundPacketStream o(reinterpret_cast(destination.data()), static_cast(destination.size())); - o << ::osc::BeginMessage(e.path().c_str()); - - param_visitor param_visitor(o); - BOOST_FOREACH(const auto& data, e.data()) - boost::apply_visitor(param_visitor, data); - - o << ::osc::EndMessage; - + + try + { + o << ::osc::BeginMessage(message.path().c_str()); + + param_visitor param_visitor(o); + for (const auto& data : message.data()) + boost::apply_visitor(param_visitor, data); + + o << ::osc::EndMessage; + } + catch (const ::osc::OutOfBufferMemoryException& e) + { + if (retry_allocation_attempt > message.data().size()) + throw; + + max_size = e.required; + CASPAR_LOG(trace) << L"[osc] Too small buffer for osc message. Increasing to " << max_size; + return write_osc_event(destination, message, retry_allocation_attempt + 1); + } + destination.resize(o.Size()); } @@ -129,7 +133,8 @@ void write_osc_bundle_element_start(byte_vector& destination, const byte_vector& struct client::impl : public spl::enable_shared_from_this, core::monitor::sink { - udp::socket socket_; + std::shared_ptr service_; + udp::socket socket_; tbb::spin_mutex endpoints_mutex_; std::map reference_counts_by_endpoint_; @@ -142,8 +147,9 @@ struct client::impl : public spl::enable_shared_from_this, core::m boost::thread thread_; public: - impl(boost::asio::io_service& service) - : socket_(service, udp::v4()) + impl(std::shared_ptr service) + : service_(std::move(service)) + , socket_(*service_, udp::v4()) , thread_(boost::bind(&impl::run, this)) { } @@ -208,7 +214,7 @@ private: { boost::system::error_code ec; - BOOST_FOREACH(const auto& endpoint, destinations) + for (const auto& endpoint : destinations) socket_.send_to(buffers, endpoint, 0, ec); } @@ -217,6 +223,8 @@ private: // http://stackoverflow.com/questions/14993000/the-most-reliable-and-efficient-udp-packet-size const int SAFE_DATAGRAM_SIZE = 508; + ensure_gpf_handler_installed_for_thread("osc-sender-thread"); + try { is_running_ = true; @@ -234,6 +242,9 @@ private: { boost::unique_lock cond_lock(updates_mutex_); + if (!is_running_) + return; + if (updates_.empty()) updates_cond_.wait(cond_lock); @@ -243,7 +254,7 @@ private: { tbb::spin_mutex::scoped_lock lock(endpoints_mutex_); - BOOST_FOREACH(const auto& endpoint, reference_counts_by_endpoint_) + for (const auto& endpoint : reference_counts_by_endpoint_) destinations.push_back(endpoint.first); } @@ -258,7 +269,7 @@ private: auto datagram_size = bundle_header.size(); buffers.push_back(boost::asio::buffer(bundle_header)); - BOOST_FOREACH(const auto& slot, updates) + for (const auto& slot : updates) { write_osc_bundle_element_start(element_headers[i], slot.second); const auto& headers = element_headers; @@ -291,8 +302,8 @@ private: } }; -client::client(boost::asio::io_service& service) - : impl_(new impl(service)) +client::client(std::shared_ptr service) + : impl_(new impl(std::move(service))) { }