-#include "..\stdafx.h"
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#include "../stdafx.h"
#include "client.h"
-#include "oscpack/oscOutboundPacketStream.h"
+#include "oscpack/OscOutboundPacketStream.h"
+#include "oscpack/OscHostEndianness.h"
#include <common/utility/string.h>
+#include <common/exception/win32_exception.h>
+#include <common/memory/endian.h>
+
+#include <core/monitor/monitor.h>
+
+#include <functional>
+#include <vector>
+#include <unordered_map>
#include <boost/asio.hpp>
#include <boost/foreach.hpp>
+#include <boost/bind.hpp>
#include <boost/thread.hpp>
-#include <boost/timer.hpp>
-#include <tbb/concurrent_queue.h>
-#include <tbb/concurrent_unordered_map.h>
-#include <tbb/concurrent_unordered_set.h>
-
-#include <array>
-#include <functional>
-#include <memory>
-#include <unordered_map>
-#include <vector>
+#include <tbb/spin_mutex.h>
+#include <tbb/cache_aligned_allocator.h>
using namespace boost::asio::ip;
namespace caspar { namespace protocol { namespace osc {
-
+
template<typename T>
struct no_init_proxy
{
void operator()(const std::vector<int8_t>& value) {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
};
-struct size_visitor : public boost::static_visitor<std::size_t>
+void write_osc_event(byte_vector& destination, const core::monitor::message& e)
{
- std::size_t operator()(const std::string& value) { return value.size() * sizeof(value.front()) * 2; }
- std::size_t operator()(const std::wstring& value) { return value.size() * sizeof(value.front()) * 2; }
- std::size_t operator()(const std::vector<int8_t>& value) { return value.size() * sizeof(value.front()) * 2; }
+ destination.resize(4096);
- template<typename T>
- std::size_t operator()(const T&)
- {
- return 64;
- }
-};
+ ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+ o << ::osc::BeginMessage(e.path().c_str());
+
+ param_visitor<decltype(o)> param_visitor(o);
+ BOOST_FOREACH(const auto& data, e.data())
+ boost::apply_visitor(param_visitor, data);
+
+ o << ::osc::EndMessage;
+
+ destination.resize(o.Size());
+}
-void write_osc_event(byte_vector& destination, const core::monitor::message& e)
+byte_vector write_osc_bundle_start()
{
- try
- {
- std::size_t size = 0;
-
- size_visitor size_visitor;
- BOOST_FOREACH(auto& data, e.data())
- size += boost::apply_visitor(size_visitor, data);
+ byte_vector destination;
+ destination.resize(16);
- destination.resize(size);
+ ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+ o << ::osc::BeginBundle();
- ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+ destination.resize(o.Size());
- o << ::osc::BeginMessage(e.path().c_str());
-
- param_visitor<decltype(o)> param_visitor(o);
- BOOST_FOREACH(auto& data, e.data())
- boost::apply_visitor(param_visitor, data);
-
- o << ::osc::EndMessage;
-
- destination.resize(o.Size());
- }
- catch(...)
- {
- }
+ return destination;
}
-struct client::impl
+void write_osc_bundle_element_start(byte_vector& destination, const byte_vector& message)
+{
+ destination.resize(4);
+
+ int32_t* bundle_element_size = reinterpret_cast<int32_t*>(destination.data());
+
+#ifdef OSC_HOST_LITTLE_ENDIAN
+ *bundle_element_size = swap_byte_order(static_cast<int32_t>(message.size()));
+#else
+ *bundle_element_size = static_cast<int32_t>(bundle.size());
+#endif
+}
+
+struct client::impl : public std::enable_shared_from_this<client::impl>, core::monitor::sink
{
- boost::asio::io_service service_;
-
- udp::endpoint endpoint_;
- udp::socket socket_;
-
- std::array<tbb::concurrent_unordered_map<std::string, byte_vector>, 2> updates_;
- tbb::atomic<unsigned int> updates_counter_;
-
- Concurrency::call<core::monitor::message> call_;
-
- boost::mutex cond_mutex_;
- boost::condition_variable cond_;
- tbb::atomic<bool> is_running_;
- boost::thread thread_;
+ std::shared_ptr<boost::asio::io_service> service_;
+ udp::socket socket_;
+ tbb::spin_mutex endpoints_mutex_;
+ std::map<udp::endpoint, int> reference_counts_by_endpoint_;
+
+ std::unordered_map<std::string, byte_vector> updates_;
+ boost::mutex updates_mutex_;
+ boost::condition_variable updates_cond_;
+
+ tbb::atomic<bool> is_running_;
+
+ boost::thread thread_;
public:
- impl(udp::endpoint endpoint,
- Concurrency::ISource<core::monitor::message>& source)
- : endpoint_(endpoint)
- , socket_(service_, endpoint_.protocol())
- , call_(boost::bind(&impl::on_next, this, _1))
+ impl(std::shared_ptr<boost::asio::io_service> service)
+ : service_(std::move(service))
+ , socket_(*service_, udp::v4())
, thread_(boost::bind(&impl::run, this))
{
- source.link_target(&call_);
}
~impl()
- {
+ {
is_running_ = false;
+ updates_cond_.notify_one();
+
thread_.join();
}
-
- void on_next(const core::monitor::message& msg)
+
+ std::shared_ptr<void> get_subscription_token(
+ const boost::asio::ip::udp::endpoint& endpoint)
{
- write_osc_event(updates_[updates_counter_ % 1][msg.path()], msg);
- cond_.notify_one();
+ tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+ ++reference_counts_by_endpoint_[endpoint];
+
+ std::weak_ptr<impl> weak_self = shared_from_this();
+
+ return std::shared_ptr<void>(nullptr, [weak_self, endpoint] (void*)
+ {
+ auto strong = weak_self.lock();
+
+ if (!strong)
+ return;
+
+ auto& self = *strong;
+
+ tbb::spin_mutex::scoped_lock lock(self.endpoints_mutex_);
+
+ int reference_count_after =
+ --self.reference_counts_by_endpoint_[endpoint];
+
+ if (reference_count_after == 0)
+ self.reference_counts_by_endpoint_.erase(endpoint);
+ });
}
+private:
+ void propagate(const core::monitor::message& msg)
+ {
+ boost::lock_guard<boost::mutex> lock(updates_mutex_);
- void run()
+ try
+ {
+ write_osc_event(updates_[msg.path()], msg);
+ }
+ catch(...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ updates_.erase(msg.path());
+ }
+
+ updates_cond_.notify_one();
+ }
+
+ template<typename T>
+ void do_send(
+ const T& buffers, const std::vector<udp::endpoint>& destinations)
{
- std::unordered_map<std::string, byte_vector> state;
+ boost::system::error_code ec;
- is_running_ = true;
+ BOOST_FOREACH(const auto& endpoint, destinations)
+ socket_.send_to(buffers, endpoint, 0, ec);
+ }
- while(is_running_)
+ void run()
+ {
+ // http://stackoverflow.com/questions/14993000/the-most-reliable-and-efficient-udp-packet-size
+ const int SAFE_DATAGRAM_SIZE = 508;
+
+ try
{
- std::vector<boost::asio::mutable_buffers_1> buffers;
+ is_running_ = true;
- boost::unique_lock<boost::mutex> cond_lock(cond_mutex_);
-
- cond_.wait(cond_lock);
-
- auto& updates = updates_[(++updates_counter_ - 1) % 1];
+ std::unordered_map<std::string, byte_vector> updates;
+ std::vector<udp::endpoint> destinations;
+ const byte_vector bundle_header = write_osc_bundle_start();
+ std::vector<byte_vector> element_headers;
+
+ while (is_running_)
+ {
+ updates.clear();
+ destinations.clear();
+
+ {
+ boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
- BOOST_FOREACH(auto& slot, updates)
- {
- if(slot.second.empty())
+ if (updates_.empty())
+ updates_cond_.wait(cond_lock);
+
+ std::swap(updates, updates_);
+ }
+
+ {
+ tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+ BOOST_FOREACH(const auto& endpoint, reference_counts_by_endpoint_)
+ destinations.push_back(endpoint.first);
+ }
+
+ if (destinations.empty())
continue;
- auto it = state.lower_bound(slot.first);
- if(it == std::end(state) || state.key_comp()(slot.first, it->first))
- it = state.insert(it, std::make_pair(slot.first, slot.second));
- else
- std::swap(it->second, slot.second);
+ std::vector<boost::asio::const_buffers_1> buffers;
+ element_headers.resize(
+ std::max(element_headers.size(), updates.size()));
- slot.second.clear();
+ int i = 0;
+ int datagram_size = bundle_header.size();
+ buffers.push_back(boost::asio::buffer(bundle_header));
- buffers.push_back(boost::asio::buffer(it->second));
- }
+ BOOST_FOREACH(const auto& slot, updates)
+ {
+ write_osc_bundle_element_start(element_headers[i], slot.second);
+ const auto& headers = element_headers;
- if(!buffers.empty())
- socket_.send_to(buffers, endpoint_);
+ auto size_of_element = headers[i].size() + slot.second.size();
+
+ if (datagram_size + size_of_element >= SAFE_DATAGRAM_SIZE)
+ {
+ do_send(buffers, destinations);
+ buffers.clear();
+ buffers.push_back(boost::asio::buffer(bundle_header));
+ datagram_size = bundle_header.size();
+ }
+
+ buffers.push_back(boost::asio::buffer(headers[i]));
+ buffers.push_back(boost::asio::buffer(slot.second));
+
+ datagram_size += size_of_element;
+ ++i;
+ }
+
+ if (!buffers.empty())
+ do_send(buffers, destinations);
+ }
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
}
}
};
-client::client(udp::endpoint endpoint,
- Concurrency::ISource<core::monitor::message>& source)
- : impl_(new impl(endpoint, source))
+client::client(std::shared_ptr<boost::asio::io_service> service)
+ : impl_(new impl(std::move(service)))
{
}
{
}
-}}}
\ No newline at end of file
+std::shared_ptr<void> client::get_subscription_token(
+ const boost::asio::ip::udp::endpoint& endpoint)
+{
+ return impl_->get_subscription_token(endpoint);
+}
+
+safe_ptr<core::monitor::sink> client::sink()
+{
+ return impl_;
+}
+
+}}}