-/*\r
-* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
-*\r
-* This file is part of CasparCG (www.casparcg.com).\r
-*\r
-* CasparCG is free software: you can redistribute it and/or modify\r
-* it under the terms of the GNU General Public License as published by\r
-* the Free Software Foundation, either version 3 of the License, or\r
-* (at your option) any later version.\r
-*\r
-* CasparCG is distributed in the hope that it will be useful,\r
-* but WITHOUT ANY WARRANTY; without even the implied warranty of\r
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the\r
-* GNU General Public License for more details.\r
-*\r
-* You should have received a copy of the GNU General Public License\r
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
-*\r
-* Author: Nicklas P Andersson\r
-*/\r
-\r
- \r
-// AsyncEventServer.cpp: impl of the AsyncEventServer class.\r
-//\r
-//////////////////////////////////////////////////////////////////////\r
-\r
-#include "../stdafx.h"\r
-\r
-#include "AsyncEventServer.h"\r
-#include "SocketInfo.h"\r
-\r
-#include <common/log/log.h>\r
-\r
-#include <string>\r
-#include <algorithm>\r
-\r
-#if defined(_MSC_VER)\r
-#pragma warning (push, 1) // TODO: Legacy code, just disable warnings, will replace with boost::asio in future\r
-#endif\r
-\r
-namespace caspar { namespace IO {\r
- \r
-#define CASPAR_MAXIMUM_SOCKET_CLIENTS (MAXIMUM_WAIT_OBJECTS-1) \r
-\r
-long AsyncEventServer::instanceCount_ = 0;\r
-//////////////////////////////\r
-// AsyncEventServer constructor\r
-// PARAMS: port(TCP-port the server should listen to)\r
-// COMMENT: Initializes the WinSock2 library\r
-AsyncEventServer::AsyncEventServer(const safe_ptr<IProtocolStrategy>& pProtocol, int port) : port_(port), pProtocolStrategy_(pProtocol)\r
-{\r
- if(instanceCount_ == 0) {\r
- WSADATA wsaData;\r
- if(WSAStartup(MAKEWORD(2,2), &wsaData) != NO_ERROR)\r
- throw std::exception("Error initializing WinSock2");\r
- else {\r
- CASPAR_LOG(info) << "WinSock2 Initialized.";\r
- }\r
- }\r
-\r
- InterlockedIncrement(&instanceCount_);\r
-}\r
-\r
-/////////////////////////////\r
-// AsyncEventServer destructor\r
-AsyncEventServer::~AsyncEventServer() {\r
- Stop();\r
-\r
- InterlockedDecrement(&instanceCount_);\r
- if(instanceCount_ == 0)\r
- WSACleanup();\r
-}\r
-\r
-void AsyncEventServer::SetClientDisconnectHandler(ClientDisconnectEvent handler) {\r
- socketInfoCollection_.onSocketInfoRemoved = handler;\r
-}\r
-\r
-//////////////////////////////\r
-// AsyncEventServer::Start\r
-// RETURNS: true at successful startup\r
-bool AsyncEventServer::Start() {\r
- if(listenThread_.IsRunning())\r
- return false;\r
-\r
- socketInfoCollection_.Clear();\r
-\r
- sockaddr_in sockAddr;\r
- ZeroMemory(&sockAddr, sizeof(sockAddr));\r
- sockAddr.sin_family = AF_INET;\r
- sockAddr.sin_addr.s_addr = INADDR_ANY;\r
- sockAddr.sin_port = htons(port_);\r
- \r
- SOCKET listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);\r
- if(listenSocket == INVALID_SOCKET) {\r
- CASPAR_LOG(error) << "Failed to create listenSocket";\r
- return false;\r
- }\r
- \r
- pListenSocketInfo_ = SocketInfoPtr(new SocketInfo(listenSocket, this));\r
-\r
- if(WSAEventSelect(pListenSocketInfo_->socket_, pListenSocketInfo_->event_, FD_ACCEPT|FD_CLOSE) == SOCKET_ERROR) {\r
- CASPAR_LOG(error) << "Failed to enter EventSelect-mode for listenSocket";\r
- return false;\r
- }\r
-\r
- if(bind(pListenSocketInfo_->socket_, (sockaddr*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR) {\r
- CASPAR_LOG(error) << "Failed to bind listenSocket";\r
- return false;\r
- }\r
-\r
- if(listen(pListenSocketInfo_->socket_, SOMAXCONN) == SOCKET_ERROR) {\r
- CASPAR_LOG(error) << "Failed to listen";\r
- return false;\r
- }\r
-\r
- socketInfoCollection_.AddSocketInfo(pListenSocketInfo_);\r
-\r
- //start thread: the entrypoint is Run(EVENT stopEvent)\r
- if(!listenThread_.Start(this)) {\r
- CASPAR_LOG(error) << "Failed to create ListenThread";\r
- return false;\r
- }\r
-\r
- CASPAR_LOG(info) << "Listener successfully initialized";\r
- return true;\r
-}\r
-\r
-void AsyncEventServer::Run(HANDLE stopEvent)\r
-{\r
- WSANETWORKEVENTS networkEvents;\r
-\r
- HANDLE waitHandlesCopy[MAXIMUM_WAIT_OBJECTS];\r
- waitHandlesCopy[0] = stopEvent;\r
-\r
- while(true) {\r
- //Update local copy of the array of wait-handles if nessecery\r
- if(socketInfoCollection_.IsDirty()) {\r
- socketInfoCollection_.CopyCollectionToArray(&(waitHandlesCopy[1]), CASPAR_MAXIMUM_SOCKET_CLIENTS);\r
- socketInfoCollection_.ClearDirty();\r
- }\r
-\r
- DWORD waitResult = WSAWaitForMultipleEvents(std::min<DWORD>(static_cast<DWORD>(socketInfoCollection_.Size()+1), MAXIMUM_WAIT_OBJECTS), waitHandlesCopy, FALSE, 1500, FALSE);\r
- if(waitResult == WAIT_TIMEOUT)\r
- continue;\r
- else if(waitResult == WAIT_FAILED)\r
- break;\r
- else {\r
- DWORD eventIndex = waitResult - WAIT_OBJECT_0;\r
-\r
- HANDLE waitEvent = waitHandlesCopy[eventIndex];\r
- SocketInfoPtr pSocketInfo;\r
-\r
- if(eventIndex == 0) //stopEvent\r
- break;\r
- else if(socketInfoCollection_.FindSocketInfo(waitEvent, pSocketInfo)) {\r
- WSAEnumNetworkEvents(pSocketInfo->socket_, waitEvent, &networkEvents);\r
-\r
- if(networkEvents.lNetworkEvents & FD_ACCEPT) {\r
- if(networkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)\r
- OnAccept(pSocketInfo);\r
- else {\r
- CASPAR_LOG(debug) << "OnAccept (ErrorCode: " << networkEvents.iErrorCode[FD_ACCEPT_BIT] << TEXT(")");\r
- OnError(waitEvent, networkEvents.iErrorCode[FD_ACCEPT_BIT]);\r
- }\r
- }\r
-\r
- if(networkEvents.lNetworkEvents & FD_CLOSE) {\r
- if(networkEvents.iErrorCode[FD_CLOSE_BIT] == 0)\r
- OnClose(pSocketInfo);\r
- else {\r
- CASPAR_LOG(debug) << "OnClose (ErrorCode: " << networkEvents.iErrorCode[FD_CLOSE_BIT] << TEXT(")");\r
- OnError(waitEvent, networkEvents.iErrorCode[FD_CLOSE_BIT]);\r
- }\r
- continue;\r
- }\r
-\r
- if(networkEvents.lNetworkEvents & FD_READ) {\r
- if(networkEvents.iErrorCode[FD_READ_BIT] == 0)\r
- OnRead(pSocketInfo);\r
- else {\r
- CASPAR_LOG(debug) << "OnRead (ErrorCode: " << networkEvents.iErrorCode[FD_READ_BIT] << TEXT(")");\r
- OnError(waitEvent, networkEvents.iErrorCode[FD_READ_BIT]);\r
- }\r
- }\r
-\r
- if(networkEvents.lNetworkEvents & FD_WRITE) {\r
- if(networkEvents.iErrorCode[FD_WRITE_BIT] == 0)\r
- OnWrite(pSocketInfo);\r
- else {\r
- CASPAR_LOG(debug) << "OnWrite (ErrorCode: " << networkEvents.iErrorCode[FD_WRITE_BIT] << TEXT(")");\r
- OnError(waitEvent, networkEvents.iErrorCode[FD_WRITE_BIT]);\r
- }\r
- }\r
- }\r
- else {\r
- //Could not find the waitHandle in the SocketInfoCollection.\r
- //It must have been removed during the last call to WSAWaitForMultipleEvents\r
- }\r
- }\r
- }\r
-}\r
-\r
-bool AsyncEventServer::OnUnhandledException(const std::exception& ex) throw() {\r
- bool bDoRestart = true;\r
-\r
- try \r
- {\r
- CASPAR_LOG(fatal) << "UNHANDLED EXCEPTION in TCPServers listeningthread. Message: " << ex.what();\r
- }\r
- catch(...)\r
- {\r
- bDoRestart = false;\r
- }\r
-\r
- return bDoRestart;\r
-}\r
-\r
-///////////////////////////////\r
-// AsyncEventServer:Stop\r
-// COMMENT: Shuts down\r
-void AsyncEventServer::Stop()\r
-{\r
- //TODO: initiate shutdown on all clients connected\r
-// for(int i=0; i < _totalActiveSockets; ++i) {\r
-// shutdown(_pSocketInfo[i]->_socket, SD_SEND);\r
-// }\r
-\r
- if(!listenThread_.Stop()) {\r
- CASPAR_LOG(warning) << "Wait for listenThread timed out.";\r
- }\r
-\r
- socketInfoCollection_.Clear();\r
-}\r
-\r
-////////////////////////////////////////////////////////////////////\r
-//\r
-// MESSAGE HANDLERS \r
-//\r
-////////////////////////////////////////////////////////////////////\r
-\r
-\r
-//////////////////////////////\r
-// AsyncEventServer::OnAccept\r
-// PARAMS: ...\r
-// COMMENT: Called when a new client connects\r
-bool AsyncEventServer::OnAccept(SocketInfoPtr& pSI) {\r
- sockaddr_in clientAddr;\r
- int addrSize = sizeof(clientAddr);\r
- SOCKET clientSocket = WSAAccept(pSI->socket_, (sockaddr*)&clientAddr, &addrSize, NULL, NULL);\r
- if(clientSocket == INVALID_SOCKET) {\r
- LogSocketError(TEXT("Accept"));\r
- return false;\r
- }\r
-\r
- SocketInfoPtr pClientSocket(new SocketInfo(clientSocket, this));\r
-\r
- //Determine if we can handle one more client\r
- if(socketInfoCollection_.Size() >= CASPAR_MAXIMUM_SOCKET_CLIENTS) {\r
- CASPAR_LOG(error) << "Could not accept ) << too many connections).";\r
- return true;\r
- }\r
-\r
- if(WSAEventSelect(pClientSocket->socket_, pClientSocket->event_, FD_READ | FD_WRITE | FD_CLOSE) == SOCKET_ERROR) {\r
- LogSocketError(TEXT("Accept (failed create event for new client)"));\r
- return false;\r
- }\r
-\r
- TCHAR addressBuffer[32];\r
- MultiByteToWideChar(CP_ACP, 0, inet_ntoa(clientAddr.sin_addr), -1, addressBuffer, 32);\r
- pClientSocket->host_ = addressBuffer;\r
-\r
- socketInfoCollection_.AddSocketInfo(pClientSocket);\r
-\r
- CASPAR_LOG(info) << "Accepted connection from " << pClientSocket->host_.c_str();\r
-\r
- return true;\r
-}\r
-\r
-bool ConvertMultiByteToWideChar(UINT codePage, char* pSource, int sourceLength, std::vector<wchar_t>& wideBuffer, int& countLeftovers)\r
-{\r
- if(codePage == CP_UTF8) {\r
- countLeftovers = 0;\r
- //check from the end of pSource for ev. uncompleted UTF-8 byte sequence\r
- if(pSource[sourceLength-1] & 0x80) {\r
- //The last byte is part of a multibyte sequence. If the sequence is not complete, we need to save the partial sequence\r
- int bytesToCheck = std::min(4, sourceLength); //a sequence contains a maximum of 4 bytes\r
- int currentLeftoverIndex = sourceLength-1;\r
- for(; bytesToCheck > 0; --bytesToCheck, --currentLeftoverIndex) {\r
- ++countLeftovers;\r
- if(pSource[currentLeftoverIndex] & 0x80) {\r
- if(pSource[currentLeftoverIndex] & 0x40) { //The two high-bits are set, this is the "header"\r
- int expectedSequenceLength = 2;\r
- if(pSource[currentLeftoverIndex] & 0x20)\r
- ++expectedSequenceLength;\r
- if(pSource[currentLeftoverIndex] & 0x10)\r
- ++expectedSequenceLength;\r
-\r
- if(countLeftovers < expectedSequenceLength) {\r
- //The sequence is incomplete. Leave the leftovers to be interpreted with the next call\r
- break;\r
- }\r
- //The sequence is complete, there are no leftovers. \r
- //...OR...\r
- //error. Let the conversion-function take the hit.\r
- countLeftovers = 0;\r
- break;\r
- }\r
- }\r
- else {\r
- //error. Let the conversion-function take the hit.\r
- countLeftovers = 0;\r
- break;\r
- }\r
- }\r
- if(countLeftovers == 4) {\r
- //error. Let the conversion-function take the hit.\r
- countLeftovers = 0;\r
- }\r
- }\r
- }\r
-\r
- int charsWritten = 0;\r
- int sourceBytesToProcess = sourceLength-countLeftovers;\r
- int wideBufferCapacity = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, NULL, NULL);\r
- if(wideBufferCapacity > 0) \r
- {\r
- wideBuffer.resize(wideBufferCapacity);\r
- charsWritten = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, &wideBuffer[0], wideBuffer.size());\r
- }\r
- //copy the leftovers to the front of the buffer\r
- if(countLeftovers > 0) {\r
- memcpy(pSource, &(pSource[sourceBytesToProcess]), countLeftovers);\r
- }\r
-\r
- wideBuffer.resize(charsWritten);\r
- return (charsWritten > 0);\r
-}\r
-\r
-//////////////////////////////\r
-// AsyncEventServer::OnRead\r
-// PARAMS: ...\r
-// COMMENT: Called then something arrives on the socket that has to be read\r
-bool AsyncEventServer::OnRead(SocketInfoPtr& pSI) {\r
- int recvResult = SOCKET_ERROR;\r
-\r
- int maxRecvLength = sizeof(pSI->recvBuffer_)-pSI->recvLeftoverOffset_;\r
- recvResult = recv(pSI->socket_, pSI->recvBuffer_+pSI->recvLeftoverOffset_, maxRecvLength, 0);\r
- while(recvResult != SOCKET_ERROR) {\r
- if(recvResult == 0) {\r
- CASPAR_LOG(info) << "Client " << pSI->host_.c_str() << TEXT(" disconnected");\r
-\r
- socketInfoCollection_.RemoveSocketInfo(pSI);\r
- return true;\r
- }\r
-\r
- //Convert to widechar\r
- if(ConvertMultiByteToWideChar(pProtocolStrategy_->GetCodepage(), pSI->recvBuffer_, recvResult + pSI->recvLeftoverOffset_, pSI->wideRecvBuffer_, pSI->recvLeftoverOffset_))\r
- pProtocolStrategy_->Parse(&pSI->wideRecvBuffer_[0], pSI->wideRecvBuffer_.size(), pSI);\r
- else \r
- CASPAR_LOG(error) << "Read from " << pSI->host_.c_str() << TEXT(" failed, could not convert command to UNICODE");\r
- \r
- \r
-\r
- maxRecvLength = sizeof(pSI->recvBuffer_)-pSI->recvLeftoverOffset_;\r
- recvResult = recv(pSI->socket_, pSI->recvBuffer_+pSI->recvLeftoverOffset_, maxRecvLength, 0);\r
- }\r
-\r
- if(recvResult == SOCKET_ERROR) {\r
- int errorCode = WSAGetLastError();\r
- if(errorCode == WSAEWOULDBLOCK)\r
- return true;\r
- else {\r
- LogSocketError(TEXT("Read"), errorCode);\r
- OnError(pSI->event_, errorCode);\r
- }\r
- }\r
-\r
- return false;\r
-}\r
-\r
-//////////////////////////////\r
-// AsyncEventServer::OnWrite\r
-// PARAMS: ...\r
-// COMMENT: Called when the socket is ready to send more data\r
-void AsyncEventServer::OnWrite(SocketInfoPtr& pSI) {\r
- DoSend(*pSI); \r
-}\r
-\r
-bool ConvertWideCharToMultiByte(UINT codePage, const std::wstring& wideString, std::vector<char>& destBuffer)\r
-{\r
- int bytesWritten = 0;\r
- int multibyteBufferCapacity = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), 0, 0, NULL, NULL);\r
- if(multibyteBufferCapacity > 0) \r
- {\r
- destBuffer.resize(multibyteBufferCapacity);\r
- bytesWritten = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), &destBuffer[0], destBuffer.size(), NULL, NULL);\r
- }\r
- destBuffer.resize(bytesWritten);\r
- return (bytesWritten > 0);\r
-}\r
-\r
-void AsyncEventServer::DoSend(SocketInfo& socketInfo) {\r
- //Locks the socketInfo-object so that no one else tampers with the sendqueue at the same time\r
- tbb::mutex::scoped_lock lock(mutex_);\r
-\r
- while(!socketInfo.sendQueue_.empty() || socketInfo.currentlySending_.size() > 0) {\r
- if(socketInfo.currentlySending_.size() == 0) {\r
- //Read the next string in the queue and convert to UTF-8\r
- if(!ConvertWideCharToMultiByte(pProtocolStrategy_->GetCodepage(), socketInfo.sendQueue_.front(), socketInfo.currentlySending_))\r
- {\r
- CASPAR_LOG(error) << "Send to " << socketInfo.host_.c_str() << TEXT(" failed, could not convert response to UTF-8");\r
- }\r
- socketInfo.currentlySendingOffset_ = 0;\r
- }\r
-\r
- if(socketInfo.currentlySending_.size() > 0) {\r
- int bytesToSend = static_cast<int>(socketInfo.currentlySending_.size()-socketInfo.currentlySendingOffset_);\r
- int sentBytes = send(socketInfo.socket_, &socketInfo.currentlySending_[0] + socketInfo.currentlySendingOffset_, bytesToSend, 0);\r
- if(sentBytes == SOCKET_ERROR) {\r
- int errorCode = WSAGetLastError();\r
- if(errorCode == WSAEWOULDBLOCK) {\r
- CASPAR_LOG(debug) << "Send to " << socketInfo.host_.c_str() << TEXT(" would block, sending later");\r
- break;\r
- }\r
- else {\r
- LogSocketError(TEXT("Send"), errorCode);\r
- OnError(socketInfo.event_, errorCode);\r
-\r
- socketInfo.currentlySending_.resize(0);\r
- socketInfo.currentlySendingOffset_ = 0;\r
- socketInfo.sendQueue_.pop();\r
- break;\r
- }\r
- }\r
- else {\r
- if(sentBytes == bytesToSend) {\r
- if(sentBytes < 512)\r
- CASPAR_LOG(info) << "Sent: " << socketInfo.sendQueue_.front().c_str() << TEXT(" to ") << socketInfo.host_.c_str();\r
- else\r
- CASPAR_LOG(info) << "Sent more than 512 bytes to " << socketInfo.host_.c_str();\r
-\r
- socketInfo.currentlySending_.resize(0);\r
- socketInfo.currentlySendingOffset_ = 0;\r
- socketInfo.sendQueue_.pop();\r
- }\r
- else {\r
- socketInfo.currentlySendingOffset_ += sentBytes;\r
- CASPAR_LOG(info) << "Sent partial message to " << socketInfo.host_.c_str();\r
- }\r
- }\r
- }\r
- else\r
- socketInfo.sendQueue_.pop();\r
- }\r
-}\r
-\r
-//////////////////////////////\r
-// AsyncEventServer::OnClose\r
-// PARAMS: ...\r
-// COMMENT: Called when a client disconnects / is disconnected\r
-void AsyncEventServer::OnClose(SocketInfoPtr& pSI) {\r
- CASPAR_LOG(info) << "Client " << pSI->host_.c_str() << TEXT(" was disconnected");\r
-\r
- socketInfoCollection_.RemoveSocketInfo(pSI);\r
-}\r
-\r
-//////////////////////////////\r
-// AsyncEventServer::OnError\r
-// PARAMS: ...\r
-// COMMENT: Called when an errorcode is recieved\r
-void AsyncEventServer::OnError(HANDLE waitEvent, int errorCode) {\r
- if(errorCode == WSAENETDOWN || errorCode == WSAECONNABORTED || errorCode == WSAECONNRESET || errorCode == WSAESHUTDOWN || errorCode == WSAETIMEDOUT || errorCode == WSAENOTCONN || errorCode == WSAENETRESET) {\r
- SocketInfoPtr pSocketInfo;\r
- if(socketInfoCollection_.FindSocketInfo(waitEvent, pSocketInfo)) {\r
- CASPAR_LOG(info) << "Client " << pSocketInfo->host_.c_str() << TEXT(" was disconnected, Errorcode ") << errorCode;\r
- }\r
-\r
- socketInfoCollection_.RemoveSocketInfo(waitEvent);\r
- }\r
-}\r
-\r
-//////////////////////////////\r
-// AsyncEventServer::DisconnectClient\r
-// PARAMS: ...\r
-// COMMENT: The client is removed from the actual client-list when an FD_CLOSE notification is recieved\r
-void AsyncEventServer::DisconnectClient(SocketInfo& socketInfo) {\r
- int result = shutdown(socketInfo.socket_, SD_SEND);\r
- if(result == SOCKET_ERROR)\r
- OnError(socketInfo.event_, result);\r
-}\r
-\r
-//////////////////////////////\r
-// AsyncEventServer::LogSocketError\r
-void AsyncEventServer::LogSocketError(const TCHAR* pStr, int socketError) {\r
- if(socketError == 0)\r
- socketError = WSAGetLastError();\r
-\r
- CASPAR_LOG(error) << "Failed to " << pStr << TEXT(" Errorcode: ") << socketError;\r
-}\r
-\r
-\r
-//////////////////////////////\r
-// SocketInfoCollection\r
-//////////////////////////////\r
-\r
-AsyncEventServer::SocketInfoCollection::SocketInfoCollection() : bDirty_(false) {\r
-}\r
-\r
-AsyncEventServer::SocketInfoCollection::~SocketInfoCollection() {\r
-}\r
-\r
-bool AsyncEventServer::SocketInfoCollection::AddSocketInfo(SocketInfoPtr& pSocketInfo) {\r
- tbb::mutex::scoped_lock lock(mutex_);\r
-\r
- waitEvents_.resize(waitEvents_.size()+1);\r
- bool bSuccess = socketInfoMap_.insert(SocketInfoMap::value_type(pSocketInfo->event_, pSocketInfo)).second;\r
- if(bSuccess) {\r
- waitEvents_[waitEvents_.size()-1] = pSocketInfo->event_;\r
- bDirty_ = true;\r
- }\r
-\r
- return bSuccess;\r
-}\r
-\r
-void AsyncEventServer::SocketInfoCollection::RemoveSocketInfo(SocketInfoPtr& pSocketInfo) {\r
- if(pSocketInfo != 0) {\r
- RemoveSocketInfo(pSocketInfo->event_);\r
- }\r
-}\r
-void AsyncEventServer::SocketInfoCollection::RemoveSocketInfo(HANDLE waitEvent) {\r
- tbb::mutex::scoped_lock lock(mutex_);\r
-\r
- //Find instance\r
- SocketInfoPtr pSocketInfo;\r
- SocketInfoMap::iterator it = socketInfoMap_.find(waitEvent);\r
- SocketInfoMap::iterator end = socketInfoMap_.end();\r
- if(it != end)\r
- pSocketInfo = it->second;\r
-\r
- if(pSocketInfo) {\r
- pSocketInfo->pServer_ = NULL;\r
-\r
- socketInfoMap_.erase(waitEvent);\r
-\r
- HandleVector::iterator it = std::find(waitEvents_.begin(), waitEvents_.end(), waitEvent);\r
- if(it != waitEvents_.end()) {\r
- std::swap((*it), waitEvents_.back());\r
- waitEvents_.resize(waitEvents_.size()-1);\r
-\r
- bDirty_ = true;\r
- }\r
- }\r
- if(onSocketInfoRemoved)\r
- onSocketInfoRemoved(pSocketInfo);\r
-}\r
-\r
-bool AsyncEventServer::SocketInfoCollection::FindSocketInfo(HANDLE key, SocketInfoPtr& pResult) {\r
- tbb::mutex::scoped_lock lock(mutex_);\r
-\r
- SocketInfoMap::iterator it = socketInfoMap_.find(key);\r
- SocketInfoMap::iterator end = socketInfoMap_.end();\r
- if(it != end)\r
- pResult = it->second;\r
-\r
- return (it != end);\r
-}\r
-\r
-void AsyncEventServer::SocketInfoCollection::CopyCollectionToArray(HANDLE* pDest, int maxCount) {\r
- tbb::mutex::scoped_lock lock(mutex_);\r
-\r
- memcpy(pDest, &(waitEvents_[0]), std::min( maxCount, static_cast<int>(waitEvents_.size()) ) * sizeof(HANDLE) );\r
-}\r
-\r
-void AsyncEventServer::SocketInfoCollection::Clear() {\r
- tbb::mutex::scoped_lock lock(mutex_);\r
-\r
- socketInfoMap_.clear();\r
- waitEvents_.clear();\r
-}\r
-\r
-} //namespace IO\r
-} //namespace caspar
\ No newline at end of file
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
+
+#include "../StdAfx.h"
+
+#include "AsyncEventServer.h"
+
+#include <algorithm>
+#include <array>
+#include <string>
+#include <set>
+#include <memory>
+#include <functional>
+
+#include <boost/asio.hpp>
+#include <boost/lexical_cast.hpp>
+
+#include <tbb/mutex.h>
+#include <tbb/concurrent_hash_map.h>
+#include <tbb/concurrent_queue.h>
+
+using boost::asio::ip::tcp;
+
+namespace caspar { namespace IO {
+
+class connection;
+
+typedef std::set<spl::shared_ptr<connection>> connection_set;
+
+class connection : public spl::enable_shared_from_this<connection>
+{
+ typedef tbb::concurrent_hash_map<std::wstring, std::shared_ptr<void>> lifecycle_map_type;
+ typedef tbb::concurrent_queue<std::string> send_queue;
+
+ const spl::shared_ptr<tcp::socket> socket_;
+ std::shared_ptr<boost::asio::io_service> service_;
+ const std::wstring listen_port_;
+ const spl::shared_ptr<connection_set> connection_set_;
+ protocol_strategy_factory<char>::ptr protocol_factory_;
+ std::shared_ptr<protocol_strategy<char>> protocol_;
+
+ std::array<char, 32768> data_;
+ lifecycle_map_type lifecycle_bound_objects_;
+ send_queue send_queue_;
+ bool is_writing_;
+
+ class connection_holder : public client_connection<char>
+ {
+ std::weak_ptr<connection> connection_;
+ public:
+ explicit connection_holder(std::weak_ptr<connection> conn) : connection_(std::move(conn))
+ {}
+
+ void send(std::basic_string<char>&& data) override
+ {
+ auto conn = connection_.lock();
+
+ if (conn)
+ conn->send(std::move(data));
+ }
+
+ void disconnect() override
+ {
+ auto conn = connection_.lock();
+
+ if (conn)
+ conn->disconnect();
+ }
+
+ std::wstring address() const override
+ {
+ auto conn = connection_.lock();
+
+ if (conn)
+ return conn->ipv4_address();
+ else
+ return L"[destroyed-connection]";
+ }
+
+ void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound) override
+ {
+ auto conn = connection_.lock();
+
+ if (conn)
+ return conn->add_lifecycle_bound_object(key, lifecycle_bound);
+ }
+
+ std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key) override
+ {
+ auto conn = connection_.lock();
+
+ if (conn)
+ return conn->remove_lifecycle_bound_object(key);
+ else
+ return std::shared_ptr<void>();
+ }
+ };
+
+public:
+ static spl::shared_ptr<connection> create(std::shared_ptr<boost::asio::io_service> service, spl::shared_ptr<tcp::socket> socket, const protocol_strategy_factory<char>::ptr& protocol, spl::shared_ptr<connection_set> connection_set)
+ {
+ spl::shared_ptr<connection> con(new connection(std::move(service), std::move(socket), std::move(protocol), std::move(connection_set)));
+ con->init();
+ con->read_some();
+ return con;
+ }
+
+ void init()
+ {
+ protocol_ = protocol_factory_->create(spl::make_shared<connection_holder>(shared_from_this()));
+ }
+
+ ~connection()
+ {
+ CASPAR_LOG(debug) << print() << L" connection destroyed.";
+ }
+
+ std::wstring print() const
+ {
+ return L"async_event_server[:" + listen_port_ + L"]";
+ }
+
+ std::wstring address() const
+ {
+ return u16(socket_->local_endpoint().address().to_string());
+ }
+
+ std::wstring ipv4_address() const
+ {
+ return socket_->is_open() ? u16(socket_->remote_endpoint().address().to_string()) : L"no-address";
+ }
+
+ void send(std::string&& data)
+ {
+ send_queue_.push(std::move(data));
+ auto self = shared_from_this();
+ service_->dispatch([=] { self->do_write(); });
+ }
+
+ void disconnect()
+ {
+ auto self = shared_from_this();
+ service_->dispatch([=] { self->stop(); });
+ }
+
+ void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound)
+ {
+ //thread-safe tbb_concurrent_hash_map
+ lifecycle_bound_objects_.insert(std::pair<std::wstring, std::shared_ptr<void>>(key, lifecycle_bound));
+ }
+ std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key)
+ {
+ //thread-safe tbb_concurrent_hash_map
+ lifecycle_map_type::const_accessor acc;
+ if(lifecycle_bound_objects_.find(acc, key))
+ {
+ auto result = acc->second;
+ lifecycle_bound_objects_.erase(acc);
+ return result;
+ }
+ return std::shared_ptr<void>();
+ }
+
+private:
+ void do_write() //always called from the asio-service-thread
+ {
+ if(!is_writing_)
+ {
+ std::string data;
+ if(send_queue_.try_pop(data))
+ {
+ write_some(std::move(data));
+ }
+ }
+ }
+
+ void stop() //always called from the asio-service-thread
+ {
+ connection_set_->erase(shared_from_this());
+
+ CASPAR_LOG(info) << print() << L" Client " << ipv4_address() << L" disconnected (" << connection_set_->size() << L" connections).";
+
+ try
+ {
+ socket_->cancel();
+ socket_->close();
+ }
+ catch(...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ }
+ }
+
+ connection(const std::shared_ptr<boost::asio::io_service>& service, const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set)
+ : socket_(socket)
+ , service_(service)
+ , listen_port_(socket_->is_open() ? boost::lexical_cast<std::wstring>(socket_->local_endpoint().port()) : L"no-port")
+ , connection_set_(connection_set)
+ , protocol_factory_(protocol_factory)
+ , is_writing_(false)
+ {
+ CASPAR_LOG(info) << print() << L" Accepted connection from " << ipv4_address() << L" (" << (connection_set_->size() + 1) << L" connections).";
+ }
+
+ void handle_read(const boost::system::error_code& error, size_t bytes_transferred) //always called from the asio-service-thread
+ {
+ if(!error)
+ {
+ try
+ {
+ std::string data(data_.begin(), data_.begin() + bytes_transferred);
+
+ protocol_->parse(data);
+ }
+ catch(...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ }
+
+ read_some();
+ }
+ else if (error != boost::asio::error::operation_aborted)
+ stop();
+ }
+
+ void handle_write(const spl::shared_ptr<std::string>& str, const boost::system::error_code& error, size_t bytes_transferred) //always called from the asio-service-thread
+ {
+ if(!error)
+ {
+ if(bytes_transferred != str->size())
+ {
+ str->assign(str->substr(bytes_transferred));
+ socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
+ }
+ else
+ {
+ is_writing_ = false;
+ do_write();
+ }
+ }
+ else if (error != boost::asio::error::operation_aborted && socket_->is_open())
+ stop();
+ }
+
+ void read_some() //always called from the asio-service-thread
+ {
+ socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
+ }
+
+ void write_some(std::string&& data) //always called from the asio-service-thread
+ {
+ is_writing_ = true;
+ auto str = spl::make_shared<std::string>(std::move(data));
+ socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
+ }
+
+ friend struct AsyncEventServer::implementation;
+};
+
+struct AsyncEventServer::implementation : public spl::enable_shared_from_this<implementation>
+{
+ std::shared_ptr<boost::asio::io_service> service_;
+ tcp::acceptor acceptor_;
+ protocol_strategy_factory<char>::ptr protocol_factory_;
+ spl::shared_ptr<connection_set> connection_set_;
+ std::vector<lifecycle_factory_t> lifecycle_factories_;
+ tbb::mutex mutex_;
+
+ implementation(std::shared_ptr<boost::asio::io_service> service, const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
+ : service_(std::move(service))
+ , acceptor_(*service_, tcp::endpoint(tcp::v4(), port))
+ , protocol_factory_(protocol)
+ {
+ }
+
+ void stop()
+ {
+ try
+ {
+ acceptor_.cancel();
+ acceptor_.close();
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ }
+ }
+
+ ~implementation()
+ {
+ auto conns_set = connection_set_;
+
+ service_->post([conns_set]
+ {
+ auto connections = *conns_set;
+ for (auto& connection : connections)
+ connection->stop();
+ });
+ }
+
+ void start_accept()
+ {
+ spl::shared_ptr<tcp::socket> socket(new tcp::socket(*service_));
+ acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, shared_from_this(), socket, std::placeholders::_1));
+ }
+
+ void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error)
+ {
+ if (!acceptor_.is_open())
+ return;
+
+ if (!error)
+ {
+ boost::system::error_code ec;
+ socket->set_option(boost::asio::socket_base::keep_alive(true), ec);
+
+ if (ec)
+ CASPAR_LOG(warning) << print() << L" Failed to enable TCP keep-alive on socket";
+
+ auto conn = connection::create(service_, socket, protocol_factory_, connection_set_);
+ connection_set_->insert(conn);
+
+ for (auto& lifecycle_factory : lifecycle_factories_)
+ {
+ auto lifecycle_bound = lifecycle_factory(u8(conn->ipv4_address()));
+ conn->add_lifecycle_bound_object(lifecycle_bound.first, lifecycle_bound.second);
+ }
+ }
+ start_accept();
+ }
+
+ std::wstring print() const
+ {
+ return L"async_event_server[:" + boost::lexical_cast<std::wstring>(acceptor_.local_endpoint().port()) + L"]";
+ }
+
+ void add_client_lifecycle_object_factory(const lifecycle_factory_t& factory)
+ {
+ auto self = shared_from_this();
+ service_->post([=]{ self->lifecycle_factories_.push_back(factory); });
+ }
+};
+
+AsyncEventServer::AsyncEventServer(
+ std::shared_ptr<boost::asio::io_service> service, const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
+ : impl_(new implementation(std::move(service), protocol, port))
+{
+ impl_->start_accept();
+}
+
+AsyncEventServer::~AsyncEventServer()
+{
+ impl_->stop();
+}
+
+void AsyncEventServer::add_client_lifecycle_object_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_object_factory(factory); }
+
+}}