typename std::enable_if<!std::is_convertible<T, std::wstring>::value, typename std::decay<T>::type>::type get_param(const std::wstring& name, C&& params, T fail_value = T())
{
auto it = std::find_if(std::begin(params), std::end(params), param_comparer(name));
- //auto it = std::find(std::begin(params), std::end(params), name);
if(it == params.end())
return fail_value;
std::wstring get_param(const std::wstring& name, C&& params, const std::wstring& fail_value = L"")
{
auto it = std::find_if(std::begin(params), std::end(params), param_comparer(name));
- //auto it = std::find(std::begin(params), std::end(params), name);
if(it == params.end())
return fail_value;
AMCPProtocolStrategy::~AMCPProtocolStrategy() {
}
-void AMCPProtocolStrategy::Parse(const TCHAR* pData, int charCount, ClientInfoPtr pClientInfo)
+//The paser method expects message to be complete messages with the delimiter stripped away.
+//Thesefore the AMCPProtocolStrategy should be decorated with a delimiter_based_chunking_strategy
+void AMCPProtocolStrategy::Parse(const std::wstring& message, ClientInfoPtr pClientInfo)
{
- size_t pos;
- std::wstring recvData(pData, charCount);
- std::wstring availibleData = (pClientInfo != nullptr ? pClientInfo->currentMessage_ : L"") + recvData;
-
- while(true) {
- pos = availibleData.find(MessageDelimiter);
- if(pos != std::wstring::npos)
- {
- std::wstring message = availibleData.substr(0,pos);
-
- //This is where a complete message gets taken care of
- if(message.length() > 0) {
- ProcessMessage(message, pClientInfo);
- }
-
- std::size_t nextStartPos = pos + MessageDelimiter.length();
- if(nextStartPos < availibleData.length())
- availibleData = availibleData.substr(nextStartPos);
- else {
- availibleData.clear();
- break;
- }
- }
- else
- {
- break;
- }
- }
- if(pClientInfo)
- pClientInfo->currentMessage_ = availibleData;
-}
-
-void AMCPProtocolStrategy::ProcessMessage(const std::wstring& message, ClientInfoPtr& pClientInfo)
-{
CASPAR_LOG(info) << L"Received message from " << pClientInfo->print() << ": " << message << L"\\r\\n";
bool bError = true;
AMCPProtocolStrategy(const std::vector<spl::shared_ptr<core::video_channel>>& channels);
virtual ~AMCPProtocolStrategy();
- virtual void Parse(const TCHAR* pData, int charCount, IO::ClientInfoPtr pClientInfo);
- virtual std::string GetCodepage() {
- return "UTF-8";
- }
-
+ virtual void Parse(const std::wstring& msg, IO::ClientInfoPtr pClientInfo);
+ virtual std::string GetCodepage() { return "UTF-8"; }
AMCPCommand::ptr_type InterpretCommandString(const std::wstring& str, MessageParserState* pOutState=0);
private:
friend class AMCPCommand;
- void ProcessMessage(const std::wstring& message, IO::ClientInfoPtr& pClientInfo);
std::size_t TokenizeMessage(const std::wstring& message, std::vector<std::wstring>* pTokenVector);
AMCPCommand::ptr_type CommandFactory(const std::wstring& str);
{
}
-void CIIProtocolStrategy::Parse(const TCHAR* pData, int charCount, IO::ClientInfoPtr pClientInfo)
+//The paser method expects message to be complete messages with the delimiter stripped away.
+//Thesefore the AMCPProtocolStrategy should be decorated with a delimiter_based_chunking_strategy
+void CIIProtocolStrategy::Parse(const std::wstring& message, IO::ClientInfoPtr pClientInfo)
{
- std::size_t pos;
- std::wstring msg(pData, charCount);
- std::wstring availibleData = currentMessage_ + msg;
-
- while(true)
+ if(message.length() > 0)
{
- pos = availibleData.find(MessageDelimiter);
- if(pos != std::wstring::npos)
- {
- std::wstring message = availibleData.substr(0,pos);
-
- if(message.length() > 0) {
- ProcessMessage(message, pClientInfo);
- if(pClientInfo != 0)
- pClientInfo->Send(TEXT("*\r\n"));
- }
-
- std::size_t nextStartPos = pos + MessageDelimiter.length();
- if(nextStartPos < availibleData.length())
- availibleData = availibleData.substr(nextStartPos);
- else
- {
- availibleData.clear();
- break;
- }
- }
- else
- break;
+ ProcessMessage(message, pClientInfo);
+ if(pClientInfo != 0)
+ pClientInfo->Send(TEXT("*\r\n"));
}
- currentMessage_ = availibleData;
}
void CIIProtocolStrategy::ProcessMessage(const std::wstring& message, IO::ClientInfoPtr pClientInfo)
public:
CIIProtocolStrategy(const std::vector<spl::shared_ptr<core::video_channel>>& channels);
- void Parse(const TCHAR* pData, int charCount, IO::ClientInfoPtr pClientInfo);
+ void Parse(const std::wstring& message, IO::ClientInfoPtr pClientInfo);
std::string GetCodepage() {return "ISO-8859-1";} //ISO 8859-1
void SetProfile(const std::wstring& profile) {currentProfile_ = profile;}
<ClInclude Include="StdAfx.h" />\r
<ClInclude Include="util\AsyncEventServer.h" />\r
<ClInclude Include="util\ClientInfo.h" />\r
+ <ClInclude Include="util\lock_container.h" />\r
<ClInclude Include="util\ProtocolStrategy.h" />\r
<ClInclude Include="util\protocol_strategy.h" />\r
<ClInclude Include="util\strategy_adapters.h" />\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
</ClCompile>\r
+ <ClCompile Include="util\lock_container.cpp">\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">..\StdAfx.h</PrecompiledHeaderFile>\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">..\StdAfx.h</PrecompiledHeaderFile>\r
+ </ClCompile>\r
<ClCompile Include="util\strategy_adapters.cpp">\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
<ClInclude Include="util\strategy_adapters.h">\r
<Filter>source\util</Filter>\r
</ClInclude>\r
+ <ClInclude Include="util\lock_container.h">\r
+ <Filter>source\util</Filter>\r
+ </ClInclude>\r
</ItemGroup>\r
<ItemGroup>\r
<ClCompile Include="amcp\AMCPCommandQueue.cpp">\r
<ClCompile Include="util\strategy_adapters.cpp">\r
<Filter>source\util</Filter>\r
</ClCompile>\r
+ <ClCompile Include="util\lock_container.cpp">\r
+ <Filter>source\util</Filter>\r
+ </ClCompile>\r
</ItemGroup>\r
</Project>
\ No newline at end of file
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
+#include <tbb\mutex.h>
+
using boost::asio::ip::tcp;
namespace caspar { namespace IO {
typedef std::set<spl::shared_ptr<connection>> connection_set;
-class connection : public spl::enable_shared_from_this<connection>, public client_connection<char>
-{
+class connection : public spl::enable_shared_from_this<connection>
+{
const spl::shared_ptr<tcp::socket> socket_;
const spl::shared_ptr<connection_set> connection_set_;
const std::wstring name_;
std::shared_ptr<protocol_strategy<char>> protocol_;
std::array<char, 32768> data_;
+ std::vector<std::shared_ptr<void>> lifecycle_bound_items_;
+
+ class connection_holder : public client_connection<char>
+ {
+ std::weak_ptr<connection> connection_;
+ public:
+ explicit connection_holder(std::weak_ptr<connection> conn) : connection_(conn)
+ {}
+
+ virtual void send(std::basic_string<char>&& data)
+ {
+ auto conn = connection_.lock();
+ conn->send(std::move(data));
+ }
+ virtual void disconnect()
+ {
+ auto conn = connection_.lock();
+ conn->disconnect();
+ }
+ virtual std::wstring print() const
+ {
+ auto conn = connection_.lock();
+ return conn->print();
+ }
+
+ virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)
+ {
+ auto conn = connection_.lock();
+ return conn->bind_to_lifecycle(lifecycle_bound);
+ }
+ };
public:
static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const protocol_strategy_factory<char>::ptr& protocol, spl::shared_ptr<connection_set> connection_set)
return con;
}
+ ~connection()
+ {
+ CASPAR_LOG(info) << print() << L" connection destroyed.";
+ }
+
std::wstring print() const
{
return L"[" + name_ + L"]";
}
+
+ const std::string ipv4_address() const
+ {
+ return socket_->is_open() ? socket_->local_endpoint().address().to_string() : "no-address";
+ }
/* ClientInfo */
{
stop();
}
-
+ void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)
+ {
+ lifecycle_bound_items_.push_back(lifecycle_bound);
+ }
+
/**************/
void stop()
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
+
CASPAR_LOG(info) << print() << L" Disconnected.";
}
private:
- connection(const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol, const spl::shared_ptr<connection_set>& connection_set)
+ connection(const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set)
: socket_(socket)
, name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
, connection_set_(connection_set)
- , protocol_factory_(protocol)
+ , protocol_factory_(protocol_factory)
{
CASPAR_LOG(info) << print() << L" Connected.";
}
protocol_strategy<char>& protocol()
{
if (!protocol_)
- protocol_ = protocol_factory_->create(shared_from_this());
+ protocol_ = protocol_factory_->create(spl::make_shared<connection_holder>(shared_from_this()));
return *protocol_;
}
{
boost::asio::io_service service_;
tcp::acceptor acceptor_;
- protocol_strategy_factory<char>::ptr protocol_;
+ protocol_strategy_factory<char>::ptr protocol_factory_;
spl::shared_ptr<connection_set> connection_set_;
boost::thread thread_;
+ std::vector<lifecycle_factory_t> lifecycle_factories_;
+ tbb::mutex mutex_;
implementation(const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
: acceptor_(service_, tcp::endpoint(tcp::v4(), port))
- , protocol_(protocol)
+ , protocol_factory_(protocol)
, thread_(std::bind(&boost::asio::io_service::run, &service_))
{
start_accept();
if (!acceptor_.is_open())
return;
- if (!error)
- connection_set_->insert(connection::create(socket, protocol_, connection_set_));
+ if (!error)
+ {
+ auto conn = connection::create(socket, protocol_factory_, connection_set_);
+ connection_set_->insert(conn);
+ {
+ tbb::mutex::scoped_lock lock(mutex_);
+
+ BOOST_FOREACH(auto& lifecycle_factory, lifecycle_factories_)
+ {
+ auto lifecycle_bound = lifecycle_factory(conn->ipv4_address());
+ conn->bind_to_lifecycle(lifecycle_bound);
+ }
+ }
+ }
start_accept();
}
+
+ void add_client_lifecycle_event_factory(const lifecycle_factory_t& factory)
+ {
+ tbb::mutex::scoped_lock lock(mutex_);
+ lifecycle_factories_.push_back(factory);
+ }
};
AsyncEventServer::AsyncEventServer(
const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
- : impl_(new implementation(protocol, port))
-{
-}
+ : impl_(new implementation(protocol, port)) {}
-AsyncEventServer::~AsyncEventServer()
-{
-}
+AsyncEventServer::~AsyncEventServer() {}
+void AsyncEventServer::add_client_lifecycle_event_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_event_factory(factory); }
}}
\ No newline at end of file
namespace caspar { namespace IO {
+ typedef std::function<std::shared_ptr<void> (const std::string& ipv4_address)>
+ lifecycle_factory_t;
+
class AsyncEventServer
{
public:
explicit AsyncEventServer(const protocol_strategy_factory<char>::ptr& protocol, unsigned short port);
~AsyncEventServer();
+
+ void add_client_lifecycle_event_factory(const lifecycle_factory_t& lifecycle_factory);
+
private:
struct implementation;
std::unique_ptr<implementation> impl_;
virtual void Send(const std::wstring& data) = 0;
virtual void Disconnect() = 0;
virtual std::wstring print() const = 0;
-
- std::wstring currentMessage_;
+ virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound) = 0;
};
+
typedef std::shared_ptr<ClientInfo> ClientInfoPtr;
struct ConsoleClientInfo : public caspar::IO::ClientInfo
}
void Disconnect(){}
virtual std::wstring print() const {return L"Console";}
+ virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound) {}
};
}}
public:
virtual ~IProtocolStrategy(){}
- virtual void Parse(const wchar_t* pData, int charCount, ClientInfoPtr pClientInfo) = 0;
+ virtual void Parse(const std::wstring& msg, ClientInfoPtr pClientInfo) = 0;
virtual std::string GetCodepage() = 0;
+
+ virtual void on_client_disconnect(IO::ClientInfoPtr pClientInfo) {}
};
typedef std::shared_ptr<IProtocolStrategy> ProtocolStrategyPtr;
virtual void send(std::basic_string<CharT>&& data) = 0;\r
virtual void disconnect() = 0;\r
virtual std::wstring print() const = 0;\r
+\r
+ virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound) = 0;\r
};\r
\r
/**\r
: client_(client)\r
, codepage_(codepage)\r
{\r
+ CASPAR_LOG(info) << "from_unicode_client_connection created.";\r
+ }\r
+ ~from_unicode_client_connection()\r
+ {\r
+ CASPAR_LOG(info) << "from_unicode_client_connection destroyed.";\r
}\r
\r
virtual void send(std::basic_string<wchar_t>&& data)\r
{\r
return client_->print();\r
}\r
+\r
+ virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)\r
+ {\r
+ client_->bind_to_lifecycle(lifecycle_bound);\r
+ }\r
};\r
\r
to_unicode_adapter_factory::to_unicode_adapter_factory(\r
legacy_client_info(const client_connection<wchar_t>::ptr& client_connection)\r
: client_connection_(client_connection)\r
{\r
+ CASPAR_LOG(info) << "legacy_client_info created.";\r
+ }\r
+\r
+ ~legacy_client_info()\r
+ {\r
+ CASPAR_LOG(info) << "legacy_client_info destroyed.";\r
}\r
\r
+\r
virtual void Disconnect()\r
{\r
client_connection_->disconnect();\r
{\r
return client_connection_->print();\r
}\r
+\r
+ virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)\r
+ {\r
+ client_connection_->bind_to_lifecycle(lifecycle_bound);\r
+ }\r
};\r
\r
class legacy_strategy_adapter : public protocol_strategy<wchar_t>\r
: strategy_(strategy)\r
, client_info_(std::make_shared<legacy_client_info>(client_connection))\r
{\r
+ CASPAR_LOG(info) << "legacy_strategy_adapter created.";\r
+ }\r
+ ~legacy_strategy_adapter()\r
+ {\r
+ CASPAR_LOG(info) << "legacy_strategy_adapter destroyed.";\r
}\r
\r
virtual void parse(const std::basic_string<wchar_t>& data)\r
{\r
- auto p = data.c_str();\r
- strategy_->Parse(p, static_cast<int>(data.length()), client_info_);\r
+ strategy_->Parse(data, client_info_);\r
}\r
};\r
\r
input_ += data;\r
\r
std::vector<std::basic_string<CharT>> split;\r
- boost::iter_split(split, input_, boost::algorithm::first_finder(delimiter_));\r
+ boost::iter_split(split, input_, boost::algorithm::first_finder(delimiter_)); //TODO: check if this splits on all instances of delimiter_ in the input_\r
\r
input_ = std::move(split.back());\r
split.pop_back();\r
BOOST_FOREACH(auto cmd, split)\r
{\r
// TODO: perhaps it would be better to not append the delimiter.\r
- strategy_->parse(cmd + delimiter_);\r
+ strategy_->parse(cmd);\r
}\r
}\r
};\r
}
wcmd += L"\r\n";
- amcp.Parse(wcmd.c_str(), static_cast<int>(wcmd.length()), console_client);
+ amcp.Parse(wcmd, console_client);
}
CASPAR_LOG(info) << "Successfully shutdown CasparCG Server.";
}
unsigned int port = xml_controller.second.get(L"port", 5250);
auto asyncbootstrapper = spl::make_shared<IO::AsyncEventServer>(create_protocol(protocol), port);
async_servers_.push_back(asyncbootstrapper);
+
+ //TODO: remove - test
+ asyncbootstrapper->add_client_lifecycle_event_factory([=] (const std::string& ipv4_address) {
+ return std::shared_ptr<void>(nullptr, [] (void*)
+ { CASPAR_LOG(info) << "Client disconnect (lifecycle)"; });
+ });
}
else
CASPAR_LOG(warning) << "Invalid controller: " << name;