* Author: Helge Norberg, helge.norberg@svt.se
*/
-#include "../stdafx.h"
+#include "../StdAfx.h"
#include "client.h"
#include <common/utf.h>
#include <common/except.h>
#include <common/endian.h>
+#include <common/cache_aligned_vector.h>
+#include <common/os/general_protection_fault.h>
+#include <common/no_init_proxy.h>
#include <core/monitor/monitor.h>
#include <unordered_map>
#include <boost/asio.hpp>
-#include <boost/foreach.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <tbb/spin_mutex.h>
-#include <tbb/cache_aligned_allocator.h>
using namespace boost::asio::ip;
namespace caspar { namespace protocol { namespace osc {
-template<typename T>
-struct no_init_proxy
-{
- T value;
-
- no_init_proxy()
- {
- static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size");
- static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment");
- }
-};
-
-typedef std::vector<no_init_proxy<char>, tbb::cache_aligned_allocator<no_init_proxy<char>>> byte_vector;
+typedef cache_aligned_vector<no_init_proxy<char>> byte_vector;
template<typename T>
struct param_visitor : public boost::static_visitor<void>
void operator()(const std::vector<int8_t>& value) {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
};
-void write_osc_event(byte_vector& destination, const core::monitor::message& e)
-{
- destination.resize(4096);
+void write_osc_event(byte_vector& destination, const core::monitor::message& message, int retry_allocation_attempt = 0)
+{
+ static std::size_t max_size = 128;
+
+ destination.resize(max_size);
::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
- o << ::osc::BeginMessage(e.path().c_str());
-
- param_visitor<decltype(o)> param_visitor(o);
- BOOST_FOREACH(const auto& data, e.data())
- boost::apply_visitor(param_visitor, data);
-
- o << ::osc::EndMessage;
-
+
+ try
+ {
+ o << ::osc::BeginMessage(message.path().c_str());
+
+ param_visitor<decltype(o)> param_visitor(o);
+ for (const auto& data : message.data())
+ boost::apply_visitor(param_visitor, data);
+
+ o << ::osc::EndMessage;
+ }
+ catch (const ::osc::OutOfBufferMemoryException& e)
+ {
+ if (retry_allocation_attempt > message.data().size())
+ throw;
+
+ max_size = e.required;
+ CASPAR_LOG(trace) << L"[osc] Too small buffer for osc message. Increasing to " << max_size;
+ return write_osc_event(destination, message, retry_allocation_attempt + 1);
+ }
+
destination.resize(o.Size());
}
struct client::impl : public spl::enable_shared_from_this<client::impl>, core::monitor::sink
{
- udp::socket socket_;
+ std::shared_ptr<boost::asio::io_service> service_;
+ udp::socket socket_;
tbb::spin_mutex endpoints_mutex_;
std::map<udp::endpoint, int> reference_counts_by_endpoint_;
boost::thread thread_;
public:
- impl(boost::asio::io_service& service)
- : socket_(service, udp::v4())
+ impl(std::shared_ptr<boost::asio::io_service> service)
+ : service_(std::move(service))
+ , socket_(*service_, udp::v4())
, thread_(boost::bind(&impl::run, this))
{
}
{
boost::system::error_code ec;
- BOOST_FOREACH(const auto& endpoint, destinations)
+ for (const auto& endpoint : destinations)
socket_.send_to(buffers, endpoint, 0, ec);
}
// http://stackoverflow.com/questions/14993000/the-most-reliable-and-efficient-udp-packet-size
const int SAFE_DATAGRAM_SIZE = 508;
+ ensure_gpf_handler_installed_for_thread("osc-sender-thread");
+
try
{
is_running_ = true;
{
boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
+ if (!is_running_)
+ return;
+
if (updates_.empty())
updates_cond_.wait(cond_lock);
{
tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
- BOOST_FOREACH(const auto& endpoint, reference_counts_by_endpoint_)
+ for (const auto& endpoint : reference_counts_by_endpoint_)
destinations.push_back(endpoint.first);
}
auto datagram_size = bundle_header.size();
buffers.push_back(boost::asio::buffer(bundle_header));
- BOOST_FOREACH(const auto& slot, updates)
+ for (const auto& slot : updates)
{
write_osc_bundle_element_start(element_headers[i], slot.second);
const auto& headers = element_headers;
}
};
-client::client(boost::asio::io_service& service)
- : impl_(new impl(service))
+client::client(std::shared_ptr<boost::asio::io_service> service)
+ : impl_(new impl(std::move(service)))
{
}