]> git.sesse.net Git - casparcg/commitdiff
Modifications to OSC support after discussions with Robert Nagy
authorHelge Norberg <helge.norberg@gmail.com>
Tue, 28 May 2013 17:46:31 +0000 (19:46 +0200)
committerHelge Norberg <helge.norberg@gmail.com>
Tue, 28 May 2013 17:46:31 +0000 (19:46 +0200)
common/common.vcxproj
common/common.vcxproj.filters
common/memory/byte_order.h [deleted file]
common/memory/endian.h [new file with mode: 0644]
core/monitor/monitor.h
protocol/asio/io_service_manager.cpp
protocol/osc/client.cpp
protocol/osc/client.h
protocol/osc/oscpack/OscOutboundPacketStream.cpp
protocol/util/AsyncEventServer.cpp
shell/server.cpp

index f5f9ef376a0b2b7caab55142804990cd0f380a8d..82b726f6c192f3a3fe766a51ed5a90d9842a6c13 100644 (file)
     <ClInclude Include="filesystem\polling_filesystem_monitor.h" />\r
     <ClInclude Include="gl\gl_check.h" />\r
     <ClInclude Include="log\log.h" />\r
-    <ClInclude Include="memory\byte_order.h" />\r
+    <ClInclude Include="memory\endian.h" />\r
     <ClInclude Include="memory\memclr.h" />\r
     <ClInclude Include="memory\memcpy.h" />\r
     <ClInclude Include="memory\memshfl.h" />\r
index 8778c41c3ae6dcb1d8ff5aeaa3bb4211be6a508f..838ecf6dc32721536f42bffe6423ffbfd0ee0aeb 100644 (file)
     <ClInclude Include="utility\iterator.h">\r
       <Filter>source\utility</Filter>\r
     </ClInclude>\r
-    <ClInclude Include="memory\byte_order.h">\r
+    <ClInclude Include="memory\endian.h">\r
       <Filter>source\memory</Filter>\r
     </ClInclude>\r
   </ItemGroup>\r
diff --git a/common/memory/byte_order.h b/common/memory/byte_order.h
deleted file mode 100644 (file)
index 622f075..0000000
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
-* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
-*
-* This file is part of CasparCG (www.casparcg.com).
-*
-* CasparCG is free software: you can redistribute it and/or modify
-* it under the terms of the GNU General Public License as published by
-* the Free Software Foundation, either version 3 of the License, or
-* (at your option) any later version.
-*
-* CasparCG is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-* GNU General Public License for more details.
-*
-* You should have received a copy of the GNU General Public License
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
-*
-* Author: Helge Norberg, helge.norberg@svt.se
-*/
-
-#pragma once
-
-#include <intrin.h>
-
-namespace caspar {
-
-template<typename T>
-inline T swap_byte_order(T value)
-{
-       T result;
-
-       swap_byte_order<sizeof(T)>(
-                       reinterpret_cast<const char*>(&value),
-                       reinterpret_cast<char*>(&result));
-
-       return result;
-}
-
-template<size_t num_bytes>
-inline void swap_byte_order(const char* src, char* dest)
-{
-       for (int i = 0, j = num_bytes - 1; i != num_bytes; ++i, --j)
-               dest[i] = src[j];
-}
-
-template<>
-inline void swap_byte_order<sizeof(unsigned short)>(const char* src, char* dest)
-{
-       auto result = reinterpret_cast<unsigned short*>(dest);
-       auto value = reinterpret_cast<const unsigned short*>(src);
-       *result = _byteswap_ushort(*value);
-}
-
-template<>
-inline void swap_byte_order<sizeof(unsigned long)>(const char* src, char* dest)
-{
-       auto result = reinterpret_cast<unsigned long*>(dest);
-       auto value = reinterpret_cast<const unsigned long*>(src);
-       *result = _byteswap_ulong(*value);
-}
-
-template<>
-inline void swap_byte_order<sizeof(unsigned __int64)>(const char* src, char* dest)
-{
-       auto result = reinterpret_cast<unsigned __int64*>(dest);
-       auto value = reinterpret_cast<const unsigned __int64*>(src);
-       *result = _byteswap_uint64(*value);
-}
-
-}
diff --git a/common/memory/endian.h b/common/memory/endian.h
new file mode 100644 (file)
index 0000000..6552a13
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+* Author: Robert Nagy, ronag89@gmail.com
+*/
+
+#pragma once
+
+#include <type_traits>
+
+#include <intrin.h>
+
+namespace caspar {
+
+template<typename T>
+typename std::enable_if<sizeof(T) == sizeof(unsigned char), T>::type swap_byte_order(
+               const T& value)
+{
+       return value;
+}
+
+template<typename T>
+typename std::enable_if<sizeof(T) == sizeof(unsigned short), T>::type swap_byte_order(
+               const T& value)
+{
+       auto swapped = _byteswap_ushort(reinterpret_cast<const unsigned short&>(value));
+       return reinterpret_cast<const T&>(swapped);
+}
+
+template<typename T>
+typename std::enable_if<sizeof(T) == sizeof(unsigned long), T>::type swap_byte_order(
+               const T& value)
+{
+       auto swapped = _byteswap_ulong(reinterpret_cast<const unsigned long&>(value));
+    return reinterpret_cast<const T&>(swapped);
+}
+
+template<typename T>
+typename std::enable_if<sizeof(T) == sizeof(unsigned long long), T>::type swap_byte_order(
+               const T& value)
+{
+       auto swapped = _byteswap_uint64(reinterpret_cast<const unsigned long long&>(value));
+    return reinterpret_cast<const T&>(swapped);
+}
+
+}
index 615022a5c3339967444275014d7ebad7e36cf491..e19ae42fbc77f57d6dfc073222bc3342f9321db8 100644 (file)
@@ -89,7 +89,5 @@ public:
 };
 
 typedef Concurrency::ISource<monitor::message> source;
