2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
\r
4 * This file is part of CasparCG (www.casparcg.com).
\r
6 * CasparCG is free software: you can redistribute it and/or modify
\r
7 * it under the terms of the GNU General Public License as published by
\r
8 * the Free Software Foundation, either version 3 of the License, or
\r
9 * (at your option) any later version.
\r
11 * CasparCG is distributed in the hope that it will be useful,
\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
\r
14 * GNU General Public License for more details.
\r
16 * You should have received a copy of the GNU General Public License
\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
\r
19 * Author: Nicklas P Andersson
\r
23 // AsyncEventServer.cpp: implementation of the AsyncEventServer class.
\r
25 //////////////////////////////////////////////////////////////////////
\r
27 #include "../stdafx.h"
\r
29 #include "AsyncEventServer.h"
\r
30 #include "SocketInfo.h"
\r
32 #include <common/log/log.h>
\r
35 #include <algorithm>
\r
37 #if defined(_MSC_VER)
\r
38 #pragma warning (push, 1) // TODO: Legacy code, just disable warnings, will replace with boost::asio in future
\r
41 namespace caspar { namespace IO {
\r
// Upper bound on the number of socket events tracked at once: one less than MAXIMUM_WAIT_OBJECTS.
// Run() keeps stopEvent in slot 0 of its wait array and copies the socket events in from slot 1.
43 #define CASPAR_MAXIMUM_SOCKET_CLIENTS (MAXIMUM_WAIT_OBJECTS-1)
\r
// Counter shared by all AsyncEventServer objects; the constructor and destructor adjust it
// with InterlockedIncrement / InterlockedDecrement.
45 long AsyncEventServer::instanceCount_ = 0;
\r
46 //////////////////////////////
\r
47 // AsyncEventServer constructor
\r
48 // PARAMS: port(TCP-port the server should listen to)
\r
49 // COMMENT: Initializes the WinSock2 library
\r
50 AsyncEventServer::AsyncEventServer(const safe_ptr<IProtocolStrategy>& pProtocol, int port) : port_(port), pProtocolStrategy_(pProtocol)
\r
52 if(instanceCount_ == 0) {
\r
54 if(WSAStartup(MAKEWORD(2,2), &wsaData) != NO_ERROR)
\r
55 throw std::exception("Error initializing WinSock2");
\r
57 CASPAR_LOG(info) << "WinSock2 Initialized.";
\r
61 InterlockedIncrement(&instanceCount_);
\r
64 /////////////////////////////
\r
65 // AsyncEventServer destructor
\r
// COMMENT: Decrements the shared instance counter and then tests it for zero.
// NOTE(review): the lines between the signature and the decrement, and the statement controlled
//               by the final if, are not visible in this copy of the file. Presumably the if
//               releases WinSock2 once the last instance is gone - confirm against the repository.
// NOTE(review): instanceCount_ is re-read after InterlockedDecrement instead of using the value
//               that call returns, so the zero test is not atomic with the decrement.
66 AsyncEventServer::~AsyncEventServer() {
\r
69 InterlockedDecrement(&instanceCount_);
\r
70 if(instanceCount_ == 0)
\r
// Installs handler as the onSocketInfoRemoved callback of socketInfoCollection_.
// SocketInfoCollection::RemoveSocketInfo(HANDLE) invokes that callback, when one is set,
// with the socket info it looked up.
74 void AsyncEventServer::SetClientDisconnectHandler(ClientDisconnectEvent handler) {
\r
75 socketInfoCollection_.onSocketInfoRemoved = handler;
\r
78 //////////////////////////////
\r
79 // AsyncEventServer::Start
\r
80 // RETURNS: true at successful startup
\r
// COMMENT: Clears socketInfoCollection_, creates a TCP socket for port_ on INADDR_ANY, puts it
//          in event-select mode for FD_ACCEPT|FD_CLOSE, binds it, starts listening, registers
//          it in socketInfoCollection_ and finally starts listenThread_.
//          Every failing step logs an error.
// NOTE(review): the short lines (returns, closing braces) are missing from this copy of the
//               file, so the statement executed after each logged failure - and after the
//               IsRunning() test - is not visible here. Confirm against the repository.
81 bool AsyncEventServer::Start() {
\r
82 if(listenThread_.IsRunning())
\r
85 socketInfoCollection_.Clear();
\r
// Listen address: IPv4, any local interface, port_ in network byte order.
87 sockaddr_in sockAddr;
\r
88 ZeroMemory(&sockAddr, sizeof(sockAddr));
\r
89 sockAddr.sin_family = AF_INET;
\r
90 sockAddr.sin_addr.s_addr = INADDR_ANY;
\r
91 sockAddr.sin_port = htons(port_);
\r
93 SOCKET listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
\r
94 if(listenSocket == INVALID_SOCKET) {
\r
95 CASPAR_LOG(error) << "Failed to create listenSocket";
\r
// From here on the socket is held by pListenSocketInfo_, together with the event used below.
99 pListenSocketInfo_ = SocketInfoPtr(new SocketInfo(listenSocket, this));
\r
101 if(WSAEventSelect(pListenSocketInfo_->socket_, pListenSocketInfo_->event_, FD_ACCEPT|FD_CLOSE) == SOCKET_ERROR) {
\r
102 CASPAR_LOG(error) << "Failed to enter EventSelect-mode for listenSocket";
\r
106 if(bind(pListenSocketInfo_->socket_, (sockaddr*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR) {
\r
107 CASPAR_LOG(error) << "Failed to bind listenSocket";
\r
111 if(listen(pListenSocketInfo_->socket_, SOMAXCONN) == SOCKET_ERROR) {
\r
112 CASPAR_LOG(error) << "Failed to listen";
\r
// The listen socket is tracked in the same collection as the client sockets, so Run() waits on its event too.
116 socketInfoCollection_.AddSocketInfo(pListenSocketInfo_);
\r
118 //start thread: the entrypoint is Run(EVENT stopEvent)
\r
119 if(!listenThread_.Start(this)) {
\r
120 CASPAR_LOG(error) << "Failed to create ListenThread";
\r
124 CASPAR_LOG(info) << "Listener successfully initialized";
\r
// AsyncEventServer::Run
// PARAMS: stopEvent(event handle placed in slot 0 of the wait array)
// COMMENT: Entry point of the listener thread (see the comment in Start()).
//          Waits, with a 1500 ms timeout, on stopEvent plus the socket events copied from
//          socketInfoCollection_. For the socket whose event was signalled, the pending network
//          events are enumerated and dispatched to OnAccept / OnClose / OnRead / OnWrite when
//          the matching error code is 0; otherwise the code is logged and passed to OnError.
// NOTE(review): the loop construct and the statements that follow the WAIT_TIMEOUT, WAIT_FAILED
//               and stopEvent tests are missing from this copy of the file, as are the else
//               lines and closing braces. Confirm the control flow against the repository.
// NOTE(review): the result of WSAEnumNetworkEvents is not checked before networkEvents is read.
128 void AsyncEventServer::Run(HANDLE stopEvent)
\r
130 WSANETWORKEVENTS networkEvents;
\r
// Slot 0 is reserved for stopEvent; socket events are copied in starting at slot 1.
132 HANDLE waitHandlesCopy[MAXIMUM_WAIT_OBJECTS];
\r
133 waitHandlesCopy[0] = stopEvent;
\r
136 //Update local copy of the array of wait-handles if necessary
\r
137 if(socketInfoCollection_.IsDirty()) {
\r
138 socketInfoCollection_.CopyCollectionToArray(&(waitHandlesCopy[1]), CASPAR_MAXIMUM_SOCKET_CLIENTS);
\r
139 socketInfoCollection_.ClearDirty();
\r
// Number of handles waited on: collection size plus the stop event, capped at MAXIMUM_WAIT_OBJECTS.
142 DWORD waitResult = WSAWaitForMultipleEvents(std::min<DWORD>(static_cast<DWORD>(socketInfoCollection_.Size()+1), MAXIMUM_WAIT_OBJECTS), waitHandlesCopy, FALSE, 1500, FALSE);
\r
143 if(waitResult == WAIT_TIMEOUT)
\r
145 else if(waitResult == WAIT_FAILED)
\r
// Index of the signalled handle inside waitHandlesCopy.
148 DWORD eventIndex = waitResult - WAIT_OBJECT_0;
\r
150 HANDLE waitEvent = waitHandlesCopy[eventIndex];
\r
151 SocketInfoPtr pSocketInfo;
\r
153 if(eventIndex == 0) //stopEvent
\r
155 else if(socketInfoCollection_.FindSocketInfo(waitEvent, pSocketInfo)) {
\r
156 WSAEnumNetworkEvents(pSocketInfo->socket_, waitEvent, &networkEvents);
\r
158 if(networkEvents.lNetworkEvents & FD_ACCEPT) {
\r
159 if(networkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)
\r
160 OnAccept(pSocketInfo);
\r
162 CASPAR_LOG(debug) << "OnAccept (ErrorCode: " << networkEvents.iErrorCode[FD_ACCEPT_BIT] << TEXT(")");
\r
163 OnError(waitEvent, networkEvents.iErrorCode[FD_ACCEPT_BIT]);
\r
167 if(networkEvents.lNetworkEvents & FD_CLOSE) {
\r
168 if(networkEvents.iErrorCode[FD_CLOSE_BIT] == 0)
\r
169 OnClose(pSocketInfo);
\r
171 CASPAR_LOG(debug) << "OnClose (ErrorCode: " << networkEvents.iErrorCode[FD_CLOSE_BIT] << TEXT(")");
\r
172 OnError(waitEvent, networkEvents.iErrorCode[FD_CLOSE_BIT]);
\r
177 if(networkEvents.lNetworkEvents & FD_READ) {
\r
178 if(networkEvents.iErrorCode[FD_READ_BIT] == 0)
\r
179 OnRead(pSocketInfo);
\r
181 CASPAR_LOG(debug) << "OnRead (ErrorCode: " << networkEvents.iErrorCode[FD_READ_BIT] << TEXT(")");
\r
182 OnError(waitEvent, networkEvents.iErrorCode[FD_READ_BIT]);
\r
186 if(networkEvents.lNetworkEvents & FD_WRITE) {
\r
187 if(networkEvents.iErrorCode[FD_WRITE_BIT] == 0)
\r
188 OnWrite(pSocketInfo);
\r
190 CASPAR_LOG(debug) << "OnWrite (ErrorCode: " << networkEvents.iErrorCode[FD_WRITE_BIT] << TEXT(")");
\r
191 OnError(waitEvent, networkEvents.iErrorCode[FD_WRITE_BIT]);
\r
196 //Could not find the waitHandle in the SocketInfoCollection.
\r
197 //It must have been removed during the last call to WSAWaitForMultipleEvents
\r
// AsyncEventServer::OnUnhandledException
// PARAMS: ex(the exception that was not handled)
// COMMENT: Logs the exception's message at fatal level. Declared throw(), so nothing may escape.
//          bDoRestart starts out true and is set to false further down.
// NOTE(review): the lines around the log statement and the assignment, and the return statement,
//               are missing from this copy of the file. Presumably the assignment sits in a
//               handler for failures of the logging itself and bDoRestart is what is returned -
//               confirm against the repository.
203 bool AsyncEventServer::OnUnhandledException(const std::exception& ex) throw() {
\r
204 bool bDoRestart = true;
\r
208 CASPAR_LOG(fatal) << "UNHANDLED EXCEPTION in TCPServers listeningthread. Message: " << ex.what();
\r
212 bDoRestart = false;
\r
218 ///////////////////////////////
\r
219 // AsyncEventServer:Stop
\r
220 // COMMENT: Shuts down
\r
221 void AsyncEventServer::Stop()
\r
223 //TODO: initiate shutdown on all clients connected
\r
224 // for(int i=0; i < _totalActiveSockets; ++i) {
\r
225 // shutdown(_pSocketInfo[i]->_socket, SD_SEND);
\r
228 if(!listenThread_.Stop()) {
\r
229 CASPAR_LOG(warning) << "Wait for listenThread timed out.";
\r
232 socketInfoCollection_.Clear();
\r
235 ////////////////////////////////////////////////////////////////////
\r
237 // MESSAGE HANDLERS
\r
239 ////////////////////////////////////////////////////////////////////
\r
242 //////////////////////////////
\r
243 // AsyncEventServer::OnAccept
\r
245 // COMMENT: Called when a new client connects
\r
246 bool AsyncEventServer::OnAccept(SocketInfoPtr& pSI) {
\r
247 sockaddr_in clientAddr;
\r
248 int addrSize = sizeof(clientAddr);
\r
249 SOCKET clientSocket = WSAAccept(pSI->socket_, (sockaddr*)&clientAddr, &addrSize, NULL, NULL);
\r
250 if(clientSocket == INVALID_SOCKET) {
\r
251 LogSocketError(TEXT("Accept"));
\r
255 SocketInfoPtr pClientSocket(new SocketInfo(clientSocket, this));
\r
257 //Determine if we can handle one more client
\r
258 if(socketInfoCollection_.Size() >= CASPAR_MAXIMUM_SOCKET_CLIENTS) {
\r
259 CASPAR_LOG(error) << "Could not accept ) << too many connections).";
\r
263 if(WSAEventSelect(pClientSocket->socket_, pClientSocket->event_, FD_READ | FD_WRITE | FD_CLOSE) == SOCKET_ERROR) {
\r
264 LogSocketError(TEXT("Accept (failed create event for new client)"));
\r
268 TCHAR addressBuffer[32];
\r
269 MultiByteToWideChar(CP_ACP, 0, inet_ntoa(clientAddr.sin_addr), -1, addressBuffer, 32);
\r
270 pClientSocket->host_ = addressBuffer;
\r
272 socketInfoCollection_.AddSocketInfo(pClientSocket);
\r
274 CASPAR_LOG(info) << "Accepted connection from " << pClientSocket->host_.c_str();
\r
279 bool ConvertMultiByteToWideChar(UINT codePage, char* pSource, int sourceLength, std::vector<wchar_t>& wideBuffer, int& countLeftovers)
\r
281 if(codePage == CP_UTF8) {
\r
282 countLeftovers = 0;
\r
283 //check from the end of pSource for ev. uncompleted UTF-8 byte sequence
\r
284 if(pSource[sourceLength-1] & 0x80) {
\r
285 //The last byte is part of a multibyte sequence. If the sequence is not complete, we need to save the partial sequence
\r
286 int bytesToCheck = std::min(4, sourceLength); //a sequence contains a maximum of 4 bytes
\r
287 int currentLeftoverIndex = sourceLength-1;
\r
288 for(; bytesToCheck > 0; --bytesToCheck, --currentLeftoverIndex) {
\r
290 if(pSource[currentLeftoverIndex] & 0x80) {
\r
291 if(pSource[currentLeftoverIndex] & 0x40) { //The two high-bits are set, this is the "header"
\r
292 int expectedSequenceLength = 2;
\r
293 if(pSource[currentLeftoverIndex] & 0x20)
\r
294 ++expectedSequenceLength;
\r
295 if(pSource[currentLeftoverIndex] & 0x10)
\r
296 ++expectedSequenceLength;
\r
298 if(countLeftovers < expectedSequenceLength) {
\r
299 //The sequence is incomplete. Leave the leftovers to be interpreted with the next call
\r
302 //The sequence is complete, there are no leftovers.
\r
304 //error. Let the conversion-function take the hit.
\r
305 countLeftovers = 0;
\r
310 //error. Let the conversion-function take the hit.
\r
311 countLeftovers = 0;
\r
315 if(countLeftovers == 4) {
\r
316 //error. Let the conversion-function take the hit.
\r
317 countLeftovers = 0;
\r
322 int charsWritten = 0;
\r
323 int sourceBytesToProcess = sourceLength-countLeftovers;
\r
324 int wideBufferCapacity = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, NULL, NULL);
\r
325 if(wideBufferCapacity > 0)
\r
327 wideBuffer.resize(wideBufferCapacity);
\r
328 charsWritten = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, &wideBuffer[0], wideBuffer.size());
\r
330 //copy the leftovers to the front of the buffer
\r
331 if(countLeftovers > 0) {
\r
332 memcpy(pSource, &(pSource[sourceBytesToProcess]), countLeftovers);
\r
335 wideBuffer.resize(charsWritten);
\r
336 return (charsWritten > 0);
\r
339 //////////////////////////////
\r
340 // AsyncEventServer::OnRead
\r
// PARAMS: pSI(the client socket that has data waiting)
342 // COMMENT: Called when something arrives on the socket that has to be read
\r
//          Reads with recv() into recvBuffer_, behind the recvLeftoverOffset_ bytes kept from
//          the previous pass, and repeats until recv() reports SOCKET_ERROR.
//          A result of 0 is logged as a disconnect and the socket is removed from the collection.
//          Otherwise the bytes (leftovers included) are converted using the protocol strategy's
//          codepage and handed to Parse(); the conversion stores the new leftover count in
//          recvLeftoverOffset_. A failed conversion is logged as an error.
//          After the loop, WSAEWOULDBLOCK is told apart from other error codes, which are
//          logged and passed to OnError.
// NOTE(review): the return statements and else lines are missing from this copy of the file,
//               so the value returned on each path is not visible here.
// NOTE(review): maxRecvLength assumes recvBuffer_ is an array member, so that sizeof yields its
//               size in bytes - TODO confirm in SocketInfo.h.
343 bool AsyncEventServer::OnRead(SocketInfoPtr& pSI) {
\r
344 int recvResult = SOCKET_ERROR;
\r
346 int maxRecvLength = sizeof(pSI->recvBuffer_)-pSI->recvLeftoverOffset_;
\r
347 recvResult = recv(pSI->socket_, pSI->recvBuffer_+pSI->recvLeftoverOffset_, maxRecvLength, 0);
\r
348 while(recvResult != SOCKET_ERROR) {
\r
349 if(recvResult == 0) {
\r
350 CASPAR_LOG(info) << "Client " << pSI->host_.c_str() << TEXT(" disconnected");
\r
352 socketInfoCollection_.RemoveSocketInfo(pSI);
\r
356 //Convert to widechar
\r
357 if(ConvertMultiByteToWideChar(pProtocolStrategy_->GetCodepage(), pSI->recvBuffer_, recvResult + pSI->recvLeftoverOffset_, pSI->wideRecvBuffer_, pSI->recvLeftoverOffset_))
\r
358 pProtocolStrategy_->Parse(&pSI->wideRecvBuffer_[0], pSI->wideRecvBuffer_.size(), pSI);
\r
360 CASPAR_LOG(error) << "Read from " << pSI->host_.c_str() << TEXT(" failed, could not convert command to UNICODE");
\r
// Read again, leaving room for whatever the conversion held back.
364 maxRecvLength = sizeof(pSI->recvBuffer_)-pSI->recvLeftoverOffset_;
\r
365 recvResult = recv(pSI->socket_, pSI->recvBuffer_+pSI->recvLeftoverOffset_, maxRecvLength, 0);
\r
368 if(recvResult == SOCKET_ERROR) {
\r
369 int errorCode = WSAGetLastError();
\r
370 if(errorCode == WSAEWOULDBLOCK)
\r
373 LogSocketError(TEXT("Read"), errorCode);
\r
374 OnError(pSI->event_, errorCode);
\r
381 //////////////////////////////
\r
382 // AsyncEventServer::OnWrite
\r
// PARAMS: pSI(the socket that signalled FD_WRITE)
384 // COMMENT: Called when the socket is ready to send more data
\r
//          Run() calls this for FD_WRITE events that carry no error code.
// NOTE(review): the body of this function is not visible in this copy of the file.
385 void AsyncEventServer::OnWrite(SocketInfoPtr& pSI) {
\r
389 bool ConvertWideCharToMultiByte(UINT codePage, const std::wstring& wideString, std::vector<char>& destBuffer)
\r
391 int bytesWritten = 0;
\r
392 int multibyteBufferCapacity = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), 0, 0, NULL, NULL);
\r
393 if(multibyteBufferCapacity > 0)
\r
395 destBuffer.resize(multibyteBufferCapacity);
\r
396 bytesWritten = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), &destBuffer[0], destBuffer.size(), NULL, NULL);
\r
398 destBuffer.resize(bytesWritten);
\r
399 return (bytesWritten > 0);
\r
// AsyncEventServer::DoSend
// PARAMS: socketInfo(the client whose queued responses should be sent)
// COMMENT: Works through socketInfo.sendQueue_ while holding mutex_. The string at the front
//          of the queue is converted with the protocol strategy's codepage into
//          currentlySending_, which is then written with send() starting at
//          currentlySendingOffset_. A complete send resets the buffer and pops the queue; a
//          partial send advances the offset. WSAEWOULDBLOCK is logged at debug level; other
//          send errors are logged and passed to OnError, and the message is dropped.
// NOTE(review): the else/break lines and closing braces are missing from this copy of the
//               file, so which branch the last sendQueue_.pop() belongs to, and where the loop
//               is left early, is not visible here.
// NOTE(review): the lock is taken on mutex_, while the comment below talks about locking the
//               socketInfo object - confirm which of the two is intended.
402 void AsyncEventServer::DoSend(SocketInfo& socketInfo) {
\r
403 //Locks the socketInfo-object so that no one else tampers with the sendqueue at the same time
\r
404 tbb::mutex::scoped_lock lock(mutex_);
\r
406 while(!socketInfo.sendQueue_.empty() || socketInfo.currentlySending_.size() > 0) {
\r
407 if(socketInfo.currentlySending_.size() == 0) {
\r
408 //Read the next string in the queue and convert it using the protocol strategy's codepage
\r
409 if(!ConvertWideCharToMultiByte(pProtocolStrategy_->GetCodepage(), socketInfo.sendQueue_.front(), socketInfo.currentlySending_))
\r
411 CASPAR_LOG(error) << "Send to " << socketInfo.host_.c_str() << TEXT(" failed, could not convert response to UTF-8");
\r
413 socketInfo.currentlySendingOffset_ = 0;
\r
416 if(socketInfo.currentlySending_.size() > 0) {
\r
// Send whatever part of the converted message has not gone out yet.
417 int bytesToSend = static_cast<int>(socketInfo.currentlySending_.size()-socketInfo.currentlySendingOffset_);
\r
418 int sentBytes = send(socketInfo.socket_, &socketInfo.currentlySending_[0] + socketInfo.currentlySendingOffset_, bytesToSend, 0);
\r
419 if(sentBytes == SOCKET_ERROR) {
\r
420 int errorCode = WSAGetLastError();
\r
421 if(errorCode == WSAEWOULDBLOCK) {
\r
422 CASPAR_LOG(debug) << "Send to " << socketInfo.host_.c_str() << TEXT(" would block, sending later");
\r
426 LogSocketError(TEXT("Send"), errorCode);
\r
427 OnError(socketInfo.event_, errorCode);
\r
429 socketInfo.currentlySending_.resize(0);
\r
430 socketInfo.currentlySendingOffset_ = 0;
\r
431 socketInfo.sendQueue_.pop();
\r
436 if(sentBytes == bytesToSend) {
\r
// Short messages are echoed to the log; longer ones are only counted.
437 if(sentBytes < 200)
\r
438 CASPAR_LOG(info) << "Sent " << socketInfo.sendQueue_.front().c_str() << TEXT(" to ") << socketInfo.host_.c_str();
\r
440 CASPAR_LOG(info) << "Sent more than 200 bytes to " << socketInfo.host_.c_str();
\r
442 socketInfo.currentlySending_.resize(0);
\r
443 socketInfo.currentlySendingOffset_ = 0;
\r
444 socketInfo.sendQueue_.pop();
\r
447 socketInfo.currentlySendingOffset_ += sentBytes;
\r
448 CASPAR_LOG(info) << "Sent partial message to " << socketInfo.host_.c_str();
\r
453 socketInfo.sendQueue_.pop();
\r
457 //////////////////////////////
\r
458 // AsyncEventServer::OnClose
\r
// PARAMS: pSI(the socket that signalled FD_CLOSE)
460 // COMMENT: Called when a client disconnects / is disconnected
\r
//          Logs the client's host and removes the socket from socketInfoCollection_.
//          Run() calls this for FD_CLOSE events that carry no error code.
461 void AsyncEventServer::OnClose(SocketInfoPtr& pSI) {
\r
462 CASPAR_LOG(info) << "Client " << pSI->host_.c_str() << TEXT(" was disconnected");
\r
464 socketInfoCollection_.RemoveSocketInfo(pSI);
\r
467 //////////////////////////////
\r
468 // AsyncEventServer::OnError
\r
// PARAMS: waitEvent(event handle identifying the socket), errorCode(WinSock error code)
470 // COMMENT: Called when an errorcode is received
\r
//          For the connection-level codes tested below, the socket registered for waitEvent
//          is looked up and, when found, the disconnect is logged with the error code.
//          The socket is removed from socketInfoCollection_ by its event handle.
// NOTE(review): the closing braces are missing from this copy of the file, so it is not visible
//               whether RemoveSocketInfo runs only for the listed codes or for every call.
471 void AsyncEventServer::OnError(HANDLE waitEvent, int errorCode) {
\r
472 if(errorCode == WSAENETDOWN || errorCode == WSAECONNABORTED || errorCode == WSAECONNRESET || errorCode == WSAESHUTDOWN || errorCode == WSAETIMEDOUT || errorCode == WSAENOTCONN || errorCode == WSAENETRESET) {
\r
473 SocketInfoPtr pSocketInfo;
\r
474 if(socketInfoCollection_.FindSocketInfo(waitEvent, pSocketInfo)) {
\r
475 CASPAR_LOG(info) << "Client " << pSocketInfo->host_.c_str() << TEXT(" was disconnected, Errorcode ") << errorCode;
\r
478 socketInfoCollection_.RemoveSocketInfo(waitEvent);
\r
482 //////////////////////////////
\r
483 // AsyncEventServer::DisconnectClient
\r
485 // COMMENT: The client is removed from the actual client-list when an FD_CLOSE notification is recieved
\r
486 void AsyncEventServer::DisconnectClient(SocketInfo& socketInfo) {
\r
487 int result = shutdown(socketInfo.socket_, SD_SEND);
\r
488 if(result == SOCKET_ERROR)
\r
489 OnError(socketInfo.event_, result);
\r
492 //////////////////////////////
\r
493 // AsyncEventServer::LogSocketError
\r
494 void AsyncEventServer::LogSocketError(const TCHAR* pStr, int socketError) {
\r
495 if(socketError == 0)
\r
496 socketError = WSAGetLastError();
\r
498 CASPAR_LOG(error) << "Failed to " << pStr << TEXT(" Errorcode: ") << socketError;
\r
502 //////////////////////////////
\r
503 // SocketInfoCollection
\r
504 //////////////////////////////
\r
// Constructs the collection with its dirty flag (bDirty_) cleared.
// NOTE(review): the rest of the constructor is not visible in this copy of the file.
506 AsyncEventServer::SocketInfoCollection::SocketInfoCollection() : bDirty_(false) {
\r
// Destructor of the collection.
// NOTE(review): its body is not visible in this copy of the file.
509 AsyncEventServer::SocketInfoCollection::~SocketInfoCollection() {
\r
512 bool AsyncEventServer::SocketInfoCollection::AddSocketInfo(SocketInfoPtr& pSocketInfo) {
\r
513 tbb::mutex::scoped_lock lock(mutex_);
\r
515 waitEvents_.resize(waitEvents_.size()+1);
\r
516 bool bSuccess = socketInfoMap_.insert(SocketInfoMap::value_type(pSocketInfo->event_, pSocketInfo)).second;
\r
518 waitEvents_[waitEvents_.size()-1] = pSocketInfo->event_;
\r
// Convenience overload: when pSocketInfo is not null, removes it by its event_ handle
// through RemoveSocketInfo(HANDLE).
525 void AsyncEventServer::SocketInfoCollection::RemoveSocketInfo(SocketInfoPtr& pSocketInfo) {
\r
526 if(pSocketInfo != 0) {
\r
527 RemoveSocketInfo(pSocketInfo->event_);
\r
// RemoveSocketInfo
// PARAMS: waitEvent(event handle of the socket to remove)
// COMMENT: While holding mutex_: looks up the entry registered for waitEvent, clears its
//          pServer_ pointer, erases it from the map and takes the handle out of waitEvents_ by
//          swapping it with the last element and shrinking the vector by one.
//          Afterwards the onSocketInfoRemoved callback, if one is set, is called with the
//          socket info that was looked up.
// NOTE(review): the if lines guarding the lookup result and the closing braces are missing
//               from this copy of the file, so the exact nesting - including whether the
//               callback can be reached with an empty pSocketInfo - is not visible here.
// NOTE(review): the callback appears to run while mutex_ is still held - confirm.
530 void AsyncEventServer::SocketInfoCollection::RemoveSocketInfo(HANDLE waitEvent) {
\r
531 tbb::mutex::scoped_lock lock(mutex_);
\r
534 SocketInfoPtr pSocketInfo;
\r
535 SocketInfoMap::iterator it = socketInfoMap_.find(waitEvent);
\r
536 SocketInfoMap::iterator end = socketInfoMap_.end();
\r
538 pSocketInfo = it->second;
\r
// Detach the socket info from the server before it leaves the collection.
541 pSocketInfo->pServer_ = NULL;
\r
543 socketInfoMap_.erase(waitEvent);
\r
545 HandleVector::iterator it = std::find(waitEvents_.begin(), waitEvents_.end(), waitEvent);
\r
546 if(it != waitEvents_.end()) {
\r
// Order of waitEvents_ is not preserved: the last handle takes the removed one's place.
547 std::swap((*it), waitEvents_.back());
\r
548 waitEvents_.resize(waitEvents_.size()-1);
\r
553 if(onSocketInfoRemoved)
\r
554 onSocketInfoRemoved(pSocketInfo);
\r
557 bool AsyncEventServer::SocketInfoCollection::FindSocketInfo(HANDLE key, SocketInfoPtr& pResult) {
\r
558 tbb::mutex::scoped_lock lock(mutex_);
\r
560 SocketInfoMap::iterator it = socketInfoMap_.find(key);
\r
561 SocketInfoMap::iterator end = socketInfoMap_.end();
\r
563 pResult = it->second;
\r
565 return (it != end);
\r
568 void AsyncEventServer::SocketInfoCollection::CopyCollectionToArray(HANDLE* pDest, int maxCount) {
\r
569 tbb::mutex::scoped_lock lock(mutex_);
\r
571 memcpy(pDest, &(waitEvents_[0]), std::min( maxCount, static_cast<int>(waitEvents_.size()) ) * sizeof(HANDLE) );
\r
// Clear
// COMMENT: Empties both the event-to-socket map and the list of wait events while holding mutex_.
// NOTE(review): the end of this function lies beyond the visible part of the file, so any
//               statement after waitEvents_.clear() is not shown here.
574 void AsyncEventServer::SocketInfoCollection::Clear() {
\r
575 tbb::mutex::scoped_lock lock(mutex_);
\r
577 socketInfoMap_.clear();
\r
578 waitEvents_.clear();
\r