]> git.sesse.net Git - casparcg/commitdiff
osc: Bug fixes and major performance, memory overhead and latency improvements.
authorRobert Nagy <ronag@live.com>
Sat, 6 Jul 2013 20:57:16 +0000 (22:57 +0200)
committerRobert Nagy <ronag@live.com>
Sat, 6 Jul 2013 20:58:58 +0000 (22:58 +0200)
Sponsored by Boffins Technologies (www.boffins.se).

protocol/osc/client.cpp

index 621bf369fea82a87ae64ae6902c877e3036618e3..ef774052fe1916d377934b4d8ed7c2407a97c8fa 100644 (file)
@@ -6,17 +6,39 @@
 
 #include <common/utility/string.h>
 
-#include <functional>
-#include <vector>
-
 #include <boost/asio.hpp>
 #include <boost/foreach.hpp>
 #include <boost/thread.hpp>
+#include <boost/timer.hpp>
+
+#include <tbb/concurrent_queue.h>
+#include <tbb/concurrent_unordered_map.h>
+#include <tbb/concurrent_unordered_set.h>
+
+#include <array>
+#include <functional>
+#include <memory>
+#include <unordered_map>
+#include <vector>
 
 using namespace boost::asio::ip;
 
 namespace caspar { namespace protocol { namespace osc {
        
+template<typename T>
+struct no_init_proxy
+{
+    T value;
+
+    no_init_proxy() 
+       {
+               static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size");
+        static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment");
+    }
+};
+
+typedef std::vector<no_init_proxy<char>, tbb::cache_aligned_allocator<no_init_proxy<char>>> byte_vector;
+
 template<typename T>
 struct param_visitor : public boost::static_visitor<void>
 {
@@ -39,62 +61,132 @@ struct param_visitor : public boost::static_visitor<void>
        void operator()(const std::vector<int8_t>& value)       {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
 };
 
-std::vector<char> write_osc_event(const core::monitor::message& e)
+struct size_visitor : public boost::static_visitor<std::size_t>
+{              
+       std::size_t operator()(const std::string& value)                        { return value.size() * sizeof(value.front()) * 2; }
+       std::size_t operator()(const std::wstring& value)                       { return value.size() * sizeof(value.front()) * 2; }
+       std::size_t operator()(const std::vector<int8_t>& value)        { return value.size() * sizeof(value.front()) * 2; }
+
+       template<typename T>
+       std::size_t operator()(const T&)
+       {
+               return 64;
+       }
+};
+
+void write_osc_event(byte_vector& destination, const core::monitor::message& e)
 {
-       std::array<char, 4096> buffer;
-       ::osc::OutboundPacketStream o(buffer.data(), static_cast<unsigned long>(buffer.size()));
+       try
+       {
+               std::size_t size = 0;
+               
+               size_visitor size_visitor;
+               BOOST_FOREACH(auto& data, e.data())
+                       size += boost::apply_visitor(size_visitor, data);
 
-       o       << ::osc::BeginMessage(e.path().c_str());
+               destination.resize(size);
+
+               ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+
+               o << ::osc::BeginMessage(e.path().c_str());
                                
-       param_visitor<decltype(o)> pd_visitor(o);
-       BOOST_FOREACH(auto data, e.data())
-               boost::apply_visitor(pd_visitor, data);
+               param_visitor<decltype(o)> param_visitor(o);
+               BOOST_FOREACH(auto& data, e.data())
+                       boost::apply_visitor(param_visitor, data);
                                
-       o       << ::osc::EndMessage;
+               o << ::osc::EndMessage;
                
-       return std::vector<char>(o.Data(), o.Data() + o.Size());
+               destination.resize(o.Size());
+       }
+       catch(...)
+       {
+       }
 }
 
 struct client::impl
 {
-       boost::asio::io_service                                         service_;
-
-       udp::endpoint                                                           endpoint_;
-       udp::socket                                                                     socket_;        
-
-       boost::thread                                                           thread_;
-
-       Concurrency::call<core::monitor::message>       on_next_;
+       boost::asio::io_service                                                                                                 service_;
+                                                                                                                                                       
+       udp::endpoint                                                                                                                   endpoint_;
+       udp::socket                                                                                                                             socket_;        
+       
+       std::array<tbb::concurrent_unordered_map<std::string, byte_vector>, 2>  updates_;
+       tbb::atomic<unsigned int>                                                                                               updates_counter_;                 
+
+       Concurrency::call<core::monitor::message>                                                               call_;
+                                                                               
+       boost::mutex                                                                                                                    cond_mutex_;
+       boost::condition_variable                                                                                               cond_;
+       tbb::atomic<bool>                                                                                                               is_running_;
+       boost::thread                                                                                                                   thread_;
        
 public:
        impl(udp::endpoint endpoint, 
                 Concurrency::ISource<core::monitor::message>& source)
                : endpoint_(endpoint)
                , socket_(service_, endpoint_.protocol())
-               , thread_(std::bind(&boost::asio::io_service::run, &service_))
-               , on_next_([this](const core::monitor::message& msg){ on_next(msg); })
+               , call_(boost::bind(&impl::on_next, this, _1))
+               , thread_(boost::bind(&impl::run, this))
        {
-               source.link_target(&on_next_);
+               source.link_target(&call_);             
        }
 
        ~impl()
        {               
+               is_running_ = false;
+
                thread_.join();
        }
        
        void on_next(const core::monitor::message& msg)
        {
-               auto data_ptr = make_safe<std::vector<char>>(write_osc_event(msg));
-
-               socket_.async_send_to(boost::asio::buffer(*data_ptr), 
-                                                         endpoint_,
-                                                         boost::bind(&impl::handle_send_to, this,
-                                                         boost::asio::placeholders::error,
-                                                         boost::asio::placeholders::bytes_transferred));               
-       }       
+               write_osc_event(updates_[updates_counter_ % 1][msg.path()], msg);
+               cond_.notify_one();
+       }
 
-       void handle_send_to(const boost::system::error_code& /*error*/, size_t /*bytes_sent*/)
+       void run()
        {
+               std::unordered_map<std::string, byte_vector> state;
+
+               is_running_ = true;
+
+               while(is_running_)
+               {
+                       std::vector<boost::asio::mutable_buffers_1>     buffers;
+
+                       boost::unique_lock<boost::mutex> cond_lock(cond_mutex_);
+                                       
+                       if(cond_.timed_wait(cond_lock, boost::posix_time::milliseconds(500)))
+                       {                                       
+                               auto& updates = updates_[(++updates_counter_ - 1) % 1];
+
+                               BOOST_FOREACH(auto& slot, updates)              
+                               {
+                                       if(slot.second.empty())
+                                               continue;
+
+                                       auto it = state.lower_bound(slot.first);
+                                       if(it == std::end(state) || state.key_comp()(slot.first, it->first))
+                                               it = state.insert(it, std::make_pair(slot.first, slot.second));
+                                       else
+                                               std::swap(it->second, slot.second);
+
+                                       slot.second.clear();
+
+                                       buffers.push_back(boost::asio::buffer(it->second));
+                               }
+                       }
+                       else
+                       {
+                               BOOST_FOREACH(auto& slot, state)                
+                               {
+                                       buffers.push_back(boost::asio::buffer(slot.second));
+                               }
+                       }               
+
+                       if(!buffers.empty())
+                               socket_.send_to(buffers, endpoint_);
+               }
        }
 };