-typedef Concurrency::overwrite_buffer<monitor::message> multi_target;
-
 
 }}}
\ No newline at end of file
index 0709d073d84642022bc8e80519e125f7ef634dd9..4e02efa601289dcbe4bfb169e02ffea3e2e3165f 100644 (file)
@@ -23,6 +23,8 @@
 
 #include "io_service_manager.h"
 
+#include <memory>
+
 #include <boost/asio/io_service.hpp>
 #include <boost/thread/thread.hpp>
 
@@ -31,15 +33,21 @@ namespace caspar { namespace protocol { namespace asio {
 struct io_service_manager::impl
 {
        boost::asio::io_service service_;
+       // To keep the io_service::run() running although no pending async
+       // operations are posted.
+       std::unique_ptr<boost::asio::io_service::work> work_;
        boost::thread thread_;
 
        impl()
-               : thread_(std::bind(&boost::asio::io_service::run, &service_))
+               : work_(new boost::asio::io_service::work(service_))
+               , thread_(std::bind(&boost::asio::io_service::run, &service_))
        {
        }
 
        ~impl()
        {
+               work_.reset();
+               service_.stop();
                thread_.join();
        }
 };
index e4855be3ba28a45c97c7c8e344a7638ad1c200d4..f6c2001b1407c15a56f559c3edfa60121e58a907 100644 (file)
@@ -34,6 +34,8 @@
 #include <boost/foreach.hpp>
 #include <boost/bind.hpp>
 
+#include <tbb/spin_mutex.h>
+
 using namespace boost::asio::ip;
 
 namespace caspar { namespace protocol { namespace osc {
@@ -76,44 +78,90 @@ std::vector<char> write_osc_event(const core::monitor::message& e)
        return std::vector<char>(o.Data(), o.Data() + o.Size());
 }
 
-struct client::impl
+struct client::impl : public std::enable_shared_from_this<client::impl>
 {
-       udp::endpoint                                                           endpoint_;
-       udp::socket                                                                     socket_;        
+       tbb::spin_mutex                                                         endpoints_mutex_;
+       std::map<udp::endpoint, int>                            reference_counts_by_endpoint_;
+       udp::socket                                                                     socket_;
 
        Concurrency::call<core::monitor::message>       on_next_;
        
 public:
        impl(
                        boost::asio::io_service& service,
-                       udp::endpoint endpoint,
                        Concurrency::ISource<core::monitor::message>& source)
-               : endpoint_(endpoint)
-               , socket_(service, endpoint_.protocol())
-               , on_next_([this](const core::monitor::message& msg){ on_next(msg); })
+               : socket_(service, udp::v4())
+               , on_next_([this](const core::monitor::message& msg) { on_next(msg); })
        {
                source.link_target(&on_next_);
        }
+
+       std::shared_ptr<void> get_prenumeration_token(
+                       const boost::asio::ip::udp::endpoint& endpoint)
+       {
+               tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+               ++reference_counts_by_endpoint_[endpoint];
+
+               std::weak_ptr<impl> weak_self = shared_from_this();
+
+               return std::shared_ptr<void>(nullptr, [weak_self, endpoint] (void*)
+               {
+                       auto strong = weak_self.lock();
+
+                       if (!strong)
+                               return;
+
+                       auto& self = *strong;
+
+                       tbb::spin_mutex::scoped_lock lock(self.endpoints_mutex_);
+
+                       int reference_count_after =
+                               --self.reference_counts_by_endpoint_[endpoint];
+
+                       if (reference_count_after == 0)
+                               self.reference_counts_by_endpoint_.erase(endpoint);
+               });
+       }
        
        void on_next(const core::monitor::message& msg)
        {
                auto data_ptr = make_safe<std::vector<char>>(write_osc_event(msg));
 
-               socket_.async_send_to(boost::asio::buffer(*data_ptr), 
-                                                         endpoint_,
-                                                         boost::bind(&impl::handle_send_to, this,
-                                                         boost::asio::placeholders::error,
-                                                         boost::asio::placeholders::bytes_transferred));               
-       }       
+               tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+               BOOST_FOREACH(auto& elem, reference_counts_by_endpoint_)
+               {
+                       auto& endpoint = elem.first;
+
+                       // TODO: We seem to be lucky here, because according to asio
+                       //       documentation only one async operation can be "in flight"
+                       //       at any given point in time for a socket. This somehow seems
+                       //       to work though in the case of UDP and Windows.
+                       socket_.async_send_to(
+                                       boost::asio::buffer(*data_ptr),
+                                       endpoint,
+                                       boost::bind(
+                                                       &impl::handle_send_to,
+                                                       this,
+                                                       data_ptr, // The data_ptr needs to live
+                                                       boost::asio::placeholders::error,
+                                                       boost::asio::placeholders::bytes_transferred));         
+               }
+       }
 
-       void handle_send_to(const boost::system::error_code& /*error*/, size_t /*bytes_sent*/)
+       void handle_send_to(
+                       const safe_ptr<std::vector<char>>& /* sent_buffer */,
+                       const boost::system::error_code& /*error*/,
+                       size_t /*bytes_sent*/)
        {
        }
 };
 
-client::client(boost::asio::io_service& service, udp::endpoint endpoint, 
-                          Concurrency::ISource<core::monitor::message>& source) 
-       : impl_(new impl(service, endpoint, source))
+client::client(
+               boost::asio::io_service& service,
+               Concurrency::ISource<core::monitor::message>& source) 
+       : impl_(new impl(service, source))
 {
 }
 
@@ -132,4 +180,10 @@ client::~client()
 {
 }
 
+std::shared_ptr<void> client::get_prenumeration_token(
+                       const boost::asio::ip::udp::endpoint& endpoint)
+{
+       return impl_->get_prenumeration_token(endpoint);
+}
+
 }}}
index a8aaf6be63d3cbe6a4596f4432622a8071edf2b1..795d4f85a7c1074fa362a7c73250eb28d5205a04 100644 (file)
@@ -41,11 +41,23 @@ public:
 
        client(
                        boost::asio::io_service& service,
-                       boost::asio::ip::udp::endpoint endpoint,
                        Concurrency::ISource<core::monitor::message>& source);
        
        client(client&&);
 
+       /**
+        * Get a prenumeration token that ensures that OSC messages are sent to the
+        * given endpoint as long as the token is alive. It will stop sending when
+        * the token is dropped unless another token to the same endpoint has
+        * previously been checked out.
+        *
+        * @param endpoint The UDP endpoint to send OSC messages to.
+        *
+        * @return The token. It is ok for the token to outlive the client
+        */
+       std::shared_ptr<void> get_prenumeration_token(
+                       const boost::asio::ip::udp::endpoint& endpoint);
+
        ~client();
 
        // Methods
@@ -56,7 +68,7 @@ public:
 
 private:
        struct impl;
-       std::unique_ptr<impl> impl_;
+       std::shared_ptr<impl> impl_;
 };
 
 }}}
