2 * copyright (c) 2010 Sveriges Television AB <info@casparcg.com>
\r
4 * This file is part of CasparCG.
\r
6 * CasparCG is free software: you can redistribute it and/or modify
\r
7 * it under the terms of the GNU General Public License as published by
\r
8 * the Free Software Foundation, either version 3 of the License, or
\r
9 * (at your option) any later version.
\r
11 * CasparCG is distributed in the hope that it will be useful,
\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
\r
14 * GNU General Public License for more details.
\r
16 * You should have received a copy of the GNU General Public License
\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
\r
21 // AsyncEventServer.cpp: implementation of the AsyncEventServer class.
\r
23 //////////////////////////////////////////////////////////////////////
\r
25 #include "../stdafx.h"
\r
27 #include "AsyncEventServer.h"
\r
28 #include "SocketInfo.h"
\r
30 #include <common/log/log.h>
\r
33 #include <algorithm>
\r
35 #if defined(_MSC_VER)
\r
36 #pragma warning (push, 1) // TODO: Legacy code, just disable warnings, will replace with boost::asio in future
\r
39 namespace caspar { namespace IO {
\r
// Upper bound on the number of sockets tracked at once. One entry of the
// MAXIMUM_WAIT_OBJECTS-sized wait array is reserved: Run() stores the stop event at index 0.
41 #define CASPAR_MAXIMUM_SOCKET_CLIENTS (MAXIMUM_WAIT_OBJECTS-1)
\r
// Counter shared by all AsyncEventServer instances; the constructor and destructor
// adjust it with InterlockedIncrement / InterlockedDecrement.
43 long AsyncEventServer::instanceCount_ = 0;
\r
44 //////////////////////////////
\r
45 // AsyncEventServer constructor
\r
46 // PARAMS: pProtocol(protocol strategy, stored in pProtocolStrategy_), port(TCP-port the server should listen to, stored in port_)
\r
47 // COMMENT: Initializes the WinSock2 library
\r
// Calls WSAStartup (requesting version 2.2) when instanceCount_ is zero, then increments instanceCount_.
// THROWS: std::exception when WSAStartup does not return NO_ERROR.
// NOTE(review): constructing std::exception from a string literal is an MSVC extension - confirm this file is only built with MSVC.
// NOTE(review): instanceCount_ is tested with a plain read before InterlockedIncrement, so the check and the
// increment are not one atomic step - confirm instances are never constructed concurrently.
// NOTE(review): the declaration of wsaData is not visible in this listing (the embedded numbering skips line 51).
48 AsyncEventServer::AsyncEventServer(const safe_ptr<IProtocolStrategy>& pProtocol, int port) : port_(port), pProtocolStrategy_(pProtocol)
\r
50 if(instanceCount_ == 0) {
\r
52 if(WSAStartup(MAKEWORD(2,2), &wsaData) != NO_ERROR)
\r
53 throw std::exception("Error initializing WinSock2");
\r
55 CASPAR_LOG(info) << "WinSock2 Initialized.";
\r
59 InterlockedIncrement(&instanceCount_);
\r
62 /////////////////////////////
\r
63 // AsyncEventServer destructor
\r
// COMMENT: Decrements instanceCount_ and then tests whether it has reached zero.
// NOTE(review): the statement governed by the final if is not visible in this listing
// (the embedded numbering jumps from 68 to 72); presumably it releases WinSock - confirm.
// NOTE(review): the value returned by InterlockedDecrement is discarded and instanceCount_ is
// re-read with a plain load, so the decrement and the zero test are not one atomic step.
64 AsyncEventServer::~AsyncEventServer() {
\r
67 InterlockedDecrement(&instanceCount_);
\r
68 if(instanceCount_ == 0)
\r
// Registers the callback to run when a socket is removed from the collection.
// PARAMS: handler(stored in socketInfoCollection_.onSocketInfoRemoved, which
//         SocketInfoCollection::RemoveSocketInfo(HANDLE) invokes when it is set)
72 void AsyncEventServer::SetClientDisconnectHandler(ClientDisconnectEvent handler) {
\r
73 socketInfoCollection_.onSocketInfoRemoved = handler;
\r
76 //////////////////////////////
\r
77 // AsyncEventServer::Start
\r
78 // RETURNS: true at successful startup
\r
// COMMENT: Creates the TCP listen socket on port_, registers it for FD_ACCEPT/FD_CLOSE events,
//          binds and listens, adds it to socketInfoCollection_ and starts listenThread_.
// NOTE(review): the statements following each failure check (and the return statements) are not
// visible in this listing, so the exact cleanup on failure cannot be confirmed from here.
79 bool AsyncEventServer::Start() {
\r
// Checks whether the listener thread is already running (the action taken is not visible here).
80 if(listenThread_.IsRunning())
\r
83 socketInfoCollection_.Clear();
\r
// Address to bind: IPv4, any local interface, port_ in network byte order.
85 sockaddr_in sockAddr;
\r
86 ZeroMemory(&sockAddr, sizeof(sockAddr));
\r
87 sockAddr.sin_family = AF_INET;
\r
88 sockAddr.sin_addr.s_addr = INADDR_ANY;
\r
89 sockAddr.sin_port = htons(port_);
\r
91 SOCKET listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
\r
92 if(listenSocket == INVALID_SOCKET) {
\r
93 CASPAR_LOG(error) << "Failed to create listenSocket";
\r
// Wrap the raw socket in a SocketInfo; the event used below is pListenSocketInfo_->event_.
97 pListenSocketInfo_ = SocketInfoPtr(new SocketInfo(listenSocket, this));
\r
// Ask WinSock to signal the event on incoming connections and on close.
99 if(WSAEventSelect(pListenSocketInfo_->socket_, pListenSocketInfo_->event_, FD_ACCEPT|FD_CLOSE) == SOCKET_ERROR) {
\r
100 CASPAR_LOG(error) << "Failed to enter EventSelect-mode for listenSocket";
\r
104 if(bind(pListenSocketInfo_->socket_, (sockaddr*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR) {
\r
105 CASPAR_LOG(error) << "Failed to bind listenSocket";
\r
109 if(listen(pListenSocketInfo_->socket_, SOMAXCONN) == SOCKET_ERROR) {
\r
110 CASPAR_LOG(error) << "Failed to listen";
\r
// The listen socket lives in the same collection as client sockets, so Run() waits on its event too.
114 socketInfoCollection_.AddSocketInfo(pListenSocketInfo_);
\r
116 //start thread: the entrypoint is Run(HANDLE stopEvent)
\r
117 if(!listenThread_.Start(this)) {
\r
118 CASPAR_LOG(error) << "Failed to create ListenThread";
\r
122 CASPAR_LOG(info) << "Listener successfully initialized";
\r
// Listener-thread body: waits on the stop event plus every socket event and dispatches
// FD_ACCEPT / FD_CLOSE / FD_READ / FD_WRITE notifications to OnAccept / OnClose / OnRead / OnWrite.
// PARAMS: stopEvent(handle placed at index 0 of the wait array)
// NOTE(review): the loop construct, the else keywords and the statements for the timeout, failure
// and stop cases are not visible in this listing (the embedded numbering has gaps); the comments
// below describe only the visible lines.
126 void AsyncEventServer::Run(HANDLE stopEvent)
\r
128 WSANETWORKEVENTS networkEvents;
\r
// Local snapshot of the wait handles: [0] is the stop event, [1..] are socket events.
130 HANDLE waitHandlesCopy[MAXIMUM_WAIT_OBJECTS];
\r
131 waitHandlesCopy[0] = stopEvent;
\r
134 //Update local copy of the array of wait-handles if necessary
\r
135 if(socketInfoCollection_.IsDirty()) {
\r
136 socketInfoCollection_.CopyCollectionToArray(&(waitHandlesCopy[1]), CASPAR_MAXIMUM_SOCKET_CLIENTS);
\r
137 socketInfoCollection_.ClearDirty();
\r
// Wait for any one handle (fWaitAll is FALSE) with a 1500 ms timeout.
// NOTE(review): the handle count comes from the current Size() while the array was refreshed in a
// separate step above - confirm the two cannot disagree if another thread changes the collection.
140 DWORD waitResult = WSAWaitForMultipleEvents(std::min<DWORD>(static_cast<DWORD>(socketInfoCollection_.Size()+1), MAXIMUM_WAIT_OBJECTS), waitHandlesCopy, FALSE, 1500, FALSE);
\r
141 if(waitResult == WAIT_TIMEOUT)
\r
143 else if(waitResult == WAIT_FAILED)
\r
// Translate the wait result into an index into waitHandlesCopy.
146 DWORD eventIndex = waitResult - WAIT_OBJECT_0;
\r
148 HANDLE waitEvent = waitHandlesCopy[eventIndex];
\r
149 SocketInfoPtr pSocketInfo;
\r
151 if(eventIndex == 0) //stopEvent
\r
153 else if(socketInfoCollection_.FindSocketInfo(waitEvent, pSocketInfo)) {
\r
// NOTE(review): the return value of WSAEnumNetworkEvents is ignored; if the call fails,
// networkEvents is read below without having been filled in.
154 WSAEnumNetworkEvents(pSocketInfo->socket_, waitEvent, &networkEvents);
\r
// For each event type: error code 0 calls the handler; the log + OnError lines are
// presumably the else branch (the else keyword is not visible).
156 if(networkEvents.lNetworkEvents & FD_ACCEPT) {
\r
157 if(networkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)
\r
158 OnAccept(pSocketInfo);
\r
160 CASPAR_LOG(debug) << "OnAccept (ErrorCode: " << networkEvents.iErrorCode[FD_ACCEPT_BIT] << TEXT(")");
\r
161 OnError(waitEvent, networkEvents.iErrorCode[FD_ACCEPT_BIT]);
\r
165 if(networkEvents.lNetworkEvents & FD_CLOSE) {
\r
166 if(networkEvents.iErrorCode[FD_CLOSE_BIT] == 0)
\r
167 OnClose(pSocketInfo);
\r
169 CASPAR_LOG(debug) << "OnClose (ErrorCode: " << networkEvents.iErrorCode[FD_CLOSE_BIT] << TEXT(")");
\r
170 OnError(waitEvent, networkEvents.iErrorCode[FD_CLOSE_BIT]);
\r
175 if(networkEvents.lNetworkEvents & FD_READ) {
\r
176 if(networkEvents.iErrorCode[FD_READ_BIT] == 0)
\r
177 OnRead(pSocketInfo);
\r
179 CASPAR_LOG(debug) << "OnRead (ErrorCode: " << networkEvents.iErrorCode[FD_READ_BIT] << TEXT(")");
\r
180 OnError(waitEvent, networkEvents.iErrorCode[FD_READ_BIT]);
\r
184 if(networkEvents.lNetworkEvents & FD_WRITE) {
\r
185 if(networkEvents.iErrorCode[FD_WRITE_BIT] == 0)
\r
186 OnWrite(pSocketInfo);
\r
188 CASPAR_LOG(debug) << "OnWrite (ErrorCode: " << networkEvents.iErrorCode[FD_WRITE_BIT] << TEXT(")");
\r
189 OnError(waitEvent, networkEvents.iErrorCode[FD_WRITE_BIT]);
\r
194 //Could not find the waitHandle in the SocketInfoCollection.
\r
195 //It must have been removed during the last call to WSAWaitForMultipleEvents
\r
// Handler for an exception that escaped the listening thread; logs the exception message at fatal level.
// PARAMS: ex(the exception whose what() text is logged)
// NOTE(review): bDoRestart starts as true and is set to false further down, but the control flow
// between these lines and the return statement are not visible in this listing (the embedded
// numbering skips 203-205, 207-209 and 211-215) - presumably the return value tells the thread
// wrapper whether to restart Run(); confirm against the full file.
201 bool AsyncEventServer::OnUnhandledException(const std::exception& ex) throw() {
\r
202 bool bDoRestart = true;
\r
206 CASPAR_LOG(fatal) << "UNHANDLED EXCEPTION in TCPServers listeningthread. Message: " << ex.what();
\r
210 bDoRestart = false;
\r
216 ///////////////////////////////
\r
217 // AsyncEventServer:Stop
\r
218 // COMMENT: Shuts down
\r
// Stops listenThread_ (logging a warning if Stop() reports failure) and then empties
// socketInfoCollection_. Connected clients are not sent a shutdown first; see the TODO below.
219 void AsyncEventServer::Stop()
\r
221 //TODO: initiate shutdown on all clients connected
\r
222 // for(int i=0; i < _totalActiveSockets; ++i) {
\r
223 // shutdown(_pSocketInfo[i]->_socket, SD_SEND);
\r
226 if(!listenThread_.Stop()) {
\r
227 CASPAR_LOG(warning) << "Wait for listenThread timed out.";
\r
230 socketInfoCollection_.Clear();
\r
233 ////////////////////////////////////////////////////////////////////
\r
235 // MESSAGE HANDLERS
\r
237 ////////////////////////////////////////////////////////////////////
\r
240 //////////////////////////////
\r
241 // AsyncEventServer::OnAccept
\r
// PARAMS: pSI(SocketInfo of the listening socket on which the connection arrived)
243 // COMMENT: Called when a new client connects
\r
// Accepts the connection, wraps it in a SocketInfo, registers it for FD_READ/FD_WRITE/FD_CLOSE
// events, records the peer address in host_ and adds it to socketInfoCollection_.
// NOTE(review): the return statements and the statements after each failure check are not
// visible in this listing.
244 bool AsyncEventServer::OnAccept(SocketInfoPtr& pSI) {
\r
245 sockaddr_in clientAddr;
\r
246 int addrSize = sizeof(clientAddr);
\r
247 SOCKET clientSocket = WSAAccept(pSI->socket_, (sockaddr*)&clientAddr, &addrSize, NULL, NULL);
\r
248 if(clientSocket == INVALID_SOCKET) {
\r
249 LogSocketError(TEXT("Accept"));
\r
253 SocketInfoPtr pClientSocket(new SocketInfo(clientSocket, this));
\r
255 //Determine if we can handle one more client
\r
// Size() also counts the listening socket, which Start() adds to the same collection.
256 if(socketInfoCollection_.Size() >= CASPAR_MAXIMUM_SOCKET_CLIENTS) {
\r
// NOTE(review): the message literal below contains a stray ") <<" and reads as garbled - confirm the intended text.
257 CASPAR_LOG(error) << "Could not accept ) << too many connections).";
\r
261 if(WSAEventSelect(pClientSocket->socket_, pClientSocket->event_, FD_READ | FD_WRITE | FD_CLOSE) == SOCKET_ERROR) {
\r
262 LogSocketError(TEXT("Accept (failed create event for new client)"));
\r
// Convert the dotted IPv4 address text to wide characters for host_.
// NOTE(review): passing a TCHAR buffer to MultiByteToWideChar assumes TCHAR is wchar_t (UNICODE build) - confirm.
266 TCHAR addressBuffer[32];
\r
267 MultiByteToWideChar(CP_ACP, 0, inet_ntoa(clientAddr.sin_addr), -1, addressBuffer, 32);
\r
268 pClientSocket->host_ = addressBuffer;
\r
270 socketInfoCollection_.AddSocketInfo(pClientSocket);
\r
272 CASPAR_LOG(info) << "Accepted connection from " << pClientSocket->host_.c_str();
\r
// Converts the bytes in pSource to wide characters, holding back a trailing incomplete UTF-8 sequence.
// PARAMS: codePage(code page passed to MultiByteToWideChar; CP_UTF8 enables the leftover scan),
//         pSource(input bytes; leftover bytes are copied to its front before returning),
//         sourceLength(number of valid bytes in pSource),
//         wideBuffer(receives the converted characters, resized to the number written),
//         countLeftovers(out: number of trailing bytes that were not converted)
// RETURNS: true when at least one wide character was written
// NOTE(review): several lines of the UTF-8 scan (including where countLeftovers is incremented and
// the else/break statements) are not visible in this listing; the comments describe visible lines only.
// NOTE(review): in the visible lines countLeftovers is assigned only on the CP_UTF8 path, so for
// other code pages it appears to keep the caller's incoming value - confirm.
277 bool ConvertMultiByteToWideChar(UINT codePage, char* pSource, int sourceLength, std::vector<wchar_t>& wideBuffer, int& countLeftovers)
\r
279 if(codePage == CP_UTF8) {
\r
280 countLeftovers = 0;
\r
281 //check from the end of pSource for a possibly incomplete UTF-8 byte sequence
\r
// NOTE(review): reads pSource[sourceLength-1] without a visible check that sourceLength > 0.
282 if(pSource[sourceLength-1] & 0x80) {
\r
283 //The last byte is part of a multibyte sequence. If the sequence is not complete, we need to save the partial sequence
\r
284 int bytesToCheck = std::min(4, sourceLength); //a sequence contains a maximum of 4 bytes
\r
285 int currentLeftoverIndex = sourceLength-1;
\r
// Walk backwards from the last byte looking for the lead byte of the sequence.
286 for(; bytesToCheck > 0; --bytesToCheck, --currentLeftoverIndex) {
\r
288 if(pSource[currentLeftoverIndex] & 0x80) {
\r
289 if(pSource[currentLeftoverIndex] & 0x40) { //The two high-bits are set, this is the "header"
\r
// Lead byte: each further high bit (0x20, 0x10) adds one byte to the expected length (2..4).
290 int expectedSequenceLength = 2;
\r
291 if(pSource[currentLeftoverIndex] & 0x20)
\r
292 ++expectedSequenceLength;
\r
293 if(pSource[currentLeftoverIndex] & 0x10)
\r
294 ++expectedSequenceLength;
\r
296 if(countLeftovers < expectedSequenceLength) {
\r
297 //The sequence is incomplete. Leave the leftovers to be interpreted with the next call
\r
300 //The sequence is complete, there are no leftovers.
\r
302 //error. Let the conversion-function take the hit.
\r
303 countLeftovers = 0;
\r
308 //error. Let the conversion-function take the hit.
\r
309 countLeftovers = 0;
\r
313 if(countLeftovers == 4) {
\r
314 //error. Let the conversion-function take the hit.
\r
315 countLeftovers = 0;
\r
// Convert everything except the leftovers: first call sizes the output, second call fills it.
320 int charsWritten = 0;
\r
321 int sourceBytesToProcess = sourceLength-countLeftovers;
\r
322 int wideBufferCapacity = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, NULL, NULL);
\r
323 if(wideBufferCapacity > 0)
\r
325 wideBuffer.resize(wideBufferCapacity);
\r
326 charsWritten = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, &wideBuffer[0], wideBuffer.size());
\r
328 //copy the leftovers to the front of the buffer
\r
329 if(countLeftovers > 0) {
\r
// NOTE(review): source and destination both lie in pSource and overlap when
// sourceBytesToProcess < countLeftovers; memcpy is undefined for overlapping ranges (memmove is not).
330 memcpy(pSource, &(pSource[sourceBytesToProcess]), countLeftovers);
\r
333 wideBuffer.resize(charsWritten);
\r
334 return (charsWritten > 0);
\r
337 //////////////////////////////
\r
338 // AsyncEventServer::OnRead
\r
// PARAMS: pSI(SocketInfo of the client socket that has data to read)
340 // COMMENT: Called when something arrives on the socket that has to be read
\r
// Repeatedly calls recv until it returns SOCKET_ERROR, converting each chunk to wide characters
// and handing it to pProtocolStrategy_->Parse. Bytes left unconverted by the previous chunk sit
// at the front of recvBuffer_, so each recv writes after recvLeftoverOffset_.
// NOTE(review): the return statements, the else keywords and the WSAEWOULDBLOCK handling are not
// visible in this listing.
341 bool AsyncEventServer::OnRead(SocketInfoPtr& pSI) {
\r
342 int recvResult = SOCKET_ERROR;
\r
// NOTE(review): sizeof(pSI->recvBuffer_) is the buffer capacity only if recvBuffer_ is an array
// member - confirm in SocketInfo.h.
344 int maxRecvLength = sizeof(pSI->recvBuffer_)-pSI->recvLeftoverOffset_;
\r
345 recvResult = recv(pSI->socket_, pSI->recvBuffer_+pSI->recvLeftoverOffset_, maxRecvLength, 0);
\r
346 while(recvResult != SOCKET_ERROR) {
\r
// recv returned 0: log the disconnect and drop the socket from the collection.
347 if(recvResult == 0) {
\r
348 CASPAR_LOG(info) << "Client " << pSI->host_.c_str() << TEXT(" disconnected");
\r
350 socketInfoCollection_.RemoveSocketInfo(pSI);
\r
354 //Convert to widechar
\r
// Input length is the new bytes plus the previous leftovers; recvLeftoverOffset_ is updated
// through the last (by-reference) argument.
355 if(ConvertMultiByteToWideChar(pProtocolStrategy_->GetCodepage(), pSI->recvBuffer_, recvResult + pSI->recvLeftoverOffset_, pSI->wideRecvBuffer_, pSI->recvLeftoverOffset_))
\r
356 pProtocolStrategy_->Parse(&pSI->wideRecvBuffer_[0], pSI->wideRecvBuffer_.size(), pSI);
\r
358 CASPAR_LOG(error) << "Read from " << pSI->host_.c_str() << TEXT(" failed, could not convert command to UNICODE");
\r
362 maxRecvLength = sizeof(pSI->recvBuffer_)-pSI->recvLeftoverOffset_;
\r
363 recvResult = recv(pSI->socket_, pSI->recvBuffer_+pSI->recvLeftoverOffset_, maxRecvLength, 0);
\r
// After the loop: WSAEWOULDBLOCK is tested separately (its handling is not visible); the log and
// OnError lines are presumably reached for other error codes.
366 if(recvResult == SOCKET_ERROR) {
\r
367 int errorCode = WSAGetLastError();
\r
368 if(errorCode == WSAEWOULDBLOCK)
\r
371 LogSocketError(TEXT("Read"), errorCode);
\r
372 OnError(pSI->event_, errorCode);
\r
379 //////////////////////////////
\r
380 // AsyncEventServer::OnWrite
\r
// PARAMS: pSI(SocketInfo of the socket that became writable)
382 // COMMENT: Called when the socket is ready to send more data
\r
// NOTE(review): the body is not visible in this listing (the embedded numbering jumps from 383 to
// 387); presumably it resumes sending via DoSend - confirm against the full file.
383 void AsyncEventServer::OnWrite(SocketInfoPtr& pSI) {
\r
// Converts wideString to the given code page and stores the bytes in destBuffer.
// PARAMS: codePage(code page passed to WideCharToMultiByte), wideString(text to convert; its
//         length() is passed, so no terminator is converted), destBuffer(receives the converted
//         bytes, resized to the number written)
// RETURNS: true when at least one byte was written; an empty wideString therefore yields false.
387 bool ConvertWideCharToMultiByte(UINT codePage, const std::wstring& wideString, std::vector<char>& destBuffer)
\r
389 int bytesWritten = 0;
\r
// First call (null destination, size 0) asks for the required buffer size.
390 int multibyteBufferCapacity = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), 0, 0, NULL, NULL);
\r
391 if(multibyteBufferCapacity > 0)
\r
393 destBuffer.resize(multibyteBufferCapacity);
\r
// Second call performs the conversion into destBuffer.
394 bytesWritten = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), &destBuffer[0], destBuffer.size(), NULL, NULL);
\r
// Shrink to the bytes actually written (0 on failure, leaving destBuffer empty).
396 destBuffer.resize(bytesWritten);
\r
397 return (bytesWritten > 0);
\r
// Sends queued messages for socketInfo until the queue is drained or the socket would block.
// PARAMS: socketInfo(holds sendQueue_, the in-flight buffer currentlySending_ and its offset
//         currentlySendingOffset_)
// The front of sendQueue_ stays queued while its converted bytes are in flight and is popped once
// they are fully sent, dropped after a send error, or could not be converted.
// NOTE(review): the else keywords and the break/return statements are not visible in this listing;
// the branch comments below describe the apparent structure.
400 void AsyncEventServer::DoSend(SocketInfo& socketInfo) {
\r
401 //Locks the socketInfo-object so that no one else tampers with the sendqueue at the same time
\r
// NOTE(review): the lock is taken on the unqualified mutex_, not on a member of socketInfo -
// confirm which object this mutex belongs to and what it is meant to guard.
402 tbb::mutex::scoped_lock lock(mutex_);
\r
404 while(!socketInfo.sendQueue_.empty() || socketInfo.currentlySending_.size() > 0) {
\r
// Nothing in flight: convert the next queued string into the byte buffer.
405 if(socketInfo.currentlySending_.size() == 0) {
\r
406 //Read the next string in the queue and convert it to the protocol's code page
\r
407 if(!ConvertWideCharToMultiByte(pProtocolStrategy_->GetCodepage(), socketInfo.sendQueue_.front(), socketInfo.currentlySending_))
\r
409 CASPAR_LOG(error) << "Send to " << socketInfo.host_.c_str() << TEXT(" failed, could not convert response to UTF-8");
\r
411 socketInfo.currentlySendingOffset_ = 0;
\r
414 if(socketInfo.currentlySending_.size() > 0) {
\r
// Send whatever remains of the in-flight buffer.
415 int bytesToSend = static_cast<int>(socketInfo.currentlySending_.size()-socketInfo.currentlySendingOffset_);
\r
416 int sentBytes = send(socketInfo.socket_, &socketInfo.currentlySending_[0] + socketInfo.currentlySendingOffset_, bytesToSend, 0);
\r
417 if(sentBytes == SOCKET_ERROR) {
\r
418 int errorCode = WSAGetLastError();
\r
// Would block: keep the buffer and offset so the send can continue later.
419 if(errorCode == WSAEWOULDBLOCK) {
\r
420 CASPAR_LOG(debug) << "Send to " << socketInfo.host_.c_str() << TEXT(" would block, sending later");
\r
// Other errors: report, then discard the current message.
424 LogSocketError(TEXT("Send"), errorCode);
\r
425 OnError(socketInfo.event_, errorCode);
\r
427 socketInfo.currentlySending_.resize(0);
\r
428 socketInfo.currentlySendingOffset_ = 0;
\r
429 socketInfo.sendQueue_.pop();
\r
// Whole remainder sent: log (the message text only when fewer than 200 bytes went out) and advance the queue.
434 if(sentBytes == bytesToSend) {
\r
435 if(sentBytes < 200)
\r
436 CASPAR_LOG(info) << "Sent " << socketInfo.sendQueue_.front().c_str() << TEXT(" to ") << socketInfo.host_.c_str();
\r
438 CASPAR_LOG(info) << "Sent more than 200 bytes to " << socketInfo.host_.c_str();
\r
440 socketInfo.currentlySending_.resize(0);
\r
441 socketInfo.currentlySendingOffset_ = 0;
\r
442 socketInfo.sendQueue_.pop();
\r
// Partial send: remember how far we got.
445 socketInfo.currentlySendingOffset_ += sentBytes;
\r
446 CASPAR_LOG(info) << "Sent partial message to " << socketInfo.host_.c_str();
\r
// Presumably the branch taken when conversion left nothing to send: drop the queued string.
451 socketInfo.sendQueue_.pop();
\r
455 //////////////////////////////
\r
456 // AsyncEventServer::OnClose
\r
// PARAMS: pSI(SocketInfo of the socket that was closed)
458 // COMMENT: Called when a client disconnects / is disconnected
\r
// Logs the disconnect and removes the socket from socketInfoCollection_.
459 void AsyncEventServer::OnClose(SocketInfoPtr& pSI) {
\r
460 CASPAR_LOG(info) << "Client " << pSI->host_.c_str() << TEXT(" was disconnected");
\r
462 socketInfoCollection_.RemoveSocketInfo(pSI);
\r
465 //////////////////////////////
\r
466 // AsyncEventServer::OnError
\r
// PARAMS: waitEvent(event handle identifying the socket), errorCode(WinSock error code)
468 // COMMENT: Called when an errorcode is received
\r
// Only the connection-loss codes listed in the condition are acted on: the client is logged as
// disconnected (if it is still in the collection) and its entry is removed. Other codes are
// ignored in the visible lines.
// NOTE(review): closing braces are not visible in this listing, so whether RemoveSocketInfo runs
// only when FindSocketInfo succeeds cannot be confirmed from here.
469 void AsyncEventServer::OnError(HANDLE waitEvent, int errorCode) {
\r
470 if(errorCode == WSAENETDOWN || errorCode == WSAECONNABORTED || errorCode == WSAECONNRESET || errorCode == WSAESHUTDOWN || errorCode == WSAETIMEDOUT || errorCode == WSAENOTCONN || errorCode == WSAENETRESET) {
\r
471 SocketInfoPtr pSocketInfo;
\r
472 if(socketInfoCollection_.FindSocketInfo(waitEvent, pSocketInfo)) {
\r
473 CASPAR_LOG(info) << "Client " << pSocketInfo->host_.c_str() << TEXT(" was disconnected, Errorcode ") << errorCode;
\r
476 socketInfoCollection_.RemoveSocketInfo(waitEvent);
\r
480 //////////////////////////////
\r
481 // AsyncEventServer::DisconnectClient
\r
// PARAMS: socketInfo(the client whose sending side is shut down)
483 // COMMENT: The client is removed from the actual client-list when an FD_CLOSE notification is received
\r
// Calls shutdown(SD_SEND) on the client's socket; it does not remove the client itself.
// NOTE(review): on failure this passes result (SOCKET_ERROR) to OnError rather than the code from
// WSAGetLastError(); OnError compares its argument against WSAE* codes, so this call likely never
// matches any of them - confirm the intent.
484 void AsyncEventServer::DisconnectClient(SocketInfo& socketInfo) {
\r
485 int result = shutdown(socketInfo.socket_, SD_SEND);
\r
486 if(result == SOCKET_ERROR)
\r
487 OnError(socketInfo.event_, result);
\r
490 //////////////////////////////
\r
491 // AsyncEventServer::LogSocketError
\r
// PARAMS: pStr(name of the operation that failed, inserted after "Failed to "),
//         socketError(WinSock error code; 0 means "look it up with WSAGetLastError()")
// COMMENT: Writes an error-level log entry containing the operation name and the error code.
// NOTE(review): this file also calls LogSocketError with a single argument, so the header
// presumably declares a default value for socketError - confirm.
492 void AsyncEventServer::LogSocketError(const TCHAR* pStr, int socketError) {
\r
493 if(socketError == 0)
\r
494 socketError = WSAGetLastError();
\r
496 CASPAR_LOG(error) << "Failed to " << pStr << TEXT(" Errorcode: ") << socketError;
\r
500 //////////////////////////////
\r
501 // SocketInfoCollection
\r
502 //////////////////////////////
\r
// Constructs an empty collection with the dirty flag cleared (bDirty_ = false).
504 AsyncEventServer::SocketInfoCollection::SocketInfoCollection() : bDirty_(false) {
\r
// Destructor. NOTE(review): no body lines are visible in this listing; presumably empty - confirm.
507 AsyncEventServer::SocketInfoCollection::~SocketInfoCollection() {
\r
// Adds a socket to the collection, keyed by its event handle.
// PARAMS: pSocketInfo(entry to add; its event_ is the map key and is appended to waitEvents_)
// RETURNS: presumably bSuccess, i.e. whether the map insert added a new key (the return statement
//          is not visible in this listing).
// NOTE(review): waitEvents_ is grown before the map insert is attempted; confirm that a failed
// insert (duplicate event handle) does not leave an unused trailing slot in waitEvents_.
510 bool AsyncEventServer::SocketInfoCollection::AddSocketInfo(SocketInfoPtr& pSocketInfo) {
\r
// Held for the rest of the function.
511 tbb::mutex::scoped_lock lock(mutex_);
\r
513 waitEvents_.resize(waitEvents_.size()+1);
\r
514 bool bSuccess = socketInfoMap_.insert(SocketInfoMap::value_type(pSocketInfo->event_, pSocketInfo)).second;
\r
// Store the new handle in the slot appended above.
516 waitEvents_[waitEvents_.size()-1] = pSocketInfo->event_;
\r
// Convenience overload: removes the entry identified by pSocketInfo->event_.
// A null pSocketInfo is ignored.
523 void AsyncEventServer::SocketInfoCollection::RemoveSocketInfo(SocketInfoPtr& pSocketInfo) {
\r
524 if(pSocketInfo != 0) {
\r
525 RemoveSocketInfo(pSocketInfo->event_);
\r
// Removes the entry keyed by waitEvent from both the map and the wait-handle vector, then
// notifies onSocketInfoRemoved (if set) with the removed SocketInfo.
// PARAMS: waitEvent(event handle identifying the socket to remove)
// NOTE(review): the braces and conditions around these statements are not visible in this listing
// (two variables named it imply nested scopes), so it cannot be confirmed from here whether the
// callback runs while mutex_ is held or what happens when waitEvent is not found.
528 void AsyncEventServer::SocketInfoCollection::RemoveSocketInfo(HANDLE waitEvent) {
\r
529 tbb::mutex::scoped_lock lock(mutex_);
\r
// Keeps the removed entry alive so it can be passed to the callback below.
532 SocketInfoPtr pSocketInfo;
\r
533 SocketInfoMap::iterator it = socketInfoMap_.find(waitEvent);
\r
534 SocketInfoMap::iterator end = socketInfoMap_.end();
\r
536 pSocketInfo = it->second;
\r
// Clear the SocketInfo's pointer to its server.
539 pSocketInfo->pServer_ = NULL;
\r
541 socketInfoMap_.erase(waitEvent);
\r
// Remove the handle from waitEvents_ by swapping it with the last element and shrinking by one
// (element order is not preserved).
543 HandleVector::iterator it = std::find(waitEvents_.begin(), waitEvents_.end(), waitEvent);
\r
544 if(it != waitEvents_.end()) {
\r
545 std::swap((*it), waitEvents_.back());
\r
546 waitEvents_.resize(waitEvents_.size()-1);
\r
551 if(onSocketInfoRemoved)
\r
552 onSocketInfoRemoved(pSocketInfo);
\r
// Looks up the SocketInfo registered for an event handle.
// PARAMS: key(event handle to look up), pResult(out: receives the matching SocketInfo)
// RETURNS: true when key is present in socketInfoMap_
// NOTE(review): the guard around the assignment to pResult is not visible in this listing;
// presumably it is only performed when the key was found - confirm.
555 bool AsyncEventServer::SocketInfoCollection::FindSocketInfo(HANDLE key, SocketInfoPtr& pResult) {
\r
556 tbb::mutex::scoped_lock lock(mutex_);
\r
558 SocketInfoMap::iterator it = socketInfoMap_.find(key);
\r
559 SocketInfoMap::iterator end = socketInfoMap_.end();
\r
561 pResult = it->second;
\r
563 return (it != end);
\r
// Copies the stored wait handles into a caller-supplied array.
// PARAMS: pDest(destination array), maxCount(capacity of pDest in handles; at most this many,
//         and at most waitEvents_.size(), handles are copied)
// NOTE(review): &(waitEvents_[0]) is evaluated even when waitEvents_ is empty, which indexes an
// empty vector - confirm callers never reach this with an empty collection, or guard it.
566 void AsyncEventServer::SocketInfoCollection::CopyCollectionToArray(HANDLE* pDest, int maxCount) {
\r
567 tbb::mutex::scoped_lock lock(mutex_);
\r
569 memcpy(pDest, &(waitEvents_[0]), std::min( maxCount, static_cast<int>(waitEvents_.size()) ) * sizeof(HANDLE) );
\r
// Empties both the handle-to-SocketInfo map and the wait-handle vector while holding mutex_.
// NOTE(review): in the visible lines onSocketInfoRemoved is not invoked for the dropped entries
// and the dirty flag is not touched - confirm that is intended.
572 void AsyncEventServer::SocketInfoCollection::Clear() {
\r
573 tbb::mutex::scoped_lock lock(mutex_);
\r
575 socketInfoMap_.clear();
\r
576 waitEvents_.clear();
\r