5 #include "oscpack/oscOutboundPacketStream.h"
7 #include <common/utility/string.h>
9 #include <boost/asio.hpp>
10 #include <boost/foreach.hpp>
11 #include <boost/thread.hpp>
13 #include <tbb/atomic.h>
14 #include <tbb/cache_aligned_allocator.h>
19 #include <unordered_map>
22 using namespace boost::asio::ip;
24 namespace caspar { namespace protocol { namespace osc {
33 static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size");
34 static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment");
38 typedef std::vector<no_init_proxy<char>, tbb::cache_aligned_allocator<no_init_proxy<char>>> byte_vector;
41 struct param_visitor : public boost::static_visitor<void>
50 void operator()(const bool value) {o << value;}
51 void operator()(const int32_t value) {o << static_cast<int64_t>(value);}
52 void operator()(const uint32_t value) {o << static_cast<int64_t>(value);}
53 void operator()(const int64_t value) {o << static_cast<int64_t>(value);}
54 void operator()(const uint64_t value) {o << static_cast<int64_t>(value);}
55 void operator()(const float value) {o << value;}
56 void operator()(const double value) {o << static_cast<float>(value);}
57 void operator()(const std::string& value) {o << value.c_str();}
58 void operator()(const std::wstring& value) {o << narrow(value).c_str();}
59 void operator()(const std::vector<int8_t>& value) {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
62 struct size_visitor : public boost::static_visitor<std::size_t>
64 std::size_t operator()(const bool value) { return sizeof(bool); }
65 std::size_t operator()(const int32_t value) { return sizeof(int64_t); }
66 std::size_t operator()(const uint32_t value) { return sizeof(int64_t); }
67 std::size_t operator()(const int64_t value) { return sizeof(int64_t); }
68 std::size_t operator()(const uint64_t value) { return sizeof(int64_t); }
69 std::size_t operator()(const float value) { return sizeof(float); }
70 std::size_t operator()(const double value) { return sizeof(float); }
71 std::size_t operator()(const std::string& value) { return value.size(); }
72 std::size_t operator()(const std::wstring& value) { return value.size(); }
73 std::size_t operator()(const std::vector<int8_t>& value) { return value.size(); }
76 void write_osc_event(byte_vector& destination, const core::monitor::message& e)
80 std::size_t size = 256; // This should be enough to cover address, padding and meta-data.
82 size_visitor size_visitor;
83 BOOST_FOREACH(auto& data, e.data())
84 size += boost::apply_visitor(size_visitor, data);
86 destination.resize(size);
88 ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
90 o << ::osc::BeginMessage(e.path().c_str());
92 param_visitor<decltype(o)> param_visitor(o);
93 BOOST_FOREACH(auto& data, e.data())
94 boost::apply_visitor(param_visitor, data);
96 o << ::osc::EndMessage;
98 destination.resize(o.Size());
107 boost::asio::io_service service_;
109 udp::endpoint endpoint_;
112 std::unordered_map<std::string, byte_vector> updates_;
113 boost::mutex updates_mutex_;
115 boost::condition_variable cond_;
116 boost::mutex cond_mutex_;
118 tbb::atomic<bool> is_running_;
120 boost::thread thread_;
122 Concurrency::call<core::monitor::message> on_next_;
125 impl(udp::endpoint endpoint,
126 Concurrency::ISource<core::monitor::message>& source)
127 : endpoint_(endpoint)
128 , socket_(service_, endpoint_.protocol())
129 , thread_(boost::bind(&impl::run, this))
130 , on_next_(boost::bind(&impl::on_next, this, _1))
132 source.link_target(&on_next_);
144 void on_next(const core::monitor::message& msg)
147 boost::lock_guard<boost::mutex> lock(updates_mutex_);
148 write_osc_event(updates_[msg.path()], msg);
160 std::unordered_map<std::string, byte_vector> updates;
162 boost::unique_lock<boost::mutex> cond_lock(cond_mutex_);
166 cond_.wait(cond_lock);
169 boost::lock_guard<boost::mutex> lock(updates_mutex_);
170 std::swap(updates, updates_);
173 std::vector<boost::asio::const_buffers_1> buffers;
175 BOOST_FOREACH(const auto& slot, updates)
176 buffers.push_back(boost::asio::buffer(slot.second));
178 socket_.send_to(buffers, endpoint_);
185 CASPAR_LOG_CURRENT_EXCEPTION();
190 client::client(udp::endpoint endpoint,
191 Concurrency::ISource<core::monitor::message>& source)
192 : impl_(new impl(endpoint, source))
196 client::client(client&& other)
197 : impl_(std::move(other.impl_))
201 client& client::operator=(client&& other)
203 impl_ = std::move(other.impl_);