#include <common/utility/string.h>
-#include <functional>
-#include <vector>
-
#include <boost/asio.hpp>
#include <boost/foreach.hpp>
#include <boost/thread.hpp>
+#include <boost/timer.hpp>
+
+#include <tbb/concurrent_queue.h>
+#include <tbb/concurrent_unordered_map.h>
+#include <tbb/concurrent_unordered_set.h>
+
+#include <array>
+#include <functional>
+#include <memory>
+#include <unordered_map>
+#include <vector>
using namespace boost::asio::ip;
namespace caspar { namespace protocol { namespace osc {
+template<typename T>
+struct no_init_proxy
+{
+ T value;
+
+ no_init_proxy()
+ {
+ static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size");
+ static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment");
+ }
+};
+
+typedef std::vector<no_init_proxy<char>, tbb::cache_aligned_allocator<no_init_proxy<char>>> byte_vector;
+
template<typename T>
struct param_visitor : public boost::static_visitor<void>
{
void operator()(const std::vector<int8_t>& value) {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
};
-std::vector<char> write_osc_event(const core::monitor::message& e)
+struct size_visitor : public boost::static_visitor<std::size_t>
+{
+ std::size_t operator()(const std::string& value) { return value.size() * sizeof(value.front()) * 2; }
+ std::size_t operator()(const std::wstring& value) { return value.size() * sizeof(value.front()) * 2; }
+ std::size_t operator()(const std::vector<int8_t>& value) { return value.size() * sizeof(value.front()) * 2; }
+
+ template<typename T>
+ std::size_t operator()(const T&)
+ {
+ return 64;
+ }
+};
+
+void write_osc_event(byte_vector& destination, const core::monitor::message& e)
{
- std::array<char, 4096> buffer;
- ::osc::OutboundPacketStream o(buffer.data(), static_cast<unsigned long>(buffer.size()));
+ try
+ {
+ std::size_t size = 0;
+
+ size_visitor size_visitor;
+ BOOST_FOREACH(auto& data, e.data())
+ size += boost::apply_visitor(size_visitor, data);
- o << ::osc::BeginMessage(e.path().c_str());
+ destination.resize(size);
+
+ ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+
+ o << ::osc::BeginMessage(e.path().c_str());
- param_visitor<decltype(o)> pd_visitor(o);
- BOOST_FOREACH(auto data, e.data())
- boost::apply_visitor(pd_visitor, data);
+ param_visitor<decltype(o)> param_visitor(o);
+ BOOST_FOREACH(auto& data, e.data())
+ boost::apply_visitor(param_visitor, data);
- o << ::osc::EndMessage;
+ o << ::osc::EndMessage;
- return std::vector<char>(o.Data(), o.Data() + o.Size());
+ destination.resize(o.Size());
+ }
+ catch(...)
+ {
+ }
}
struct client::impl
{
- boost::asio::io_service service_;
-
- udp::endpoint endpoint_;
- udp::socket socket_;
-
- boost::thread thread_;
-
- Concurrency::call<core::monitor::message> on_next_;
+ boost::asio::io_service service_;
+
+ udp::endpoint endpoint_;
+ udp::socket socket_;
+
+ std::array<tbb::concurrent_unordered_map<std::string, byte_vector>, 2> updates_;
+ tbb::atomic<unsigned int> updates_counter_;
+
+ Concurrency::call<core::monitor::message> call_;
+
+ boost::mutex cond_mutex_;
+ boost::condition_variable cond_;
+ tbb::atomic<bool> is_running_;
+ boost::thread thread_;
public:
impl(udp::endpoint endpoint,
Concurrency::ISource<core::monitor::message>& source)
: endpoint_(endpoint)
, socket_(service_, endpoint_.protocol())
- , thread_(std::bind(&boost::asio::io_service::run, &service_))
- , on_next_([this](const core::monitor::message& msg){ on_next(msg); })
+ , call_(boost::bind(&impl::on_next, this, _1))
+ , thread_(boost::bind(&impl::run, this))
{
- source.link_target(&on_next_);
+ source.link_target(&call_);
}
~impl()
{
+ is_running_ = false;
+
thread_.join();
}
void on_next(const core::monitor::message& msg)
{
- auto data_ptr = make_safe<std::vector<char>>(write_osc_event(msg));
-
- socket_.async_send_to(boost::asio::buffer(*data_ptr),
- endpoint_,
- boost::bind(&impl::handle_send_to, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
- }
+ write_osc_event(updates_[updates_counter_ % 1][msg.path()], msg);
+ cond_.notify_one();
+ }
- void handle_send_to(const boost::system::error_code& /*error*/, size_t /*bytes_sent*/)
+ void run()
{
+ std::unordered_map<std::string, byte_vector> state;
+
+ is_running_ = true;
+
+ while(is_running_)
+ {
+ std::vector<boost::asio::mutable_buffers_1> buffers;
+
+ boost::unique_lock<boost::mutex> cond_lock(cond_mutex_);
+
+ if(cond_.timed_wait(cond_lock, boost::posix_time::milliseconds(500)))
+ {
+ auto& updates = updates_[(++updates_counter_ - 1) % 1];
+
+ BOOST_FOREACH(auto& slot, updates)
+ {
+ if(slot.second.empty())
+ continue;
+
+ auto it = state.lower_bound(slot.first);
+ if(it == std::end(state) || state.key_comp()(slot.first, it->first))
+ it = state.insert(it, std::make_pair(slot.first, slot.second));
+ else
+ std::swap(it->second, slot.second);
+
+ slot.second.clear();
+
+ buffers.push_back(boost::asio::buffer(it->second));
+ }
+ }
+ else
+ {
+ BOOST_FOREACH(auto& slot, state)
+ {
+ buffers.push_back(boost::asio::buffer(slot.second));
+ }
+ }
+
+ if(!buffers.empty())
+ socket_.send_to(buffers, endpoint_);
+ }
}
};