index 611cf43fd67760c6a6acaa1443e02cae191bc4be..43ae2d0b7aa0c768aab81a5f313013f88292d1ee 100644 (file)
@@ -36,7 +36,7 @@
 #include <stdlib.h>
 #include <assert.h>
 
-#include <common/memory/byte_order.h>
+#include <common/memory/endian.h>
 
 #if defined(__WIN32__) || defined(WIN32)
 #include <malloc.h> // for alloca
index 4324a06ad56fbf0e45ceec875bdd79f09dd9fb00..851997bc15c51017ed2e6b0237b16a0997d98829 100644 (file)
@@ -300,8 +300,7 @@ bool AsyncEventServer::OnAccept(SocketInfoPtr& pSI) {
                {\r
                        auto lifecycle_bound = lifecycle_factory(ipv4_address);\r
 \r
-                       if (lifecycle_bound)\r
-                               pClientSocket->bind_to_lifecycle(lifecycle_bound);\r
+                       pClientSocket->bind_to_lifecycle(lifecycle_bound);\r
                }\r
        }\r
 \r
index 3629aae8dff2bdc50f0d8c0dc027f3f19d4019e2..967076513f38b4d96f7a3f02bb0ba37acd21102d 100644 (file)
@@ -75,20 +75,20 @@ struct server::implementation : boost::noncopyable
 {\r
        protocol::asio::io_service_manager                      io_service_manager_;\r
        core::monitor::subject                                          monitor_subject_;\r
-       core::monitor::multi_target                                     multi_target_;\r
        boost::promise<bool>&                                           shutdown_server_now_;\r
        safe_ptr<ogl_device>                                            ogl_;\r
        std::vector<safe_ptr<IO::AsyncEventServer>> async_servers_;     \r
        std::shared_ptr<IO::AsyncEventServer>           primary_amcp_server_;\r
-       std::vector<osc::client>                                        osc_clients_;\r
+       osc::client                                                                     osc_client_;\r
+       std::vector<std::shared_ptr<void>>                      predefined_osc_prenumerations_;\r
        std::vector<safe_ptr<video_channel>>            channels_;\r
        std::shared_ptr<thumbnail_generator>            thumbnail_generator_;\r
 \r
        implementation(boost::promise<bool>& shutdown_server_now)\r
                : shutdown_server_now_(shutdown_server_now)\r
                , ogl_(ogl_device::create())\r
+               , osc_client_(io_service_manager_.service(), monitor_subject_)\r
        {\r
-               monitor_subject_.link_target(&multi_target_);\r
                setup_audio(env::properties());\r
 \r
                ffmpeg::init();\r
@@ -129,6 +129,7 @@ struct server::implementation : boost::noncopyable
                ffmpeg::uninit();\r
 \r
                async_servers_.clear();\r
+               primary_amcp_server_.reset();\r
                channels_.clear();\r
        }\r
 \r
@@ -268,12 +269,10 @@ struct server::implementation : boost::noncopyable
                                                predefined_client.second.get<std::wstring>(L"address");\r
                                const auto port =\r
                                                predefined_client.second.get<unsigned short>(L"port");\r
-                               osc_clients_.push_back(osc::client(\r
-                                               io_service_manager_.service(),\r
-                                               udp::endpoint(\r
+                               predefined_osc_prenumerations_.push_back(\r
+                                               osc_client_.get_prenumeration_token(udp::endpoint(\r
                                                                address_v4::from_string(narrow(address)),\r
-                                                               port),\r
-                                               multi_target_));\r
+                                                               port)));\r
                        }\r
                }\r
 \r
@@ -284,12 +283,10 @@ struct server::implementation : boost::noncopyable
                                        {\r
                                                using namespace boost::asio::ip;\r
 \r
-                                               return std::make_shared<osc::client>(\r
-                                                               io_service_manager_.service(),\r
+                                               return osc_client_.get_prenumeration_token(\r
                                                                udp::endpoint(\r
                                                                                address_v4::from_string(ipv4_address),\r
-                                                                               default_port),\r
-                                                               multi_target_);\r
+                                                                               default_port));\r
                                        });\r
        }\r
 \r