};
typedef Concurrency::ISource<monitor::message> source;
+typedef Concurrency::overwrite_buffer<monitor::message> multi_target;
}}}
\ No newline at end of file
--- /dev/null
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#include "../StdAfx.h"
+
+#include "io_service_manager.h"
+
+#include <boost/asio/io_service.hpp>
+#include <boost/thread/thread.hpp>
+
+namespace caspar { namespace protocol { namespace asio {
+
+struct io_service_manager::impl
+{
+ boost::asio::io_service service_;
+ boost::thread thread_;
+
+ impl()
+ : thread_(std::bind(&boost::asio::io_service::run, &service_))
+ {
+ }
+
+ ~impl()
+ {
+ thread_.join();
+ }
+};
+
+io_service_manager::io_service_manager()
+ : impl_(new impl)
+{
+}
+
+io_service_manager::~io_service_manager()
+{
+}
+
+boost::asio::io_service& io_service_manager::service()
+{
+ return impl_->service_;
+}
+
+}}}
--- /dev/null
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#pragma once
+
+#include <memory>
+
+#include <boost/noncopyable.hpp>
+
+namespace boost { namespace asio {
+ class io_service;
+}}
+
+namespace caspar { namespace protocol { namespace asio {
+
+class io_service_manager : boost::noncopyable
+{
+public:
+ io_service_manager();
+ ~io_service_manager();
+ boost::asio::io_service& service();
+private:
+ struct impl;
+ std::unique_ptr<impl> impl_;
+};
+
+}}}
-#include "..\stdafx.h"
-
-#include "server.h"
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
+
+#include "../stdafx.h"
+
+#include "client.h"
#include "oscpack/oscOutboundPacketStream.h"
#include <boost/asio.hpp>
#include <boost/foreach.hpp>
-#include <boost/thread.hpp>
+#include <boost/bind.hpp>
using namespace boost::asio::ip;
namespace caspar { namespace protocol { namespace osc {
-
+
template<typename T>
struct param_visitor : public boost::static_visitor<void>
{
return std::vector<char>(o.Data(), o.Data() + o.Size());
}
-struct server::impl
+struct client::impl
{
- boost::asio::io_service service_;
-
udp::endpoint endpoint_;
udp::socket socket_;
- boost::thread thread_;
-
Concurrency::call<core::monitor::message> on_next_;
public:
- impl(udp::endpoint endpoint,
- Concurrency::ISource<core::monitor::message>& source)
+ impl(
+ boost::asio::io_service& service,
+ udp::endpoint endpoint,
+ Concurrency::ISource<core::monitor::message>& source)
: endpoint_(endpoint)
- , socket_(service_, endpoint_.protocol())
- , thread_(std::bind(&boost::asio::io_service::run, &service_))
+ , socket_(service, endpoint_.protocol())
, on_next_([this](const core::monitor::message& msg){ on_next(msg); })
{
source.link_target(&on_next_);
}
-
- ~impl()
- {
- thread_.join();
- }
void on_next(const core::monitor::message& msg)
{
}
};
-server::server(udp::endpoint endpoint,
+client::client(boost::asio::io_service& service, udp::endpoint endpoint,
Concurrency::ISource<core::monitor::message>& source)
- : impl_(new impl(endpoint, source))
+ : impl_(new impl(service, endpoint, source))
{
}
-server::server(server&& other)
+client::client(client&& other)
: impl_(std::move(other.impl_))
{
}
-server& server::operator=(server&& other)
+client& client::operator=(client&& other)
{
impl_ = std::move(other.impl_);
return *this;
}
-server::~server()
+client::~client()
{
}
-}}}
\ No newline at end of file
+}}}
--- /dev/null
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
+
+#pragma once
+
+#include <common/memory/safe_ptr.h>
+
+#include <core/monitor/monitor.h>
+#include <boost/asio/ip/udp.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace caspar { namespace protocol { namespace osc {
+
+class client
+{
+ client(const client&);
+ client& operator=(const client&);
+public:
+
+ // Static Members
+
+ // Constructors
+
+ client(
+ boost::asio::io_service& service,
+ boost::asio::ip::udp::endpoint endpoint,
+ Concurrency::ISource<core::monitor::message>& source);
+
+ client(client&&);
+
+ ~client();
+
+ // Methods
+
+ client& operator=(client&&);
+
+ // Properties
+
+private:
+ struct impl;
+ std::unique_ptr<impl> impl_;
+};
+
+}}}
+++ /dev/null
-#pragma once
-
-#include <common/memory/safe_ptr.h>
-
-#include <core/monitor/monitor.h>
-#include <boost/asio/ip/udp.hpp>
-
-namespace caspar { namespace protocol { namespace osc {
-
-class server
-{
- server(const server&);
- server& operator=(const server&);
-public:
-
- // Static Members
-
- // Constructors
-
- server(boost::asio::ip::udp::endpoint endpoint,
- Concurrency::ISource<core::monitor::message>& source);
-
- server(server&&);
-
- ~server();
-
- // Methods
-
- server& operator=(server&&);
-
- // Properties
-
-private:
- struct impl;
- std::unique_ptr<impl> impl_;
-};
-
-}}}
\ No newline at end of file
<ClInclude Include="amcp\AMCPCommandQueue.h" />\r
<ClInclude Include="amcp\AMCPCommandsImpl.h" />\r
<ClInclude Include="amcp\AMCPProtocolStrategy.h" />\r
+ <ClInclude Include="asio\io_service_manager.h" />\r
<ClInclude Include="cii\CIICommand.h" />\r
<ClInclude Include="cii\CIICommandsImpl.h" />\r
<ClInclude Include="cii\CIIProtocolStrategy.h" />\r
<ClInclude Include="osc\oscpack\OscPrintReceivedElements.h" />\r
<ClInclude Include="osc\oscpack\OscReceivedElements.h" />\r
<ClInclude Include="osc\oscpack\OscTypes.h" />\r
- <ClInclude Include="osc\server.h" />\r
+ <ClInclude Include="osc\client.h" />\r
<ClInclude Include="StdAfx.h" />\r
<ClInclude Include="util\AsyncEventServer.h" />\r
<ClInclude Include="util\ClientInfo.h" />\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Develop|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
</ClCompile>\r
+ <ClCompile Include="asio\io_service_manager.cpp">\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Develop|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
+ </ClCompile>\r
<ClCompile Include="cii\CIICommandsImpl.cpp">\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">NotUsing</PrecompiledHeader>\r
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">NotUsing</PrecompiledHeader>\r
</ClCompile>\r
- <ClCompile Include="osc\server.cpp">\r
+ <ClCompile Include="osc\client.cpp">\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Develop|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
<Filter Include="source\osc\oscpack">\r
<UniqueIdentifier>{6d9a82d4-6805-4de0-b400-6212fac06109}</UniqueIdentifier>\r
</Filter>\r
+ <Filter Include="source\asio">\r
+ <UniqueIdentifier>{9c3a5197-d725-475d-ad5c-6b120e08022f}</UniqueIdentifier>\r
+ </Filter>\r
</ItemGroup>\r
<ItemGroup>\r
<ClInclude Include="amcp\AMCPCommand.h">\r
<ClInclude Include="osc\oscpack\OscOutboundPacketStream.h">\r
<Filter>source\osc\oscpack</Filter>\r
</ClInclude>\r
- <ClInclude Include="osc\server.h">\r
+ <ClInclude Include="asio\io_service_manager.h">\r
+ <Filter>source\asio</Filter>\r
+ </ClInclude>\r
+ <ClInclude Include="osc\client.h">\r
<Filter>source\osc</Filter>\r
</ClInclude>\r
</ItemGroup>\r
<ClCompile Include="osc\oscpack\OscOutboundPacketStream.cpp">\r
<Filter>source\osc\oscpack</Filter>\r
</ClCompile>\r
- <ClCompile Include="osc\server.cpp">\r
+ <ClCompile Include="asio\io_service_manager.cpp">\r
+ <Filter>source\asio</Filter>\r
+ </ClCompile>\r
+ <ClCompile Include="osc\client.cpp">\r
<Filter>source\osc</Filter>\r
</ClCompile>\r
</ItemGroup>\r
socketInfoCollection_.onSocketInfoRemoved = handler;\r
}\r
\r
+void AsyncEventServer::add_lifecycle_factory(const lifecycle_factory_t& factory)\r
+{\r
+ tbb::mutex::scoped_lock lock(mutex_);\r
+\r
+ lifecycle_factories_.push_back(factory);\r
+}\r
+\r
//////////////////////////////\r
// AsyncEventServer::Start\r
// RETURNS: true at successful startup\r
}\r
\r
TCHAR addressBuffer[32];\r
- MultiByteToWideChar(CP_ACP, 0, inet_ntoa(clientAddr.sin_addr), -1, addressBuffer, 32);\r
+ auto ipv4_address = inet_ntoa(clientAddr.sin_addr);\r
+ MultiByteToWideChar(CP_ACP, 0, ipv4_address, -1, addressBuffer, 32);\r
pClientSocket->host_ = addressBuffer;\r
\r
+ {\r
+ tbb::mutex::scoped_lock lock(mutex_);\r
+\r
+ BOOST_FOREACH(auto& lifecycle_factory, lifecycle_factories_)\r
+ {\r
+ auto lifecycle_bound = lifecycle_factory(ipv4_address);\r
+\r
+ if (lifecycle_bound)\r
+ pClientSocket->bind_to_lifecycle(lifecycle_bound);\r
+ }\r
+ }\r
+\r
socketInfoCollection_.AddSocketInfo(pClientSocket);\r
\r
CASPAR_LOG(info) << "Accepted connection from " << pClientSocket->host_.c_str() << " " << socketInfoCollection_.Size();\r
typedef std::shared_ptr<SocketInfo> SocketInfoPtr;\r
\r
typedef std::function<void(caspar::IO::SocketInfoPtr)> ClientDisconnectEvent;\r
+typedef std::function<std::shared_ptr<void> (const std::string& ipv4_address)>\r
+ lifecycle_factory_t;\r
\r
class AsyncEventServer : public IRunnable\r
{\r
\r
void SetClientDisconnectHandler(ClientDisconnectEvent handler);\r
\r
+ void add_lifecycle_factory(const lifecycle_factory_t& lifecycle_factory);\r
private:\r
Thread listenThread_;\r
void Run(HANDLE stopEvent);\r
SocketInfoMap socketInfoMap_;\r
bool bDirty_;\r
};\r
+ std::vector<lifecycle_factory_t> lifecycle_factories_;\r
SocketInfoCollection socketInfoCollection_;\r
tbb::mutex mutex_;\r
};\r
pServer_->DisconnectClient(*this);\r
}\r
\r
+void SocketInfo::bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)\r
+{\r
+ lifecycle_bound_items_.push_back(lifecycle_bound);\r
+}\r
+\r
} //namespace IO\r
} //namespace caspar
\ No newline at end of file
void Send(const std::wstring& data);\r
void Disconnect();\r
virtual std::wstring print() const override {return host_;}\r
+ void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound);\r
\r
SOCKET socket_;\r
HANDLE event_;\r
friend class AsyncEventServer;\r
std::queue<std::wstring> sendQueue_;\r
AsyncEventServer* pServer_;\r
+ std::vector<std::shared_ptr<void>> lifecycle_bound_items_;\r
\r
std::vector<char> currentlySending_;\r
unsigned int currentlySendingOffset_;\r
<port>5250</port>\r
<protocol>AMCP</protocol>\r
</tcp>\r
- <udp>\r
- <address>127.0.0.1</address>\r
- <port>6250</port>\r
- <protocol>OSC</protocol>\r
- </udp>\r
</controllers>\r
+ <osc>\r
+ <default-port>6250</default-port>\r
+ <predefined-clients>\r
+ </predefined-clients>\r
+ </osc>\r
<audio>\r
<channel-layouts>\r
</channel-layouts>\r
</consumers>\r
</channel>\r
</channels>\r
+<osc>\r
+ <default-port>6250</default-port>\r
+ <predefined-clients>\r
+ <predefined-client>\r
+ <address>127.0.0.1</address>\r
+ <port>5253</port>\r
+ </predefined-client>\r
+ </predefined-clients>\r
+</osc>\r
<audio>\r
<channel-layouts>\r
<mono>\r
#include <atlbase.h>\r
\r
#include <protocol/amcp/AMCPProtocolStrategy.h>\r
-#include <protocol/osc/server.h>\r
\r
#include <modules/bluefish/bluefish.h>\r
#include <modules/decklink/decklink.h>\r
#include <protocol/CLK/CLKProtocolStrategy.h>\r
#include <protocol/util/AsyncEventServer.h>\r
#include <protocol/util/stateful_protocol_strategy_wrapper.h>\r
-#include <protocol/osc/server.h>\r
+#include <protocol/osc/client.h>\r
+#include <protocol/asio/io_service_manager.h>\r
\r
#include <boost/algorithm/string.hpp>\r
#include <boost/lexical_cast.hpp>\r
\r
struct server::implementation : boost::noncopyable\r
{\r
+ protocol::asio::io_service_manager io_service_manager_;\r
core::monitor::subject monitor_subject_;\r
+ core::monitor::multi_target multi_target_;\r
boost::promise<bool>& shutdown_server_now_;\r
safe_ptr<ogl_device> ogl_;\r
std::vector<safe_ptr<IO::AsyncEventServer>> async_servers_; \r
- std::vector<osc::server> osc_servers_;\r
+ std::shared_ptr<IO::AsyncEventServer> primary_amcp_server_;\r
+ std::vector<osc::client> osc_clients_;\r
std::vector<safe_ptr<video_channel>> channels_;\r
std::shared_ptr<thumbnail_generator> thumbnail_generator_;\r
\r
implementation(boost::promise<bool>& shutdown_server_now)\r
: shutdown_server_now_(shutdown_server_now)\r
, ogl_(ogl_device::create())\r
- { \r
+ {\r
+ monitor_subject_.link_target(&multi_target_);\r
setup_audio(env::properties());\r
\r
ffmpeg::init();\r
\r
setup_controllers(env::properties());\r
CASPAR_LOG(info) << L"Initialized controllers.";\r
+\r
+ setup_osc(env::properties());\r
+ CASPAR_LOG(info) << L"Initialized osc.";\r
}\r
\r
~implementation()\r
auto asyncbootstrapper = make_safe<IO::AsyncEventServer>(create_protocol(protocol), port);\r
asyncbootstrapper->Start();\r
async_servers_.push_back(asyncbootstrapper);\r
- }\r
- else if(name == L"udp")\r
- { \r
- const auto address = xml_controller.second.get(L"address", L"127.0.0.1");\r
- const auto port = xml_controller.second.get<unsigned short>(L"port", 6250);\r
\r
- osc_servers_.push_back(osc::server(boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4::from_string(narrow(address)), port), monitor_subject_));\r
+ if (!primary_amcp_server_ && boost::iequals(protocol, L"AMCP"))\r
+ primary_amcp_server_ = asyncbootstrapper;\r
}\r
else\r
CASPAR_LOG(warning) << "Invalid controller: " << widen(name); \r
}\r
}\r
\r
+ void setup_osc(const boost::property_tree::wptree& pt)\r
+ { \r
+ using boost::property_tree::wptree;\r
+ using namespace boost::asio::ip;\r
+ \r
+ auto default_port =\r
+ pt.get<unsigned short>(L"configuration.osc.default-port", 6250);\r
+ auto predefined_clients =\r
+ pt.get_child_optional(L"configuration.osc.predefined-clients");\r
+\r
+ if (predefined_clients)\r
+ {\r
+ BOOST_FOREACH(auto& predefined_client, *predefined_clients)\r
+ {\r
+ const auto address =\r
+ predefined_client.second.get<std::wstring>(L"address");\r
+ const auto port =\r
+ predefined_client.second.get<unsigned short>(L"port");\r
+ osc_clients_.push_back(osc::client(\r
+ io_service_manager_.service(),\r
+ udp::endpoint(\r
+ address_v4::from_string(narrow(address)),\r
+ port),\r
+ multi_target_));\r
+ }\r
+ }\r
+\r
+ if (primary_amcp_server_)\r
+ primary_amcp_server_->add_lifecycle_factory(\r
+ [=] (const std::string& ipv4_address)\r
+ -> std::shared_ptr<void>\r
+ {\r
+ using namespace boost::asio::ip;\r
+\r
+ return std::make_shared<osc::client>(\r
+ io_service_manager_.service(),\r
+ udp::endpoint(\r
+ address_v4::from_string(ipv4_address),\r
+ default_port),\r
+ multi_target_);\r
+ });\r
+ }\r
+\r
void setup_thumbnail_generation(const boost::property_tree::wptree& pt)\r
{\r
if (!pt.get(L"configuration.thumbnails.generate-thumbnails", true))\r