<ClInclude Include="filesystem\polling_filesystem_monitor.h" />\r
<ClInclude Include="gl\gl_check.h" />\r
<ClInclude Include="log\log.h" />\r
- <ClInclude Include="memory\byte_order.h" />\r
+ <ClInclude Include="memory\endian.h" />\r
<ClInclude Include="memory\memclr.h" />\r
<ClInclude Include="memory\memcpy.h" />\r
<ClInclude Include="memory\memshfl.h" />\r
<ClInclude Include="utility\iterator.h">\r
<Filter>source\utility</Filter>\r
</ClInclude>\r
- <ClInclude Include="memory\byte_order.h">\r
+ <ClInclude Include="memory\endian.h">\r
<Filter>source\memory</Filter>\r
</ClInclude>\r
</ItemGroup>\r
+++ /dev/null
-/*
-* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
-*
-* This file is part of CasparCG (www.casparcg.com).
-*
-* CasparCG is free software: you can redistribute it and/or modify
-* it under the terms of the GNU General Public License as published by
-* the Free Software Foundation, either version 3 of the License, or
-* (at your option) any later version.
-*
-* CasparCG is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU General Public License for more details.
-*
-* You should have received a copy of the GNU General Public License
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
-*
-* Author: Helge Norberg, helge.norberg@svt.se
-*/
-
-#pragma once
-
-#include <intrin.h>
-
-namespace caspar {
-
-template<typename T>
-inline T swap_byte_order(T value)
-{
- T result;
-
- swap_byte_order<sizeof(T)>(
- reinterpret_cast<const char*>(&value),
- reinterpret_cast<char*>(&result));
-
- return result;
-}
-
-template<size_t num_bytes>
-inline void swap_byte_order(const char* src, char* dest)
-{
- for (int i = 0, j = num_bytes - 1; i != num_bytes; ++i, --j)
- dest[i] = src[j];
-}
-
-template<>
-inline void swap_byte_order<sizeof(unsigned short)>(const char* src, char* dest)
-{
- auto result = reinterpret_cast<unsigned short*>(dest);
- auto value = reinterpret_cast<const unsigned short*>(src);
- *result = _byteswap_ushort(*value);
-}
-
-template<>
-inline void swap_byte_order<sizeof(unsigned long)>(const char* src, char* dest)
-{
- auto result = reinterpret_cast<unsigned long*>(dest);
- auto value = reinterpret_cast<const unsigned long*>(src);
- *result = _byteswap_ulong(*value);
-}
-
-template<>
-inline void swap_byte_order<sizeof(unsigned __int64)>(const char* src, char* dest)
-{
- auto result = reinterpret_cast<unsigned __int64*>(dest);
- auto value = reinterpret_cast<const unsigned __int64*>(src);
- *result = _byteswap_uint64(*value);
-}
-
-}
--- /dev/null
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+* Author: Robert Nagy, ronag89@gmail.com
+*/
+
+#pragma once
+
+#include <type_traits>
+
+#include <intrin.h>
+
+namespace caspar {
+
+template<typename T>
+typename std::enable_if<sizeof(T) == sizeof(unsigned char), T>::type swap_byte_order(
+ const T& value)
+{
+ return value;
+}
+
+template<typename T>
+typename std::enable_if<sizeof(T) == sizeof(unsigned short), T>::type swap_byte_order(
+ const T& value)
+{
+ auto swapped = _byteswap_ushort(reinterpret_cast<const unsigned short&>(value));
+ return reinterpret_cast<const T&>(swapped);
+}
+
+template<typename T>
+typename std::enable_if<sizeof(T) == sizeof(unsigned long), T>::type swap_byte_order(
+ const T& value)
+{
+ auto swapped = _byteswap_ulong(reinterpret_cast<const unsigned long&>(value));
+ return reinterpret_cast<const T&>(swapped);
+}
+
+template<typename T>
+typename std::enable_if<sizeof(T) == sizeof(unsigned long long), T>::type swap_byte_order(
+ const T& value)
+{
+ auto swapped = _byteswap_uint64(reinterpret_cast<const unsigned long long&>(value));
+ return reinterpret_cast<const T&>(swapped);
+}
+
+}
};
typedef Concurrency::ISource<monitor::message> source;
-typedef Concurrency::overwrite_buffer<monitor::message> multi_target;
-
}}}
\ No newline at end of file
#include "io_service_manager.h"
+#include <memory>
+
#include <boost/asio/io_service.hpp>
#include <boost/thread/thread.hpp>
struct io_service_manager::impl
{
boost::asio::io_service service_;
+ // To keep the io_service::run() running although no pending async
+ // operations are posted.
+ std::unique_ptr<boost::asio::io_service::work> work_;
boost::thread thread_;
impl()
- : thread_(std::bind(&boost::asio::io_service::run, &service_))
+ : work_(new boost::asio::io_service::work(service_))
+ , thread_(std::bind(&boost::asio::io_service::run, &service_))
{
}
~impl()
{
+ work_.reset();
+ service_.stop();
thread_.join();
}
};
#include <boost/foreach.hpp>
#include <boost/bind.hpp>
+#include <tbb/spin_mutex.h>
+
using namespace boost::asio::ip;
namespace caspar { namespace protocol { namespace osc {
return std::vector<char>(o.Data(), o.Data() + o.Size());
}
-struct client::impl
+struct client::impl : public std::enable_shared_from_this<client::impl>
{
- udp::endpoint endpoint_;
- udp::socket socket_;
+ tbb::spin_mutex endpoints_mutex_;
+ std::map<udp::endpoint, int> reference_counts_by_endpoint_;
+ udp::socket socket_;
Concurrency::call<core::monitor::message> on_next_;
public:
impl(
boost::asio::io_service& service,
- udp::endpoint endpoint,
Concurrency::ISource<core::monitor::message>& source)
- : endpoint_(endpoint)
- , socket_(service, endpoint_.protocol())
- , on_next_([this](const core::monitor::message& msg){ on_next(msg); })
+ : socket_(service, udp::v4())
+ , on_next_([this](const core::monitor::message& msg) { on_next(msg); })
{
source.link_target(&on_next_);
}
+
+ std::shared_ptr<void> get_prenumeration_token(
+ const boost::asio::ip::udp::endpoint& endpoint)
+ {
+ tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+ ++reference_counts_by_endpoint_[endpoint];
+
+ std::weak_ptr<impl> weak_self = shared_from_this();
+
+ return std::shared_ptr<void>(nullptr, [weak_self, endpoint] (void*)
+ {
+ auto strong = weak_self.lock();
+
+ if (!strong)
+ return;
+
+ auto& self = *strong;
+
+ tbb::spin_mutex::scoped_lock lock(self.endpoints_mutex_);
+
+ int reference_count_after =
+ --self.reference_counts_by_endpoint_[endpoint];
+
+ if (reference_count_after == 0)
+ self.reference_counts_by_endpoint_.erase(endpoint);
+ });
+ }
void on_next(const core::monitor::message& msg)
{
auto data_ptr = make_safe<std::vector<char>>(write_osc_event(msg));
- socket_.async_send_to(boost::asio::buffer(*data_ptr),
- endpoint_,
- boost::bind(&impl::handle_send_to, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
- }
+ tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+ BOOST_FOREACH(auto& elem, reference_counts_by_endpoint_)
+ {
+ auto& endpoint = elem.first;
+
+ // TODO: We seem to be lucky here, because according to asio
+ // documentation only one async operation can be "in flight"
+ // at any given point in time for a socket. This somehow seems
+ // to work though in the case of UDP and Windows.
+ socket_.async_send_to(
+ boost::asio::buffer(*data_ptr),
+ endpoint,
+ boost::bind(
+ &impl::handle_send_to,
+ this,
+ data_ptr, // The data_ptr needs to live
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+ }
+ }
- void handle_send_to(const boost::system::error_code& /*error*/, size_t /*bytes_sent*/)
+ void handle_send_to(
+ const safe_ptr<std::vector<char>>& /* sent_buffer */,
+ const boost::system::error_code& /*error*/,
+ size_t /*bytes_sent*/)
{
}
};
-client::client(boost::asio::io_service& service, udp::endpoint endpoint,
- Concurrency::ISource<core::monitor::message>& source)
- : impl_(new impl(service, endpoint, source))
+client::client(
+ boost::asio::io_service& service,
+ Concurrency::ISource<core::monitor::message>& source)
+ : impl_(new impl(service, source))
{
}
{
}
+std::shared_ptr<void> client::get_prenumeration_token(
+ const boost::asio::ip::udp::endpoint& endpoint)
+{
+ return impl_->get_prenumeration_token(endpoint);
+}
+
}}}
client(
boost::asio::io_service& service,
- boost::asio::ip::udp::endpoint endpoint,
Concurrency::ISource<core::monitor::message>& source);
client(client&&);
+ /**
+ * Get a prenumeration token that ensures that OSC messages are sent to the
+ * given endpoint as long as the token is alive. It will stop sending when
+ * the token is dropped unless another token to the same endpoint has
+ * previously been checked out.
+ *
+ * @param endpoint The UDP endpoint to send OSC messages to.
+ *
+ * @return The token. It is ok for the token to outlive the client
+ */
+ std::shared_ptr<void> get_prenumeration_token(
+ const boost::asio::ip::udp::endpoint& endpoint);
+
~client();
// Methods
private:
struct impl;
- std::unique_ptr<impl> impl_;
+ std::shared_ptr<impl> impl_;
};
}}}
#include <stdlib.h>
#include <assert.h>
-#include <common/memory/byte_order.h>
+#include <common/memory/endian.h>
#if defined(__WIN32__) || defined(WIN32)
#include <malloc.h> // for alloca
{\r
auto lifecycle_bound = lifecycle_factory(ipv4_address);\r
\r
- if (lifecycle_bound)\r
- pClientSocket->bind_to_lifecycle(lifecycle_bound);\r
+ pClientSocket->bind_to_lifecycle(lifecycle_bound);\r
}\r
}\r
\r
{\r
protocol::asio::io_service_manager io_service_manager_;\r
core::monitor::subject monitor_subject_;\r
- core::monitor::multi_target multi_target_;\r
boost::promise<bool>& shutdown_server_now_;\r
safe_ptr<ogl_device> ogl_;\r
std::vector<safe_ptr<IO::AsyncEventServer>> async_servers_; \r
std::shared_ptr<IO::AsyncEventServer> primary_amcp_server_;\r
- std::vector<osc::client> osc_clients_;\r
+ osc::client osc_client_;\r
+ std::vector<std::shared_ptr<void>> predefined_osc_prenumerations_;\r
std::vector<safe_ptr<video_channel>> channels_;\r
std::shared_ptr<thumbnail_generator> thumbnail_generator_;\r
\r
implementation(boost::promise<bool>& shutdown_server_now)\r
: shutdown_server_now_(shutdown_server_now)\r
, ogl_(ogl_device::create())\r
+ , osc_client_(io_service_manager_.service(), monitor_subject_)\r
{\r
- monitor_subject_.link_target(&multi_target_);\r
setup_audio(env::properties());\r
\r
ffmpeg::init();\r
ffmpeg::uninit();\r
\r
async_servers_.clear();\r
+ primary_amcp_server_.reset();\r
channels_.clear();\r
}\r
\r
predefined_client.second.get<std::wstring>(L"address");\r
const auto port =\r
predefined_client.second.get<unsigned short>(L"port");\r
- osc_clients_.push_back(osc::client(\r
- io_service_manager_.service(),\r
- udp::endpoint(\r
+ predefined_osc_prenumerations_.push_back(\r
+ osc_client_.get_prenumeration_token(udp::endpoint(\r
address_v4::from_string(narrow(address)),\r
- port),\r
- multi_target_));\r
+ port)));\r
}\r
}\r
\r
{\r
using namespace boost::asio::ip;\r
\r
- return std::make_shared<osc::client>(\r
- io_service_manager_.service(),\r
+ return osc_client_.get_prenumeration_token(\r
udp::endpoint(\r
address_v4::from_string(ipv4_address),\r
- default_port),\r
- multi_target_);\r
+ default_port));\r
});\r
}\r
\r