]> git.sesse.net Git - casparcg/blob - protocol/util/AsyncEventServer.cpp
13993fe0a5626f106065665c3ccf749f9f1e6730
[casparcg] / protocol / util / AsyncEventServer.cpp
1 /*\r
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
3 *\r
4 * This file is part of CasparCG (www.casparcg.com).\r
5 *\r
6 * CasparCG is free software: you can redistribute it and/or modify\r
7 * it under the terms of the GNU General Public License as published by\r
8 * the Free Software Foundation, either version 3 of the License, or\r
9 * (at your option) any later version.\r
10 *\r
11 * CasparCG is distributed in the hope that it will be useful,\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
14 * GNU General Public License for more details.\r
15 *\r
16 * You should have received a copy of the GNU General Public License\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
18 *\r
19 * Author: Nicklas P Andersson\r
20 */\r
21 \r
22  \r
23 // AsyncEventServer.cpp: impl of the AsyncEventServer class.\r
24 //\r
25 //////////////////////////////////////////////////////////////////////\r
26 \r
27 #include "../stdafx.h"\r
28 \r
29 #include "AsyncEventServer.h"\r
30 #include "SocketInfo.h"\r
31 \r
32 #include <common/log/log.h>\r
33 \r
34 #include <boost/algorithm/string/replace.hpp>\r
35 \r
36 #include <string>\r
37 #include <algorithm>\r
38 \r
39 #if defined(_MSC_VER)\r
40 #pragma warning (push, 1) // TODO: Legacy code, just disable warnings, will replace with boost::asio in future\r
41 #endif\r
42 \r
43 namespace caspar { namespace IO {\r
44         \r
45 #define CASPAR_MAXIMUM_SOCKET_CLIENTS   (MAXIMUM_WAIT_OBJECTS-1)        \r
46 \r
47 long AsyncEventServer::instanceCount_ = 0;\r
48 //////////////////////////////\r
49 // AsyncEventServer constructor\r
50 // PARAMS: port(TCP-port the server should listen to)\r
51 // COMMENT: Initializes the WinSock2 library\r
52 AsyncEventServer::AsyncEventServer(const safe_ptr<IProtocolStrategy>& pProtocol, int port) : port_(port), pProtocolStrategy_(pProtocol)\r
53 {\r
54         if(instanceCount_ == 0) {\r
55                 WSADATA wsaData;\r
56                 if(WSAStartup(MAKEWORD(2,2), &wsaData) != NO_ERROR)\r
57                         throw std::exception("Error initializing WinSock2");\r
58                 else {\r
59                         CASPAR_LOG(info) << "WinSock2 Initialized.";\r
60                 }\r
61         }\r
62 \r
63         InterlockedIncrement(&instanceCount_);\r
64 }\r
65 \r
66 /////////////////////////////\r
67 // AsyncEventServer destructor\r
68 AsyncEventServer::~AsyncEventServer() {\r
69         Stop();\r
70 \r
71         InterlockedDecrement(&instanceCount_);\r
72         if(instanceCount_ == 0)\r
73                 WSACleanup();\r
74 }\r
75 \r
76 void AsyncEventServer::SetClientDisconnectHandler(ClientDisconnectEvent handler) {\r
77         socketInfoCollection_.onSocketInfoRemoved = handler;\r
78 }\r
79 \r
80 //////////////////////////////\r
81 // AsyncEventServer::Start\r
82 // RETURNS: true at successful startup\r
83 bool AsyncEventServer::Start() {\r
84         if(listenThread_.IsRunning())\r
85                 return false;\r
86 \r
87         socketInfoCollection_.Clear();\r
88 \r
89         sockaddr_in sockAddr;\r
90         ZeroMemory(&sockAddr, sizeof(sockAddr));\r
91         sockAddr.sin_family = AF_INET;\r
92         sockAddr.sin_addr.s_addr = INADDR_ANY;\r
93         sockAddr.sin_port = htons(port_);\r
94         \r
95         SOCKET listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);\r
96         if(listenSocket == INVALID_SOCKET) {\r
97                 CASPAR_LOG(error) << "Failed to create listenSocket";\r
98                 return false;\r
99         }\r
100         \r
101         pListenSocketInfo_ = SocketInfoPtr(new SocketInfo(listenSocket, this));\r
102 \r
103         if(WSAEventSelect(pListenSocketInfo_->socket_, pListenSocketInfo_->event_, FD_ACCEPT|FD_CLOSE) == SOCKET_ERROR) {\r
104                 CASPAR_LOG(error) << "Failed to enter EventSelect-mode for listenSocket";\r
105                 return false;\r
106         }\r
107 \r
108         if(bind(pListenSocketInfo_->socket_, (sockaddr*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR) {\r
109                 CASPAR_LOG(error) << "Failed to bind listenSocket";\r
110                 return false;\r
111         }\r
112 \r
113         if(listen(pListenSocketInfo_->socket_, SOMAXCONN) == SOCKET_ERROR) {\r
114                 CASPAR_LOG(error) << "Failed to listen";\r
115                 return false;\r
116         }\r
117 \r
118         socketInfoCollection_.AddSocketInfo(pListenSocketInfo_);\r
119 \r
120         //start thread: the entrypoint is Run(EVENT stopEvent)\r
121         if(!listenThread_.Start(this)) {\r
122                 CASPAR_LOG(error) << "Failed to create ListenThread";\r
123                 return false;\r
124         }\r
125 \r
126         CASPAR_LOG(info) << "Listener successfully initialized";\r
127         return true;\r
128 }\r
129 \r
130 void AsyncEventServer::Run(HANDLE stopEvent)\r
131 {\r
132         WSANETWORKEVENTS networkEvents;\r
133 \r
134         HANDLE waitHandlesCopy[MAXIMUM_WAIT_OBJECTS];\r
135         waitHandlesCopy[0] = stopEvent;\r
136 \r
137         while(true)     {\r
138                 //Update local copy of the array of wait-handles if nessecery\r
139                 if(socketInfoCollection_.IsDirty()) {\r
140                         socketInfoCollection_.CopyCollectionToArray(&(waitHandlesCopy[1]), CASPAR_MAXIMUM_SOCKET_CLIENTS);\r
141                         socketInfoCollection_.ClearDirty();\r
142                 }\r
143 \r
144                 DWORD waitResult = WSAWaitForMultipleEvents(std::min<DWORD>(static_cast<DWORD>(socketInfoCollection_.Size()+1), MAXIMUM_WAIT_OBJECTS), waitHandlesCopy, FALSE, 1500, FALSE);\r
145                 if(waitResult == WAIT_TIMEOUT)\r
146                         continue;\r
147                 else if(waitResult == WAIT_FAILED)\r
148                         break;\r
149                 else {\r
150                         DWORD eventIndex = waitResult - WAIT_OBJECT_0;\r
151 \r
152                         HANDLE waitEvent = waitHandlesCopy[eventIndex];\r
153                         SocketInfoPtr pSocketInfo;\r
154 \r
155                         if(eventIndex == 0)     //stopEvent\r
156                                 break;\r
157                         else if(socketInfoCollection_.FindSocketInfo(waitEvent, pSocketInfo)) {\r
158                                 WSAEnumNetworkEvents(pSocketInfo->socket_, waitEvent, &networkEvents);\r
159 \r
160                                 if(networkEvents.lNetworkEvents & FD_ACCEPT) {\r
161                                         if(networkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)\r
162                                                 OnAccept(pSocketInfo);\r
163                                         else {\r
164                                                 CASPAR_LOG(debug) << "OnAccept (ErrorCode: " << networkEvents.iErrorCode[FD_ACCEPT_BIT] << TEXT(")");\r
165                                                 OnError(waitEvent, networkEvents.iErrorCode[FD_ACCEPT_BIT]);\r
166                                         }\r
167                                 }\r
168 \r
169                                 if(networkEvents.lNetworkEvents & FD_CLOSE) {\r
170                                         if(networkEvents.iErrorCode[FD_CLOSE_BIT] == 0)\r
171                                                 OnClose(pSocketInfo);\r
172                                         else {\r
173                                                 CASPAR_LOG(debug) << "OnClose (ErrorCode: " << networkEvents.iErrorCode[FD_CLOSE_BIT] << TEXT(")");\r
174                                                 OnError(waitEvent, networkEvents.iErrorCode[FD_CLOSE_BIT]);\r
175                                         }\r
176                                         continue;\r
177                                 }\r
178 \r
179                                 if(networkEvents.lNetworkEvents & FD_READ) {\r
180                                         if(networkEvents.iErrorCode[FD_READ_BIT] == 0)\r
181                                                 OnRead(pSocketInfo);\r
182                                         else {\r
183                                                 CASPAR_LOG(debug) << "OnRead (ErrorCode: " << networkEvents.iErrorCode[FD_READ_BIT] << TEXT(")");\r
184                                                 OnError(waitEvent, networkEvents.iErrorCode[FD_READ_BIT]);\r
185                                         }\r
186                                 }\r
187 \r
188                                 if(networkEvents.lNetworkEvents & FD_WRITE) {\r
189                                         if(networkEvents.iErrorCode[FD_WRITE_BIT] == 0)\r
190                                                 OnWrite(pSocketInfo);\r
191                                         else {\r
192                                                 CASPAR_LOG(debug) << "OnWrite (ErrorCode: " << networkEvents.iErrorCode[FD_WRITE_BIT] << TEXT(")");\r
193                                                 OnError(waitEvent, networkEvents.iErrorCode[FD_WRITE_BIT]);\r
194                                         }\r
195                                 }\r
196                         }\r
197                         else {\r
198                                 //Could not find the waitHandle in the SocketInfoCollection.\r
199                                 //It must have been removed during the last call to WSAWaitForMultipleEvents\r
200                         }\r
201                 }\r
202         }\r
203 }\r
204 \r
205 bool AsyncEventServer::OnUnhandledException(const std::exception& ex) throw() {\r
206         bool bDoRestart = true;\r
207 \r
208         try \r
209         {\r
210                 CASPAR_LOG(fatal) << "UNHANDLED EXCEPTION in TCPServers listeningthread. Message: " << ex.what();\r
211         }\r
212         catch(...)\r
213         {\r
214                 bDoRestart = false;\r
215         }\r
216 \r
217         return bDoRestart;\r
218 }\r
219 \r
220 ///////////////////////////////\r
221 // AsyncEventServer:Stop\r
222 // COMMENT: Shuts down\r
223 void AsyncEventServer::Stop()\r
224 {\r
225         //TODO: initiate shutdown on all clients connected\r
226 //      for(int i=0; i < _totalActiveSockets; ++i) {\r
227 //              shutdown(_pSocketInfo[i]->_socket, SD_SEND);\r
228 //      }\r
229 \r
230         if(!listenThread_.Stop()) {\r
231                 CASPAR_LOG(warning) << "Wait for listenThread timed out.";\r
232         }\r
233 \r
234         socketInfoCollection_.Clear();\r
235 }\r
236 \r
237 ////////////////////////////////////////////////////////////////////\r
238 //\r
239 // MESSAGE HANDLERS   \r
240 //\r
241 ////////////////////////////////////////////////////////////////////\r
242 \r
243 \r
244 //////////////////////////////\r
245 // AsyncEventServer::OnAccept\r
246 // PARAMS: ...\r
247 // COMMENT: Called when a new client connects\r
248 bool AsyncEventServer::OnAccept(SocketInfoPtr& pSI) {\r
249         sockaddr_in     clientAddr;\r
250         int addrSize = sizeof(clientAddr);\r
251         SOCKET clientSocket = WSAAccept(pSI->socket_, (sockaddr*)&clientAddr, &addrSize, NULL, NULL);\r
252         if(clientSocket == INVALID_SOCKET) {\r
253                 LogSocketError(TEXT("Accept"));\r
254                 return false;\r
255         }\r
256 \r
257         SocketInfoPtr pClientSocket(new SocketInfo(clientSocket, this));\r
258 \r
259         //Determine if we can handle one more client\r
260         if(socketInfoCollection_.Size() >= CASPAR_MAXIMUM_SOCKET_CLIENTS) {\r
261                 CASPAR_LOG(error) << "Could not accept ) << too many connections).";\r
262                 return true;\r
263         }\r
264 \r
265         if(WSAEventSelect(pClientSocket->socket_, pClientSocket->event_, FD_READ | FD_WRITE | FD_CLOSE) == SOCKET_ERROR) {\r
266                 LogSocketError(TEXT("Accept (failed create event for new client)"));\r
267                 return false;\r
268         }\r
269 \r
270         TCHAR addressBuffer[32];\r
271         MultiByteToWideChar(CP_ACP, 0, inet_ntoa(clientAddr.sin_addr), -1, addressBuffer, 32);\r
272         pClientSocket->host_ = addressBuffer;\r
273 \r
274         socketInfoCollection_.AddSocketInfo(pClientSocket);\r
275 \r
276         CASPAR_LOG(info) << "Accepted connection from " << pClientSocket->host_.c_str();\r
277 \r
278         return true;\r
279 }\r
280 \r
281 bool ConvertMultiByteToWideChar(UINT codePage, char* pSource, int sourceLength, std::vector<wchar_t>& wideBuffer, int& countLeftovers)\r
282 {\r
283         if(codePage == CP_UTF8) {\r
284                 countLeftovers = 0;\r
285                 //check from the end of pSource for ev. uncompleted UTF-8 byte sequence\r
286                 if(pSource[sourceLength-1] & 0x80) {\r
287                         //The last byte is part of a multibyte sequence. If the sequence is not complete, we need to save the partial sequence\r
288                         int bytesToCheck = std::min(4, sourceLength);   //a sequence contains a maximum of 4 bytes\r
289                         int currentLeftoverIndex = sourceLength-1;\r
290                         for(; bytesToCheck > 0; --bytesToCheck, --currentLeftoverIndex) {\r
291                                 ++countLeftovers;\r
292                                 if(pSource[currentLeftoverIndex] & 0x80) {\r
293                                         if(pSource[currentLeftoverIndex] & 0x40) { //The two high-bits are set, this is the "header"\r
294                                                 int expectedSequenceLength = 2;\r
295                                                 if(pSource[currentLeftoverIndex] & 0x20)\r
296                                                         ++expectedSequenceLength;\r
297                                                 if(pSource[currentLeftoverIndex] & 0x10)\r
298                                                         ++expectedSequenceLength;\r
299 \r
300                                                 if(countLeftovers < expectedSequenceLength) {\r
301                                                         //The sequence is incomplete. Leave the leftovers to be interpreted with the next call\r
302                                                         break;\r
303                                                 }\r
304                                                 //The sequence is complete, there are no leftovers. \r
305                                                 //...OR...\r
306                                                 //error. Let the conversion-function take the hit.\r
307                                                 countLeftovers = 0;\r
308                                                 break;\r
309                                         }\r
310                                 }\r
311                                 else {\r
312                                         //error. Let the conversion-function take the hit.\r
313                                         countLeftovers = 0;\r
314                                         break;\r
315                                 }\r
316                         }\r
317                         if(countLeftovers == 4) {\r
318                                 //error. Let the conversion-function take the hit.\r
319                                 countLeftovers = 0;\r
320                         }\r
321                 }\r
322         }\r
323 \r
324         int charsWritten = 0;\r
325         int sourceBytesToProcess = sourceLength-countLeftovers;\r
326         int wideBufferCapacity = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, NULL, NULL);\r
327         if(wideBufferCapacity > 0) \r
328         {\r
329                 wideBuffer.resize(wideBufferCapacity);\r
330                 charsWritten = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, &wideBuffer[0], wideBuffer.size());\r
331         }\r
332         //copy the leftovers to the front of the buffer\r
333         if(countLeftovers > 0) {\r
334                 memcpy(pSource, &(pSource[sourceBytesToProcess]), countLeftovers);\r
335         }\r
336 \r
337         wideBuffer.resize(charsWritten);\r
338         return (charsWritten > 0);\r
339 }\r
340 \r
341 //////////////////////////////\r
342 // AsyncEventServer::OnRead\r
343 // PARAMS: ...\r
344 // COMMENT: Called then something arrives on the socket that has to be read\r
345 bool AsyncEventServer::OnRead(SocketInfoPtr& pSI) {\r
346         int recvResult = SOCKET_ERROR;\r
347 \r
348         int maxRecvLength = sizeof(pSI->recvBuffer_)-pSI->recvLeftoverOffset_;\r
349         recvResult = recv(pSI->socket_, pSI->recvBuffer_+pSI->recvLeftoverOffset_, maxRecvLength, 0);\r
350         while(recvResult != SOCKET_ERROR) {\r
351                 if(recvResult == 0) {\r
352                         CASPAR_LOG(info) << "Client " << pSI->host_.c_str() << TEXT(" disconnected");\r
353 \r
354                         socketInfoCollection_.RemoveSocketInfo(pSI);\r
355                         return true;\r
356                 }\r
357 \r
358                 //Convert to widechar\r
359                 if(ConvertMultiByteToWideChar(pProtocolStrategy_->GetCodepage(), pSI->recvBuffer_, recvResult + pSI->recvLeftoverOffset_, pSI->wideRecvBuffer_, pSI->recvLeftoverOffset_))\r
360                 {\r
361                         auto msg =      std::wstring(pSI->wideRecvBuffer_.begin(), pSI->wideRecvBuffer_.end());\r
362                         boost::replace_all(msg, L"\n", L"\\n");\r
363                         boost::replace_all(msg, L"\r", L"\\r");\r
364 \r
365                         CASPAR_LOG(info) << L"Received message from " << pSI->host_.c_str() << ": "<< msg;\r
366                         pProtocolStrategy_->Parse(&pSI->wideRecvBuffer_[0], pSI->wideRecvBuffer_.size(), pSI);\r
367                 }\r
368                 else                    \r
369                         CASPAR_LOG(error) << "Read from " << pSI->host_.c_str() << TEXT(" failed, could not convert command to UNICODE");\r
370                         \r
371                 maxRecvLength = sizeof(pSI->recvBuffer_)-pSI->recvLeftoverOffset_;\r
372                 recvResult = recv(pSI->socket_, pSI->recvBuffer_+pSI->recvLeftoverOffset_, maxRecvLength, 0);\r
373         }\r
374 \r
375         if(recvResult == SOCKET_ERROR) {\r
376                 int errorCode = WSAGetLastError();\r
377                 if(errorCode == WSAEWOULDBLOCK)\r
378                         return true;\r
379                 else {\r
380                         LogSocketError(TEXT("Read"), errorCode);\r
381                         OnError(pSI->event_, errorCode);\r
382                 }\r
383         }\r
384 \r
385         return false;\r
386 }\r
387 \r
388 //////////////////////////////\r
389 // AsyncEventServer::OnWrite\r
390 // PARAMS: ...\r
391 // COMMENT: Called when the socket is ready to send more data\r
392 void AsyncEventServer::OnWrite(SocketInfoPtr& pSI) {\r
393         DoSend(*pSI);   \r
394 }\r
395 \r
396 bool ConvertWideCharToMultiByte(UINT codePage, const std::wstring& wideString, std::vector<char>& destBuffer)\r
397 {\r
398         int bytesWritten = 0;\r
399         int multibyteBufferCapacity = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), 0, 0, NULL, NULL);\r
400         if(multibyteBufferCapacity > 0) \r
401         {\r
402                 destBuffer.resize(multibyteBufferCapacity);\r
403                 bytesWritten = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), &destBuffer[0], destBuffer.size(), NULL, NULL);\r
404         }\r
405         destBuffer.resize(bytesWritten);\r
406         return (bytesWritten > 0);\r
407 }\r
408 \r
409 void AsyncEventServer::DoSend(SocketInfo& socketInfo) {\r
410         //Locks the socketInfo-object so that no one else tampers with the sendqueue at the same time\r
411         tbb::mutex::scoped_lock lock(mutex_);\r
412 \r
413         while(!socketInfo.sendQueue_.empty() || socketInfo.currentlySending_.size() > 0) {\r
414                 if(socketInfo.currentlySending_.size() == 0) {\r
415                         //Read the next string in the queue and convert to UTF-8\r
416                         if(!ConvertWideCharToMultiByte(pProtocolStrategy_->GetCodepage(), socketInfo.sendQueue_.front(), socketInfo.currentlySending_))\r
417                         {\r
418                                 CASPAR_LOG(error) << "Send to " << socketInfo.host_.c_str() << TEXT(" failed, could not convert response to UTF-8");\r
419                         }\r
420                         socketInfo.currentlySendingOffset_ = 0;\r
421                 }\r
422 \r
423                 if(socketInfo.currentlySending_.size() > 0) {\r
424                         int bytesToSend = static_cast<int>(socketInfo.currentlySending_.size()-socketInfo.currentlySendingOffset_);\r
425                         int sentBytes = send(socketInfo.socket_, &socketInfo.currentlySending_[0] + socketInfo.currentlySendingOffset_, bytesToSend, 0);\r
426                         if(sentBytes == SOCKET_ERROR) {\r
427                                 int errorCode = WSAGetLastError();\r
428                                 if(errorCode == WSAEWOULDBLOCK) {\r
429                                         CASPAR_LOG(debug) << "Send to " << socketInfo.host_.c_str() << TEXT(" would block, sending later");\r
430                                         break;\r
431                                 }\r
432                                 else {\r
433                                         LogSocketError(TEXT("Send"), errorCode);\r
434                                         OnError(socketInfo.event_, errorCode);\r
435 \r
436                                         socketInfo.currentlySending_.resize(0);\r
437                                         socketInfo.currentlySendingOffset_ = 0;\r
438                                         socketInfo.sendQueue_.pop();\r
439                                         break;\r
440                                 }\r
441                         }\r
442                         else {\r
443                                 if(sentBytes == bytesToSend) {\r
444                                         \r
445                                         if(sentBytes < 512)\r
446                                         {\r
447                                                 boost::replace_all(socketInfo.sendQueue_.front(), L"\n", L"\\n");\r
448                                                 boost::replace_all(socketInfo.sendQueue_.front(), L"\r", L"\\r");\r
449                                                 CASPAR_LOG(info) << L"Sent message to " << socketInfo.host_.c_str() << L": " << socketInfo.sendQueue_.front().c_str();\r
450                                         }\r
451                                         else\r
452                                                 CASPAR_LOG(info) << "Sent more than 512 bytes to " << socketInfo.host_.c_str();\r
453 \r
454                                         socketInfo.currentlySending_.resize(0);\r
455                                         socketInfo.currentlySendingOffset_ = 0;\r
456                                         socketInfo.sendQueue_.pop();\r
457                                 }\r
458                                 else {\r
459                                         socketInfo.currentlySendingOffset_ += sentBytes;\r
460                                         CASPAR_LOG(info) << "Sent partial message to " << socketInfo.host_.c_str();\r
461                                 }\r
462                         }\r
463                 }\r
464                 else\r
465                         socketInfo.sendQueue_.pop();\r
466         }\r
467 }\r
468 \r
469 //////////////////////////////\r
470 // AsyncEventServer::OnClose\r
471 // PARAMS: ...\r
472 // COMMENT: Called when a client disconnects / is disconnected\r
473 void AsyncEventServer::OnClose(SocketInfoPtr& pSI) {\r
474         CASPAR_LOG(info) << "Client " << pSI->host_.c_str() << TEXT(" was disconnected");\r
475 \r
476         socketInfoCollection_.RemoveSocketInfo(pSI);\r
477 }\r
478 \r
479 //////////////////////////////\r
480 // AsyncEventServer::OnError\r
481 // PARAMS: ...\r
482 // COMMENT: Called when an errorcode is recieved\r
483 void AsyncEventServer::OnError(HANDLE waitEvent, int errorCode) {\r
484         if(errorCode == WSAENETDOWN || errorCode == WSAECONNABORTED || errorCode == WSAECONNRESET || errorCode == WSAESHUTDOWN || errorCode == WSAETIMEDOUT || errorCode == WSAENOTCONN || errorCode == WSAENETRESET) {\r
485                 SocketInfoPtr pSocketInfo;\r
486                 if(socketInfoCollection_.FindSocketInfo(waitEvent, pSocketInfo)) {\r
487                         CASPAR_LOG(info) << "Client " << pSocketInfo->host_.c_str() << TEXT(" was disconnected, Errorcode ") << errorCode;\r
488                 }\r
489 \r
490                 socketInfoCollection_.RemoveSocketInfo(waitEvent);\r
491         }\r
492 }\r
493 \r
494 //////////////////////////////\r
495 // AsyncEventServer::DisconnectClient\r
496 // PARAMS: ...\r
497 // COMMENT: The client is removed from the actual client-list when an FD_CLOSE notification is recieved\r
498 void AsyncEventServer::DisconnectClient(SocketInfo& socketInfo) {\r
499         int result = shutdown(socketInfo.socket_, SD_SEND);\r
500         if(result == SOCKET_ERROR)\r
501                 OnError(socketInfo.event_, result);\r
502 }\r
503 \r
504 //////////////////////////////\r
505 // AsyncEventServer::LogSocketError\r
506 void AsyncEventServer::LogSocketError(const TCHAR* pStr, int socketError) {\r
507         if(socketError == 0)\r
508                 socketError = WSAGetLastError();\r
509 \r
510         CASPAR_LOG(error) << "Failed to " << pStr << TEXT(" Errorcode: ") << socketError;\r
511 }\r
512 \r
513 \r
514 //////////////////////////////\r
515 //  SocketInfoCollection\r
516 //////////////////////////////\r
517 \r
518 AsyncEventServer::SocketInfoCollection::SocketInfoCollection() : bDirty_(false) {\r
519 }\r
520 \r
521 AsyncEventServer::SocketInfoCollection::~SocketInfoCollection() {\r
522 }\r
523 \r
524 bool AsyncEventServer::SocketInfoCollection::AddSocketInfo(SocketInfoPtr& pSocketInfo) {\r
525         tbb::mutex::scoped_lock lock(mutex_);\r
526 \r
527         waitEvents_.resize(waitEvents_.size()+1);\r
528         bool bSuccess = socketInfoMap_.insert(SocketInfoMap::value_type(pSocketInfo->event_, pSocketInfo)).second;\r
529         if(bSuccess) {\r
530                 waitEvents_[waitEvents_.size()-1] = pSocketInfo->event_;\r
531                 bDirty_ = true;\r
532         }\r
533 \r
534         return bSuccess;\r
535 }\r
536 \r
537 void AsyncEventServer::SocketInfoCollection::RemoveSocketInfo(SocketInfoPtr& pSocketInfo) {\r
538         if(pSocketInfo != 0) {\r
539                 RemoveSocketInfo(pSocketInfo->event_);\r
540         }\r
541 }\r
542 void AsyncEventServer::SocketInfoCollection::RemoveSocketInfo(HANDLE waitEvent) {\r
543         tbb::mutex::scoped_lock lock(mutex_);\r
544 \r
545         //Find instance\r
546         SocketInfoPtr pSocketInfo;\r
547         SocketInfoMap::iterator it = socketInfoMap_.find(waitEvent);\r
548         SocketInfoMap::iterator end = socketInfoMap_.end();\r
549         if(it != end)\r
550                 pSocketInfo = it->second;\r
551 \r
552         if(pSocketInfo) {\r
553                 pSocketInfo->pServer_ = NULL;\r
554 \r
555                 socketInfoMap_.erase(waitEvent);\r
556 \r
557                 HandleVector::iterator it = std::find(waitEvents_.begin(), waitEvents_.end(), waitEvent);\r
558                 if(it != waitEvents_.end()) {\r
559                         std::swap((*it), waitEvents_.back());\r
560                         waitEvents_.resize(waitEvents_.size()-1);\r
561 \r
562                         bDirty_ = true;\r
563                 }\r
564         }\r
565         if(onSocketInfoRemoved)\r
566                 onSocketInfoRemoved(pSocketInfo);\r
567 }\r
568 \r
569 bool AsyncEventServer::SocketInfoCollection::FindSocketInfo(HANDLE key, SocketInfoPtr& pResult) {\r
570         tbb::mutex::scoped_lock lock(mutex_);\r
571 \r
572         SocketInfoMap::iterator it = socketInfoMap_.find(key);\r
573         SocketInfoMap::iterator end = socketInfoMap_.end();\r
574         if(it != end)\r
575                 pResult = it->second;\r
576 \r
577         return (it != end);\r
578 }\r
579 \r
580 void AsyncEventServer::SocketInfoCollection::CopyCollectionToArray(HANDLE* pDest, int maxCount) {\r
581         tbb::mutex::scoped_lock lock(mutex_);\r
582 \r
583         memcpy(pDest, &(waitEvents_[0]), std::min( maxCount, static_cast<int>(waitEvents_.size()) ) * sizeof(HANDLE) );\r
584 }\r
585 \r
586 void AsyncEventServer::SocketInfoCollection::Clear() {\r
587         tbb::mutex::scoped_lock lock(mutex_);\r
588 \r
589         socketInfoMap_.clear();\r
590         waitEvents_.clear();\r
591 }\r
592 \r
593 }       //namespace IO\r
594 }       //namespace caspar