X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=protocol%2Futil%2FAsyncEventServer.cpp;h=cdfe538345206bbf36b72a7f881c66b269bebd9c;hb=7f606328a16a14e6e1190a440caa2cb2e619127b;hp=9946c55c391debeb40a0f0a5b1cfcd615e41e7d6;hpb=1785286834c8c3e81e2aa2bc17248cfa6992c883;p=casparcg diff --git a/protocol/util/AsyncEventServer.cpp b/protocol/util/AsyncEventServer.cpp index 9946c55c3..cdfe53834 100644 --- a/protocol/util/AsyncEventServer.cpp +++ b/protocol/util/AsyncEventServer.cpp @@ -1,585 +1,370 @@ -/* -* Copyright (c) 2011 Sveriges Television AB -* -* This file is part of CasparCG (www.casparcg.com). -* -* CasparCG is free software: you can redistribute it and/or modify -* it under the terms of the GNU General Public License as published by -* the Free Software Foundation, either version 3 of the License, or -* (at your option) any later version. -* -* CasparCG is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU General Public License for more details. -* -* You should have received a copy of the GNU General Public License -* along with CasparCG. If not, see . -* -* Author: Nicklas P Andersson -*/ - - -// AsyncEventServer.cpp: impl of the AsyncEventServer class. -// -////////////////////////////////////////////////////////////////////// - -#include "../stdafx.h" - -#include "AsyncEventServer.h" -#include "SocketInfo.h" - -#include -#include -#include -#include - -#if defined(_MSC_VER) -#pragma warning (push, 1) // TODO: Legacy code, just disable warnings, will replace with boost::asio in future -#endif - -namespace caspar { namespace IO { - -#define CASPAR_MAXIMUM_SOCKET_CLIENTS (MAXIMUM_WAIT_OBJECTS-1) - -long AsyncEventServer::instanceCount_ = 0; -////////////////////////////// -// AsyncEventServer constructor -// PARAMS: port(TCP-port the server should listen to) -// COMMENT: Initializes the WinSock2 library -AsyncEventServer::AsyncEventServer(const spl::shared_ptr& pProtocol, int port) : port_(port), pProtocolStrategy_(pProtocol) -{ - if(instanceCount_ == 0) { - WSADATA wsaData; - if(WSAStartup(MAKEWORD(2,2), &wsaData) != NO_ERROR) - throw std::exception("Error initializing WinSock2"); - else { - CASPAR_LOG(info) << "WinSock2 Initialized."; - } - } - - InterlockedIncrement(&instanceCount_); -} - -///////////////////////////// -// AsyncEventServer destructor -AsyncEventServer::~AsyncEventServer() { - Stop(); - - InterlockedDecrement(&instanceCount_); - if(instanceCount_ == 0) - WSACleanup(); -} - -void AsyncEventServer::SetClientDisconnectHandler(ClientDisconnectEvent handler) { - socketInfoCollection_.onSocketInfoRemoved = handler; -} - -////////////////////////////// -// AsyncEventServer::Start -// RETURNS: true at successful startup -bool AsyncEventServer::Start() { - if(listenThread_.IsRunning()) - return false; - - socketInfoCollection_.Clear(); - - sockaddr_in sockAddr; - ZeroMemory(&sockAddr, sizeof(sockAddr)); - sockAddr.sin_family = AF_INET; - sockAddr.sin_addr.s_addr = INADDR_ANY; - sockAddr.sin_port = htons(port_); - - SOCKET listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - if(listenSocket == INVALID_SOCKET) { - CASPAR_LOG(error) << "Failed to create listenSocket"; - return false; - } - - pListenSocketInfo_ = SocketInfoPtr(new SocketInfo(listenSocket, this)); - - if(WSAEventSelect(pListenSocketInfo_->socket_, pListenSocketInfo_->event_, FD_ACCEPT|FD_CLOSE) == SOCKET_ERROR) { - CASPAR_LOG(error) << "Failed to enter EventSelect-mode for listenSocket"; - return false; - } - - if(bind(pListenSocketInfo_->socket_, (sockaddr*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR) { - CASPAR_LOG(error) << "Failed to bind listenSocket"; - return false; - } - - if(listen(pListenSocketInfo_->socket_, SOMAXCONN) == SOCKET_ERROR) { - CASPAR_LOG(error) << "Failed to listen"; - return false; - } - - socketInfoCollection_.AddSocketInfo(pListenSocketInfo_); - - //start thread: the entrypoint is Run(EVENT stopEvent) - if(!listenThread_.Start(this)) { - CASPAR_LOG(error) << "Failed to create ListenThread"; - return false; - } - - CASPAR_LOG(info) << "Listener successfully initialized"; - return true; -} - -void AsyncEventServer::Run(HANDLE stopEvent) -{ - WSANETWORKEVENTS networkEvents; - - HANDLE waitHandlesCopy[MAXIMUM_WAIT_OBJECTS]; - waitHandlesCopy[0] = stopEvent; - - while(true) { - //Update local copy of the array of wait-handles if nessecery - if(socketInfoCollection_.IsDirty()) { - socketInfoCollection_.CopyCollectionToArray(&(waitHandlesCopy[1]), CASPAR_MAXIMUM_SOCKET_CLIENTS); - socketInfoCollection_.ClearDirty(); - } - - DWORD waitResult = WSAWaitForMultipleEvents(std::min(static_cast(socketInfoCollection_.Size()+1), MAXIMUM_WAIT_OBJECTS), waitHandlesCopy, FALSE, 1500, FALSE); - if(waitResult == WAIT_TIMEOUT) - continue; - else if(waitResult == WAIT_FAILED) - break; - else { - DWORD eventIndex = waitResult - WAIT_OBJECT_0; - - HANDLE waitEvent = waitHandlesCopy[eventIndex]; - SocketInfoPtr pSocketInfo; - - if(eventIndex == 0) //stopEvent - break; - else if(socketInfoCollection_.FindSocketInfo(waitEvent, pSocketInfo)) { - WSAEnumNetworkEvents(pSocketInfo->socket_, waitEvent, &networkEvents); - - if(networkEvents.lNetworkEvents & FD_ACCEPT) { - if(networkEvents.iErrorCode[FD_ACCEPT_BIT] == 0) - OnAccept(pSocketInfo); - else { - CASPAR_LOG(debug) << "OnAccept (ErrorCode: " << networkEvents.iErrorCode[FD_ACCEPT_BIT] << TEXT(")"); - OnError(waitEvent, networkEvents.iErrorCode[FD_ACCEPT_BIT]); - } - } - - if(networkEvents.lNetworkEvents & FD_CLOSE) { - if(networkEvents.iErrorCode[FD_CLOSE_BIT] == 0) - OnClose(pSocketInfo); - else { - CASPAR_LOG(debug) << "OnClose (ErrorCode: " << networkEvents.iErrorCode[FD_CLOSE_BIT] << TEXT(")"); - OnError(waitEvent, networkEvents.iErrorCode[FD_CLOSE_BIT]); - } - continue; - } - - if(networkEvents.lNetworkEvents & FD_READ) { - if(networkEvents.iErrorCode[FD_READ_BIT] == 0) - OnRead(pSocketInfo); - else { - CASPAR_LOG(debug) << "OnRead (ErrorCode: " << networkEvents.iErrorCode[FD_READ_BIT] << TEXT(")"); - OnError(waitEvent, networkEvents.iErrorCode[FD_READ_BIT]); - } - } - - if(networkEvents.lNetworkEvents & FD_WRITE) { - if(networkEvents.iErrorCode[FD_WRITE_BIT] == 0) - OnWrite(pSocketInfo); - else { - CASPAR_LOG(debug) << "OnWrite (ErrorCode: " << networkEvents.iErrorCode[FD_WRITE_BIT] << TEXT(")"); - OnError(waitEvent, networkEvents.iErrorCode[FD_WRITE_BIT]); - } - } - } - else { - //Could not find the waitHandle in the SocketInfoCollection. - //It must have been removed during the last call to WSAWaitForMultipleEvents - } - } - } -} - -bool AsyncEventServer::OnUnhandledException(const std::exception& ex) throw() { - bool bDoRestart = true; - - try - { - CASPAR_LOG(fatal) << "UNHANDLED EXCEPTION in TCPServers listeningthread. Message: " << ex.what(); - } - catch(...) - { - bDoRestart = false; - } - - return bDoRestart; -} - -/////////////////////////////// -// AsyncEventServer:Stop -// COMMENT: Shuts down -void AsyncEventServer::Stop() -{ - //TODO: initiate shutdown on all clients connected -// for(int i=0; i < _totalActiveSockets; ++i) { -// shutdown(_pSocketInfo[i]->_socket, SD_SEND); -// } - - if(!listenThread_.Stop()) { - CASPAR_LOG(warning) << "Wait for listenThread timed out."; - } - - socketInfoCollection_.Clear(); -} - -//////////////////////////////////////////////////////////////////// -// -// MESSAGE HANDLERS -// -//////////////////////////////////////////////////////////////////// - - -////////////////////////////// -// AsyncEventServer::OnAccept -// PARAMS: ... -// COMMENT: Called when a new client connects -bool AsyncEventServer::OnAccept(SocketInfoPtr& pSI) { - sockaddr_in clientAddr; - int addrSize = sizeof(clientAddr); - SOCKET clientSocket = WSAAccept(pSI->socket_, (sockaddr*)&clientAddr, &addrSize, NULL, NULL); - if(clientSocket == INVALID_SOCKET) { - LogSocketError(TEXT("Accept")); - return false; - } - - SocketInfoPtr pClientSocket(new SocketInfo(clientSocket, this)); - - //Determine if we can handle one more client - if(socketInfoCollection_.Size() >= CASPAR_MAXIMUM_SOCKET_CLIENTS) { - CASPAR_LOG(error) << "Could not accept ) << too many connections)."; - return true; - } - - if(WSAEventSelect(pClientSocket->socket_, pClientSocket->event_, FD_READ | FD_WRITE | FD_CLOSE) == SOCKET_ERROR) { - LogSocketError(TEXT("Accept (failed create event for new client)")); - return false; - } - - TCHAR addressBuffer[32]; - MultiByteToWideChar(CP_ACP, 0, inet_ntoa(clientAddr.sin_addr), -1, addressBuffer, 32); - pClientSocket->host_ = addressBuffer; - - socketInfoCollection_.AddSocketInfo(pClientSocket); - - CASPAR_LOG(info) << "Accepted connection from " << pClientSocket->host_.c_str(); - - return true; -} - -bool ConvertMultiByteToWideChar(UINT codePage, char* pSource, int sourceLength, std::vector& wideBuffer, int& countLeftovers) -{ - if(codePage == CP_UTF8) { - countLeftovers = 0; - //check from the end of pSource for ev. uncompleted UTF-8 byte sequence - if(pSource[sourceLength-1] & 0x80) { - //The last byte is part of a multibyte sequence. If the sequence is not complete, we need to save the partial sequence - int bytesToCheck = std::min(4, sourceLength); //a sequence contains a maximum of 4 bytes - int currentLeftoverIndex = sourceLength-1; - for(; bytesToCheck > 0; --bytesToCheck, --currentLeftoverIndex) { - ++countLeftovers; - if(pSource[currentLeftoverIndex] & 0x80) { - if(pSource[currentLeftoverIndex] & 0x40) { //The two high-bits are set, this is the "header" - int expectedSequenceLength = 2; - if(pSource[currentLeftoverIndex] & 0x20) - ++expectedSequenceLength; - if(pSource[currentLeftoverIndex] & 0x10) - ++expectedSequenceLength; - - if(countLeftovers < expectedSequenceLength) { - //The sequence is incomplete. Leave the leftovers to be interpreted with the next call - break; - } - //The sequence is complete, there are no leftovers. - //...OR... - //error. Let the conversion-function take the hit. - countLeftovers = 0; - break; - } - } - else { - //error. Let the conversion-function take the hit. - countLeftovers = 0; - break; - } - } - if(countLeftovers == 4) { - //error. Let the conversion-function take the hit. - countLeftovers = 0; - } - } - } - - int charsWritten = 0; - int sourceBytesToProcess = sourceLength-countLeftovers; - int wideBufferCapacity = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, NULL, NULL); - if(wideBufferCapacity > 0) - { - wideBuffer.resize(wideBufferCapacity); - charsWritten = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, &wideBuffer[0], wideBuffer.size()); - } - //copy the leftovers to the front of the buffer - if(countLeftovers > 0) { - memcpy(pSource, &(pSource[sourceBytesToProcess]), countLeftovers); - } - - wideBuffer.resize(charsWritten); - return (charsWritten > 0); -} - -////////////////////////////// -// AsyncEventServer::OnRead -// PARAMS: ... -// COMMENT: Called then something arrives on the socket that has to be read -bool AsyncEventServer::OnRead(SocketInfoPtr& pSI) { - int recvResult = SOCKET_ERROR; - - int maxRecvLength = sizeof(pSI->recvBuffer_)-pSI->recvLeftoverOffset_; - recvResult = recv(pSI->socket_, pSI->recvBuffer_+pSI->recvLeftoverOffset_, maxRecvLength, 0); - while(recvResult != SOCKET_ERROR) { - if(recvResult == 0) { - CASPAR_LOG(info) << "Client " << pSI->host_.c_str() << TEXT(" disconnected"); - - socketInfoCollection_.RemoveSocketInfo(pSI); - return true; - } - - //Convert to widechar - if(ConvertMultiByteToWideChar(pProtocolStrategy_->GetCodepage(), pSI->recvBuffer_, recvResult + pSI->recvLeftoverOffset_, pSI->wideRecvBuffer_, pSI->recvLeftoverOffset_)) - pProtocolStrategy_->Parse(&pSI->wideRecvBuffer_[0], pSI->wideRecvBuffer_.size(), pSI); - else - CASPAR_LOG(error) << "Read from " << pSI->host_.c_str() << TEXT(" failed, could not convert command to UNICODE"); - - maxRecvLength = sizeof(pSI->recvBuffer_)-pSI->recvLeftoverOffset_; - recvResult = recv(pSI->socket_, pSI->recvBuffer_+pSI->recvLeftoverOffset_, maxRecvLength, 0); - } - - if(recvResult == SOCKET_ERROR) { - int errorCode = WSAGetLastError(); - if(errorCode == WSAEWOULDBLOCK) - return true; - else { - LogSocketError(TEXT("Read"), errorCode); - OnError(pSI->event_, errorCode); - } - } - - return false; -} - -////////////////////////////// -// AsyncEventServer::OnWrite -// PARAMS: ... -// COMMENT: Called when the socket is ready to send more data -void AsyncEventServer::OnWrite(SocketInfoPtr& pSI) { - DoSend(*pSI); -} - -bool ConvertWideCharToMultiByte(UINT codePage, const std::wstring& wideString, std::vector& destBuffer) -{ - int bytesWritten = 0; - int multibyteBufferCapacity = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast(wideString.length()), 0, 0, NULL, NULL); - if(multibyteBufferCapacity > 0) - { - destBuffer.resize(multibyteBufferCapacity); - bytesWritten = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast(wideString.length()), &destBuffer[0], destBuffer.size(), NULL, NULL); - } - destBuffer.resize(bytesWritten); - return (bytesWritten > 0); -} - -void AsyncEventServer::DoSend(SocketInfo& socketInfo) { - //Locks the socketInfo-object so that no one else tampers with the sendqueue at the same time - tbb::mutex::scoped_lock lock(mutex_); - - while(!socketInfo.sendQueue_.empty() || socketInfo.currentlySending_.size() > 0) { - if(socketInfo.currentlySending_.size() == 0) { - //Read the next string in the queue and convert to UTF-8 - if(!ConvertWideCharToMultiByte(pProtocolStrategy_->GetCodepage(), socketInfo.sendQueue_.front(), socketInfo.currentlySending_)) - { - CASPAR_LOG(error) << "Send to " << socketInfo.host_.c_str() << TEXT(" failed, could not convert response to UTF-8"); - } - socketInfo.currentlySendingOffset_ = 0; - } - - if(socketInfo.currentlySending_.size() > 0) { - int bytesToSend = static_cast(socketInfo.currentlySending_.size()-socketInfo.currentlySendingOffset_); - int sentBytes = send(socketInfo.socket_, &socketInfo.currentlySending_[0] + socketInfo.currentlySendingOffset_, bytesToSend, 0); - if(sentBytes == SOCKET_ERROR) { - int errorCode = WSAGetLastError(); - if(errorCode == WSAEWOULDBLOCK) { - CASPAR_LOG(debug) << "Send to " << socketInfo.host_.c_str() << TEXT(" would block, sending later"); - break; - } - else { - LogSocketError(TEXT("Send"), errorCode); - OnError(socketInfo.event_, errorCode); - - socketInfo.currentlySending_.resize(0); - socketInfo.currentlySendingOffset_ = 0; - socketInfo.sendQueue_.pop(); - break; - } - } - else { - if(sentBytes == bytesToSend) { - - if(sentBytes < 512) - { - boost::replace_all(socketInfo.sendQueue_.front(), L"\n", L"\\n"); - boost::replace_all(socketInfo.sendQueue_.front(), L"\r", L"\\r"); - CASPAR_LOG(info) << L"Sent message to " << socketInfo.host_.c_str() << L": " << socketInfo.sendQueue_.front().c_str(); - } - else - CASPAR_LOG(info) << "Sent more than 512 bytes to " << socketInfo.host_.c_str(); - - socketInfo.currentlySending_.resize(0); - socketInfo.currentlySendingOffset_ = 0; - socketInfo.sendQueue_.pop(); - } - else { - socketInfo.currentlySendingOffset_ += sentBytes; - CASPAR_LOG(info) << "Sent partial message to " << socketInfo.host_.c_str(); - } - } - } - else - socketInfo.sendQueue_.pop(); - } -} - -////////////////////////////// -// AsyncEventServer::OnClose -// PARAMS: ... -// COMMENT: Called when a client disconnects / is disconnected -void AsyncEventServer::OnClose(SocketInfoPtr& pSI) { - CASPAR_LOG(info) << "Client " << pSI->host_.c_str() << TEXT(" was disconnected"); - - socketInfoCollection_.RemoveSocketInfo(pSI); -} - -////////////////////////////// -// AsyncEventServer::OnError -// PARAMS: ... -// COMMENT: Called when an errorcode is recieved -void AsyncEventServer::OnError(HANDLE waitEvent, int errorCode) { - if(errorCode == WSAENETDOWN || errorCode == WSAECONNABORTED || errorCode == WSAECONNRESET || errorCode == WSAESHUTDOWN || errorCode == WSAETIMEDOUT || errorCode == WSAENOTCONN || errorCode == WSAENETRESET) { - SocketInfoPtr pSocketInfo; - if(socketInfoCollection_.FindSocketInfo(waitEvent, pSocketInfo)) { - CASPAR_LOG(info) << "Client " << pSocketInfo->host_.c_str() << TEXT(" was disconnected, Errorcode ") << errorCode; - } - - socketInfoCollection_.RemoveSocketInfo(waitEvent); - } -} - -////////////////////////////// -// AsyncEventServer::DisconnectClient -// PARAMS: ... -// COMMENT: The client is removed from the actual client-list when an FD_CLOSE notification is recieved -void AsyncEventServer::DisconnectClient(SocketInfo& socketInfo) { - int result = shutdown(socketInfo.socket_, SD_SEND); - if(result == SOCKET_ERROR) - OnError(socketInfo.event_, result); -} - -////////////////////////////// -// AsyncEventServer::LogSocketError -void AsyncEventServer::LogSocketError(const TCHAR* pStr, int socketError) { - if(socketError == 0) - socketError = WSAGetLastError(); - - CASPAR_LOG(error) << "Failed to " << pStr << TEXT(" Errorcode: ") << socketError; -} - - -////////////////////////////// -// SocketInfoCollection -////////////////////////////// - -AsyncEventServer::SocketInfoCollection::SocketInfoCollection() : bDirty_(false) { -} - -AsyncEventServer::SocketInfoCollection::~SocketInfoCollection() { -} - -bool AsyncEventServer::SocketInfoCollection::AddSocketInfo(SocketInfoPtr& pSocketInfo) { - tbb::mutex::scoped_lock lock(mutex_); - - waitEvents_.resize(waitEvents_.size()+1); - bool bSuccess = socketInfoMap_.insert(SocketInfoMap::value_type(pSocketInfo->event_, pSocketInfo)).second; - if(bSuccess) { - waitEvents_[waitEvents_.size()-1] = pSocketInfo->event_; - bDirty_ = true; - } - - return bSuccess; -} - -void AsyncEventServer::SocketInfoCollection::RemoveSocketInfo(SocketInfoPtr& pSocketInfo) { - if(pSocketInfo != 0) { - RemoveSocketInfo(pSocketInfo->event_); - } -} -void AsyncEventServer::SocketInfoCollection::RemoveSocketInfo(HANDLE waitEvent) { - tbb::mutex::scoped_lock lock(mutex_); - - //Find instance - SocketInfoPtr pSocketInfo; - SocketInfoMap::iterator it = socketInfoMap_.find(waitEvent); - SocketInfoMap::iterator end = socketInfoMap_.end(); - if(it != end) - pSocketInfo = it->second; - - if(pSocketInfo) { - pSocketInfo->pServer_ = NULL; - - socketInfoMap_.erase(waitEvent); - - HandleVector::iterator it = std::find(waitEvents_.begin(), waitEvents_.end(), waitEvent); - if(it != waitEvents_.end()) { - std::swap((*it), waitEvents_.back()); - waitEvents_.resize(waitEvents_.size()-1); - - bDirty_ = true; - } - } - if(onSocketInfoRemoved) - onSocketInfoRemoved(pSocketInfo); -} - -bool AsyncEventServer::SocketInfoCollection::FindSocketInfo(HANDLE key, SocketInfoPtr& pResult) { - tbb::mutex::scoped_lock lock(mutex_); - - SocketInfoMap::iterator it = socketInfoMap_.find(key); - SocketInfoMap::iterator end = socketInfoMap_.end(); - if(it != end) - pResult = it->second; - - return (it != end); -} - -void AsyncEventServer::SocketInfoCollection::CopyCollectionToArray(HANDLE* pDest, int maxCount) { - tbb::mutex::scoped_lock lock(mutex_); - - memcpy(pDest, &(waitEvents_[0]), std::min( maxCount, static_cast(waitEvents_.size()) ) * sizeof(HANDLE) ); -} - -void AsyncEventServer::SocketInfoCollection::Clear() { - tbb::mutex::scoped_lock lock(mutex_); - - socketInfoMap_.clear(); - waitEvents_.clear(); -} - -} //namespace IO -} //namespace caspar \ No newline at end of file +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Robert Nagy, ronag89@gmail.com +*/ + +#include "../StdAfx.h" + +#include "AsyncEventServer.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +using boost::asio::ip::tcp; + +namespace caspar { namespace IO { + +class connection; + +typedef std::set> connection_set; + +class connection : public spl::enable_shared_from_this +{ + typedef tbb::concurrent_hash_map> lifecycle_map_type; + typedef tbb::concurrent_queue send_queue; + + const spl::shared_ptr socket_; + std::shared_ptr service_; + const std::wstring listen_port_; + const spl::shared_ptr connection_set_; + protocol_strategy_factory::ptr protocol_factory_; + std::shared_ptr> protocol_; + + std::array data_; + lifecycle_map_type lifecycle_bound_objects_; + send_queue send_queue_; + bool is_writing_; + + class connection_holder : public client_connection + { + std::weak_ptr connection_; + public: + explicit connection_holder(std::weak_ptr conn) : connection_(std::move(conn)) + {} + + void send(std::basic_string&& data) override + { + auto conn = connection_.lock(); + + if (conn) + conn->send(std::move(data)); + } + + void disconnect() override + { + auto conn = connection_.lock(); + + if (conn) + conn->disconnect(); + } + + std::wstring address() const override + { + auto conn = connection_.lock(); + + if (conn) + return conn->ipv4_address(); + else + return L"[destroyed-connection]"; + } + + void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr& lifecycle_bound) override + { + auto conn = connection_.lock(); + + if (conn) + return conn->add_lifecycle_bound_object(key, lifecycle_bound); + } + + std::shared_ptr remove_lifecycle_bound_object(const std::wstring& key) override + { + auto conn = connection_.lock(); + + if (conn) + return conn->remove_lifecycle_bound_object(key); + else + return std::shared_ptr(); + } + }; + +public: + static spl::shared_ptr create(std::shared_ptr service, spl::shared_ptr socket, const protocol_strategy_factory::ptr& protocol, spl::shared_ptr connection_set) + { + spl::shared_ptr con(new connection(std::move(service), std::move(socket), std::move(protocol), std::move(connection_set))); + con->init(); + con->read_some(); + return con; + } + + void init() + { + protocol_ = protocol_factory_->create(spl::make_shared(shared_from_this())); + } + + ~connection() + { + CASPAR_LOG(debug) << print() << L" connection destroyed."; + } + + std::wstring print() const + { + return L"async_event_server[:" + listen_port_ + L"]"; + } + + std::wstring address() const + { + return u16(socket_->local_endpoint().address().to_string()); + } + + std::wstring ipv4_address() const + { + return socket_->is_open() ? u16(socket_->remote_endpoint().address().to_string()) : L"no-address"; + } + + void send(std::string&& data) + { + send_queue_.push(std::move(data)); + auto self = shared_from_this(); + service_->dispatch([=] { self->do_write(); }); + } + + void disconnect() + { + auto self = shared_from_this(); + service_->dispatch([=] { self->stop(); }); + } + + void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr& lifecycle_bound) + { + //thread-safe tbb_concurrent_hash_map + lifecycle_bound_objects_.insert(std::pair>(key, lifecycle_bound)); + } + std::shared_ptr remove_lifecycle_bound_object(const std::wstring& key) + { + //thread-safe tbb_concurrent_hash_map + lifecycle_map_type::const_accessor acc; + if(lifecycle_bound_objects_.find(acc, key)) + { + auto result = acc->second; + lifecycle_bound_objects_.erase(acc); + return result; + } + return std::shared_ptr(); + } + +private: + void do_write() //always called from the asio-service-thread + { + if(!is_writing_) + { + std::string data; + if(send_queue_.try_pop(data)) + { + write_some(std::move(data)); + } + } + } + + void stop() //always called from the asio-service-thread + { + connection_set_->erase(shared_from_this()); + + CASPAR_LOG(info) << print() << L" Client " << ipv4_address() << L" disconnected (" << connection_set_->size() << L" connections)."; + + boost::system::error_code ec; + socket_->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec); + socket_->close(ec); + } + + connection(const std::shared_ptr& service, const spl::shared_ptr& socket, const protocol_strategy_factory::ptr& protocol_factory, const spl::shared_ptr& connection_set) + : socket_(socket) + , service_(service) + , listen_port_(socket_->is_open() ? boost::lexical_cast(socket_->local_endpoint().port()) : L"no-port") + , connection_set_(connection_set) + , protocol_factory_(protocol_factory) + , is_writing_(false) + { + CASPAR_LOG(info) << print() << L" Accepted connection from " << ipv4_address() << L" (" << (connection_set_->size() + 1) << L" connections)."; + } + + void handle_read(const boost::system::error_code& error, size_t bytes_transferred) //always called from the asio-service-thread + { + if(!error) + { + try + { + std::string data(data_.begin(), data_.begin() + bytes_transferred); + + protocol_->parse(data); + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } + + read_some(); + } + else if (error != boost::asio::error::operation_aborted) + stop(); + } + + void handle_write(const spl::shared_ptr& str, const boost::system::error_code& error, size_t bytes_transferred) //always called from the asio-service-thread + { + if(!error) + { + if(bytes_transferred != str->size()) + { + str->assign(str->substr(bytes_transferred)); + socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2)); + } + else + { + is_writing_ = false; + do_write(); + } + } + else if (error != boost::asio::error::operation_aborted && socket_->is_open()) + stop(); + } + + void read_some() //always called from the asio-service-thread + { + socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + } + + void write_some(std::string&& data) //always called from the asio-service-thread + { + is_writing_ = true; + auto str = spl::make_shared(std::move(data)); + socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2)); + } + + friend struct AsyncEventServer::implementation; +}; + +struct AsyncEventServer::implementation : public spl::enable_shared_from_this +{ + std::shared_ptr service_; + tcp::acceptor acceptor_; + protocol_strategy_factory::ptr protocol_factory_; + spl::shared_ptr connection_set_; + std::vector lifecycle_factories_; + tbb::mutex mutex_; + + implementation(std::shared_ptr service, const protocol_strategy_factory::ptr& protocol, unsigned short port) + : service_(std::move(service)) + , acceptor_(*service_, tcp::endpoint(tcp::v4(), port)) + , protocol_factory_(protocol) + { + } + + void stop() + { + try + { + acceptor_.cancel(); + acceptor_.close(); + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } + } + + ~implementation() + { + auto conns_set = connection_set_; + + service_->post([conns_set] + { + auto connections = *conns_set; + for (auto& connection : connections) + connection->stop(); + }); + } + + void start_accept() + { + spl::shared_ptr socket(new tcp::socket(*service_)); + acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, shared_from_this(), socket, std::placeholders::_1)); + } + + void handle_accept(const spl::shared_ptr& socket, const boost::system::error_code& error) + { + if (!acceptor_.is_open()) + return; + + if (!error) + { + boost::system::error_code ec; + socket->set_option(boost::asio::socket_base::keep_alive(true), ec); + + if (ec) + CASPAR_LOG(warning) << print() << L" Failed to enable TCP keep-alive on socket"; + + auto conn = connection::create(service_, socket, protocol_factory_, connection_set_); + connection_set_->insert(conn); + + for (auto& lifecycle_factory : lifecycle_factories_) + { + auto lifecycle_bound = lifecycle_factory(u8(conn->ipv4_address())); + conn->add_lifecycle_bound_object(lifecycle_bound.first, lifecycle_bound.second); + } + } + start_accept(); + } + + std::wstring print() const + { + return L"async_event_server[:" + boost::lexical_cast(acceptor_.local_endpoint().port()) + L"]"; + } + + void add_client_lifecycle_object_factory(const lifecycle_factory_t& factory) + { + auto self = shared_from_this(); + service_->post([=]{ self->lifecycle_factories_.push_back(factory); }); + } +}; + +AsyncEventServer::AsyncEventServer( + std::shared_ptr service, const protocol_strategy_factory::ptr& protocol, unsigned short port) + : impl_(new implementation(std::move(service), protocol, port)) +{ + impl_->start_accept(); +} + +AsyncEventServer::~AsyncEventServer() +{ + impl_->stop(); +} + +void AsyncEventServer::add_client_lifecycle_object_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_object_factory(factory); } + +}}