]> git.sesse.net Git - casparcg/blobdiff - protocol/osc/client.cpp
#269
[casparcg] / protocol / osc / client.cpp
index b9ac59dca82468b60704734e5e24493ddd60d99d..16f1e9da23a502b3ba413a19ba35e7d9cfc00ea6 100644 (file)
@@ -1,30 +1,54 @@
-#include "..\stdafx.h"
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#include "../stdafx.h"
 
 #include "client.h"
 
-#include "oscpack/oscOutboundPacketStream.h"
+#include "oscpack/OscOutboundPacketStream.h"
+#include "oscpack/OscHostEndianness.h"
 
 #include <common/utility/string.h>
+#include <common/exception/win32_exception.h>
+#include <common/memory/endian.h>
+
+#include <core/monitor/monitor.h>
+
+#include <functional>
+#include <vector>
+#include <unordered_map>
 
 #include <boost/asio.hpp>
 #include <boost/foreach.hpp>
+#include <boost/bind.hpp>
 #include <boost/thread.hpp>
-#include <boost/timer.hpp>
 
-#include <tbb/concurrent_queue.h>
-#include <tbb/concurrent_unordered_map.h>
-#include <tbb/concurrent_unordered_set.h>
-
-#include <array>
-#include <functional>
-#include <memory>
-#include <unordered_map>
-#include <vector>
+#include <tbb/spin_mutex.h>
+#include <tbb/cache_aligned_allocator.h>
 
 using namespace boost::asio::ip;
 
 namespace caspar { namespace protocol { namespace osc {
-       
+
 template<typename T>
 struct no_init_proxy
 {
@@ -61,130 +85,216 @@ struct param_visitor : public boost::static_visitor<void>
        void operator()(const std::vector<int8_t>& value)       {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
 };
 
-struct size_visitor : public boost::static_visitor<std::size_t>
+void write_osc_event(byte_vector& destination, const core::monitor::message& e)
 {              
-       std::size_t operator()(const std::string& value)                        { return value.size() * sizeof(value.front()) * 2; }
-       std::size_t operator()(const std::wstring& value)                       { return value.size() * sizeof(value.front()) * 2; }
-       std::size_t operator()(const std::vector<int8_t>& value)        { return value.size() * sizeof(value.front()) * 2; }
+       destination.resize(4096);
 
-       template<typename T>
-       std::size_t operator()(const T&)
-       {
-               return 64;
-       }
-};
+       ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+       o << ::osc::BeginMessage(e.path().c_str());
+                               
+       param_visitor<decltype(o)> param_visitor(o);
+       BOOST_FOREACH(const auto& data, e.data())
+               boost::apply_visitor(param_visitor, data);
+                               
+       o << ::osc::EndMessage;
+               
+       destination.resize(o.Size());
+}
 
-void write_osc_event(byte_vector& destination, const core::monitor::message& e)
+byte_vector write_osc_bundle_start()
 {
-       try
-       {
-               std::size_t size = 0;
-               
-               size_visitor size_visitor;
-               BOOST_FOREACH(auto& data, e.data())
-                       size += boost::apply_visitor(size_visitor, data);
+       byte_vector destination;
+       destination.resize(16);
 
-               destination.resize(size);
+       ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+       o << ::osc::BeginBundle();
 
-               ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+       destination.resize(o.Size());
 
-               o << ::osc::BeginMessage(e.path().c_str());
-                               
-               param_visitor<decltype(o)> param_visitor(o);
-               BOOST_FOREACH(auto& data, e.data())
-                       boost::apply_visitor(param_visitor, data);
-                               
-               o << ::osc::EndMessage;
-               
-               destination.resize(o.Size());
-       }
-       catch(...)
-       {
-       }
+       return destination;
 }
 
-struct client::impl
+void write_osc_bundle_element_start(byte_vector& destination, const byte_vector& message)
+{              
+       destination.resize(4);
+
+       int32_t* bundle_element_size = reinterpret_cast<int32_t*>(destination.data());
+
+#ifdef OSC_HOST_LITTLE_ENDIAN
+       *bundle_element_size = swap_byte_order(static_cast<int32_t>(message.size()));
+#else
+       *bundle_element_size = static_cast<int32_t>(bundle.size());
+#endif
+}
+
+struct client::impl : public std::enable_shared_from_this<client::impl>, core::monitor::sink
 {
-       boost::asio::io_service                                                                                                 service_;
-                                                                                                                                                       
-       udp::endpoint                                                                                                                   endpoint_;
-       udp::socket                                                                                                                             socket_;        
-       
-       std::array<tbb::concurrent_unordered_map<std::string, byte_vector>, 2>  updates_;
-       tbb::atomic<unsigned int>                                                                                               updates_counter_;                 
-
-       Concurrency::call<core::monitor::message>                                                               call_;
-                                                                               
-       boost::mutex                                                                                                                    cond_mutex_;
-       boost::condition_variable                                                                                               cond_;
-       tbb::atomic<bool>                                                                                                               is_running_;
-       boost::thread                                                                                                                   thread_;
+       std::shared_ptr<boost::asio::io_service>                service_;
+       udp::socket socket_;
+       tbb::spin_mutex                                                                 endpoints_mutex_;
+       std::map<udp::endpoint, int>                                    reference_counts_by_endpoint_;
+
+       std::unordered_map<std::string, byte_vector>    updates_;
+       boost::mutex                                                                    updates_mutex_;                                                         
+       boost::condition_variable                                               updates_cond_;
+
+       tbb::atomic<bool>                                                               is_running_;
+
+       boost::thread                                                                   thread_;
        
 public:
-       impl(udp::endpoint endpoint, 
-                Concurrency::ISource<core::monitor::message>& source)
-               : endpoint_(endpoint)
-               , socket_(service_, endpoint_.protocol())
-               , call_(boost::bind(&impl::on_next, this, _1))
+       impl(std::shared_ptr<boost::asio::io_service> service)
+               : service_(std::move(service))
+               , socket_(*service_, udp::v4())
                , thread_(boost::bind(&impl::run, this))
        {
-               source.link_target(&call_);             
        }
 
        ~impl()
-       {               
+       {
                is_running_ = false;
 
+               updates_cond_.notify_one();
+
                thread_.join();
        }
-       
-       void on_next(const core::monitor::message& msg)
+
+       std::shared_ptr<void> get_subscription_token(
+                       const boost::asio::ip::udp::endpoint& endpoint)
        {
-               write_osc_event(updates_[updates_counter_ % 1][msg.path()], msg);
-               cond_.notify_one();
+               tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+               ++reference_counts_by_endpoint_[endpoint];
+
+               std::weak_ptr<impl> weak_self = shared_from_this();
+
+               return std::shared_ptr<void>(nullptr, [weak_self, endpoint] (void*)
+               {
+                       auto strong = weak_self.lock();
+
+                       if (!strong)
+                               return;
+
+                       auto& self = *strong;
+
+                       tbb::spin_mutex::scoped_lock lock(self.endpoints_mutex_);
+
+                       int reference_count_after =
+                               --self.reference_counts_by_endpoint_[endpoint];
+
+                       if (reference_count_after == 0)
+                               self.reference_counts_by_endpoint_.erase(endpoint);
+               });
        }
+private:
+       void propagate(const core::monitor::message& msg)
+       {
+               boost::lock_guard<boost::mutex> lock(updates_mutex_);
 
-       void run()
+               try 
+               {
+                       write_osc_event(updates_[msg.path()], msg);
+               }
+               catch(...)
+               {
+                       CASPAR_LOG_CURRENT_EXCEPTION();
+                       updates_.erase(msg.path());
+               }
+
+               updates_cond_.notify_one();
+       }
+
+       template<typename T>
+       void do_send(
+                       const T& buffers, const std::vector<udp::endpoint>& destinations)
        {
-               std::unordered_map<std::string, byte_vector> state;
+               boost::system::error_code ec;
 
-               is_running_ = true;
+               BOOST_FOREACH(const auto& endpoint, destinations)
+                       socket_.send_to(buffers, endpoint, 0, ec);
+       }
 
-               while(is_running_)
+       void run()
+       {
+               // http://stackoverflow.com/questions/14993000/the-most-reliable-and-efficient-udp-packet-size
+               const int SAFE_DATAGRAM_SIZE = 508;
+
+               try
                {
-                       std::vector<boost::asio::mutable_buffers_1>     buffers;
+                       is_running_ = true;
 
-                       boost::unique_lock<boost::mutex> cond_lock(cond_mutex_);
-                                       
-                       cond_.wait(cond_lock);
-                               
-                       auto& updates = updates_[(++updates_counter_ - 1) % 1];
+                       std::unordered_map<std::string, byte_vector> updates;
+                       std::vector<udp::endpoint> destinations;
+                       const byte_vector bundle_header = write_osc_bundle_start();
+                       std::vector<byte_vector> element_headers;
+
+                       while (is_running_)
+                       {               
+                               updates.clear();
+                               destinations.clear();
+
+                               {                       
+                                       boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
 
-                       BOOST_FOREACH(auto& slot, updates)              
-                       {
-                               if(slot.second.empty())
+                                       if (updates_.empty())
+                                               updates_cond_.wait(cond_lock);
+
+                                       std::swap(updates, updates_);
+                               }
+
+                               {
+                                       tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+                                       BOOST_FOREACH(const auto& endpoint, reference_counts_by_endpoint_)
+                                               destinations.push_back(endpoint.first);
+                               }
+
+                               if (destinations.empty())
                                        continue;
 
-                               auto it = state.lower_bound(slot.first);
-                               if(it == std::end(state) || state.key_comp()(slot.first, it->first))
-                                       it = state.insert(it, std::make_pair(slot.first, slot.second));
-                               else
-                                       std::swap(it->second, slot.second);
+                               std::vector<boost::asio::const_buffers_1> buffers;
+                               element_headers.resize(
+                                               std::max(element_headers.size(), updates.size()));
 
-                               slot.second.clear();
+                               int i = 0;
+                               int datagram_size = bundle_header.size();
+                               buffers.push_back(boost::asio::buffer(bundle_header));
 
-                               buffers.push_back(boost::asio::buffer(it->second));
-                       }
+                               BOOST_FOREACH(const auto& slot, updates)
+                               {
+                                       write_osc_bundle_element_start(element_headers[i], slot.second);
+                                       const auto& headers = element_headers;
 
-                       if(!buffers.empty())
-                               socket_.send_to(buffers, endpoint_);
+                                       auto size_of_element = headers[i].size() + slot.second.size();
+       
+                                       if (datagram_size + size_of_element >= SAFE_DATAGRAM_SIZE)
+                                       {
+                                               do_send(buffers, destinations);
+                                               buffers.clear();
+                                               buffers.push_back(boost::asio::buffer(bundle_header));
+                                               datagram_size = bundle_header.size();
+                                       }
+
+                                       buffers.push_back(boost::asio::buffer(headers[i]));
+                                       buffers.push_back(boost::asio::buffer(slot.second));
+
+                                       datagram_size += size_of_element;
+                                       ++i;
+                               }
+                       
+                               if (!buffers.empty())
+                                       do_send(buffers, destinations);
+                       }
+               }
+               catch (...)
+               {
+                       CASPAR_LOG_CURRENT_EXCEPTION();
                }
        }
 };
 
-client::client(udp::endpoint endpoint, 
-                          Concurrency::ISource<core::monitor::message>& source) 
-       : impl_(new impl(endpoint, source))
+client::client(std::shared_ptr<boost::asio::io_service> service) 
+       : impl_(new impl(std::move(service)))
 {
 }
 
@@ -203,4 +313,15 @@ client::~client()
 {
 }
 
-}}}
\ No newline at end of file
+std::shared_ptr<void> client::get_subscription_token(
+                       const boost::asio::ip::udp::endpoint& endpoint)
+{
+       return impl_->get_subscription_token(endpoint);
+}
+
+safe_ptr<core::monitor::sink> client::sink()
+{
+       return impl_;
+}
+
+}}}