]> git.sesse.net Git - casparcg/blob - protocol/util/AsyncEventServer.cpp
2.1.0: Some mayor refactoring.
[casparcg] / protocol / util / AsyncEventServer.cpp
1 /*\r
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
3 *\r
4 * This file is part of CasparCG (www.casparcg.com).\r
5 *\r
6 * CasparCG is free software: you can redistribute it and/or modify\r
7 * it under the terms of the GNU General Public License as published by\r
8 * the Free Software Foundation, either version 3 of the License, or\r
9 * (at your option) any later version.\r
10 *\r
11 * CasparCG is distributed in the hope that it will be useful,\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
14 * GNU General Public License for more details.\r
15 *\r
16 * You should have received a copy of the GNU General Public License\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
18 *\r
19 * Author: Nicklas P Andersson\r
20 */\r
21 \r
22  \r
23 // AsyncEventServer.cpp: impl of the AsyncEventServer class.\r
24 //\r
25 //////////////////////////////////////////////////////////////////////\r
26 \r
27 #include "../stdafx.h"\r
28 \r
29 #include "AsyncEventServer.h"\r
30 #include "SocketInfo.h"\r
31 \r
32 #include <common/log.h>\r
33 #include <string>\r
34 #include <algorithm>\r
35 #include <boost/algorithm/string/replace.hpp>\r
36 \r
37 #if defined(_MSC_VER)\r
38 #pragma warning (push, 1) // TODO: Legacy code, just disable warnings, will replace with boost::asio in future\r
39 #endif\r
40 \r
41 namespace caspar { namespace IO {\r
42         \r
43 #define CASPAR_MAXIMUM_SOCKET_CLIENTS   (MAXIMUM_WAIT_OBJECTS-1)        \r
44 \r
45 long AsyncEventServer::instanceCount_ = 0;\r
46 //////////////////////////////\r
47 // AsyncEventServer constructor\r
48 // PARAMS: port(TCP-port the server should listen to)\r
49 // COMMENT: Initializes the WinSock2 library\r
50 AsyncEventServer::AsyncEventServer(const safe_ptr<IProtocolStrategy>& pProtocol, int port) : port_(port), pProtocolStrategy_(pProtocol)\r
51 {\r
52         if(instanceCount_ == 0) {\r
53                 WSADATA wsaData;\r
54                 if(WSAStartup(MAKEWORD(2,2), &wsaData) != NO_ERROR)\r
55                         throw std::exception("Error initializing WinSock2");\r
56                 else {\r
57                         CASPAR_LOG(info) << "WinSock2 Initialized.";\r
58                 }\r
59         }\r
60 \r
61         InterlockedIncrement(&instanceCount_);\r
62 }\r
63 \r
64 /////////////////////////////\r
65 // AsyncEventServer destructor\r
66 AsyncEventServer::~AsyncEventServer() {\r
67         Stop();\r
68 \r
69         InterlockedDecrement(&instanceCount_);\r
70         if(instanceCount_ == 0)\r
71                 WSACleanup();\r
72 }\r
73 \r
74 void AsyncEventServer::SetClientDisconnectHandler(ClientDisconnectEvent handler) {\r
75         socketInfoCollection_.onSocketInfoRemoved = handler;\r
76 }\r
77 \r
78 //////////////////////////////\r
79 // AsyncEventServer::Start\r
80 // RETURNS: true at successful startup\r
81 bool AsyncEventServer::Start() {\r
82         if(listenThread_.IsRunning())\r
83                 return false;\r
84 \r
85         socketInfoCollection_.Clear();\r
86 \r
87         sockaddr_in sockAddr;\r
88         ZeroMemory(&sockAddr, sizeof(sockAddr));\r
89         sockAddr.sin_family = AF_INET;\r
90         sockAddr.sin_addr.s_addr = INADDR_ANY;\r
91         sockAddr.sin_port = htons(port_);\r
92         \r
93         SOCKET listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);\r
94         if(listenSocket == INVALID_SOCKET) {\r
95                 CASPAR_LOG(error) << "Failed to create listenSocket";\r
96                 return false;\r
97         }\r
98         \r
99         pListenSocketInfo_ = SocketInfoPtr(new SocketInfo(listenSocket, this));\r
100 \r
101         if(WSAEventSelect(pListenSocketInfo_->socket_, pListenSocketInfo_->event_, FD_ACCEPT|FD_CLOSE) == SOCKET_ERROR) {\r
102                 CASPAR_LOG(error) << "Failed to enter EventSelect-mode for listenSocket";\r
103                 return false;\r
104         }\r
105 \r
106         if(bind(pListenSocketInfo_->socket_, (sockaddr*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR) {\r
107                 CASPAR_LOG(error) << "Failed to bind listenSocket";\r
108                 return false;\r
109         }\r
110 \r
111         if(listen(pListenSocketInfo_->socket_, SOMAXCONN) == SOCKET_ERROR) {\r
112                 CASPAR_LOG(error) << "Failed to listen";\r
113                 return false;\r
114         }\r
115 \r
116         socketInfoCollection_.AddSocketInfo(pListenSocketInfo_);\r
117 \r
118         //start thread: the entrypoint is Run(EVENT stopEvent)\r
119         if(!listenThread_.Start(this)) {\r
120                 CASPAR_LOG(error) << "Failed to create ListenThread";\r
121                 return false;\r
122         }\r
123 \r
124         CASPAR_LOG(info) << "Listener successfully initialized";\r
125         return true;\r
126 }\r
127 \r
128 void AsyncEventServer::Run(HANDLE stopEvent)\r
129 {\r
130         WSANETWORKEVENTS networkEvents;\r
131 \r
132         HANDLE waitHandlesCopy[MAXIMUM_WAIT_OBJECTS];\r
133         waitHandlesCopy[0] = stopEvent;\r
134 \r
135         while(true)     {\r
136                 //Update local copy of the array of wait-handles if nessecery\r
137                 if(socketInfoCollection_.IsDirty()) {\r
138                         socketInfoCollection_.CopyCollectionToArray(&(waitHandlesCopy[1]), CASPAR_MAXIMUM_SOCKET_CLIENTS);\r
139                         socketInfoCollection_.ClearDirty();\r
140                 }\r
141 \r
142                 DWORD waitResult = WSAWaitForMultipleEvents(std::min<DWORD>(static_cast<DWORD>(socketInfoCollection_.Size()+1), MAXIMUM_WAIT_OBJECTS), waitHandlesCopy, FALSE, 1500, FALSE);\r
143                 if(waitResult == WAIT_TIMEOUT)\r
144                         continue;\r
145                 else if(waitResult == WAIT_FAILED)\r
146                         break;\r
147                 else {\r
148                         DWORD eventIndex = waitResult - WAIT_OBJECT_0;\r
149 \r
150                         HANDLE waitEvent = waitHandlesCopy[eventIndex];\r
151                         SocketInfoPtr pSocketInfo;\r
152 \r
153                         if(eventIndex == 0)     //stopEvent\r
154                                 break;\r
155                         else if(socketInfoCollection_.FindSocketInfo(waitEvent, pSocketInfo)) {\r
156                                 WSAEnumNetworkEvents(pSocketInfo->socket_, waitEvent, &networkEvents);\r
157 \r
158                                 if(networkEvents.lNetworkEvents & FD_ACCEPT) {\r
159                                         if(networkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)\r
160                                                 OnAccept(pSocketInfo);\r
161                                         else {\r
162                                                 CASPAR_LOG(debug) << "OnAccept (ErrorCode: " << networkEvents.iErrorCode[FD_ACCEPT_BIT] << TEXT(")");\r
163                                                 OnError(waitEvent, networkEvents.iErrorCode[FD_ACCEPT_BIT]);\r
164                                         }\r
165                                 }\r
166 \r
167                                 if(networkEvents.lNetworkEvents & FD_CLOSE) {\r
168                                         if(networkEvents.iErrorCode[FD_CLOSE_BIT] == 0)\r
169                                                 OnClose(pSocketInfo);\r
170                                         else {\r
171                                                 CASPAR_LOG(debug) << "OnClose (ErrorCode: " << networkEvents.iErrorCode[FD_CLOSE_BIT] << TEXT(")");\r
172                                                 OnError(waitEvent, networkEvents.iErrorCode[FD_CLOSE_BIT]);\r
173                                         }\r
174                                         continue;\r
175                                 }\r
176 \r
177                                 if(networkEvents.lNetworkEvents & FD_READ) {\r
178                                         if(networkEvents.iErrorCode[FD_READ_BIT] == 0)\r
179                                                 OnRead(pSocketInfo);\r
180                                         else {\r
181                                                 CASPAR_LOG(debug) << "OnRead (ErrorCode: " << networkEvents.iErrorCode[FD_READ_BIT] << TEXT(")");\r
182                                                 OnError(waitEvent, networkEvents.iErrorCode[FD_READ_BIT]);\r
183                                         }\r
184                                 }\r
185 \r
186                                 if(networkEvents.lNetworkEvents & FD_WRITE) {\r
187                                         if(networkEvents.iErrorCode[FD_WRITE_BIT] == 0)\r
188                                                 OnWrite(pSocketInfo);\r
189                                         else {\r
190                                                 CASPAR_LOG(debug) << "OnWrite (ErrorCode: " << networkEvents.iErrorCode[FD_WRITE_BIT] << TEXT(")");\r
191                                                 OnError(waitEvent, networkEvents.iErrorCode[FD_WRITE_BIT]);\r
192                                         }\r
193                                 }\r
194                         }\r
195                         else {\r
196                                 //Could not find the waitHandle in the SocketInfoCollection.\r
197                                 //It must have been removed during the last call to WSAWaitForMultipleEvents\r
198                         }\r
199                 }\r
200         }\r
201 }\r
202 \r
203 bool AsyncEventServer::OnUnhandledException(const std::exception& ex) throw() {\r
204         bool bDoRestart = true;\r
205 \r
206         try \r
207         {\r
208                 CASPAR_LOG(fatal) << "UNHANDLED EXCEPTION in TCPServers listeningthread. Message: " << ex.what();\r
209         }\r
210         catch(...)\r
211         {\r
212                 bDoRestart = false;\r
213         }\r
214 \r
215         return bDoRestart;\r
216 }\r
217 \r
218 ///////////////////////////////\r
219 // AsyncEventServer:Stop\r
220 // COMMENT: Shuts down\r
221 void AsyncEventServer::Stop()\r
222 {\r
223         //TODO: initiate shutdown on all clients connected\r
224 //      for(int i=0; i < _totalActiveSockets; ++i) {\r
225 //              shutdown(_pSocketInfo[i]->_socket, SD_SEND);\r
226 //      }\r
227 \r
228         if(!listenThread_.Stop()) {\r
229                 CASPAR_LOG(warning) << "Wait for listenThread timed out.";\r
230         }\r
231 \r
232         socketInfoCollection_.Clear();\r
233 }\r
234 \r
235 ////////////////////////////////////////////////////////////////////\r
236 //\r
237 // MESSAGE HANDLERS   \r
238 //\r
239 ////////////////////////////////////////////////////////////////////\r
240 \r
241 \r
242 //////////////////////////////\r
243 // AsyncEventServer::OnAccept\r
244 // PARAMS: ...\r
245 // COMMENT: Called when a new client connects\r
246 bool AsyncEventServer::OnAccept(SocketInfoPtr& pSI) {\r
247         sockaddr_in     clientAddr;\r
248         int addrSize = sizeof(clientAddr);\r
249         SOCKET clientSocket = WSAAccept(pSI->socket_, (sockaddr*)&clientAddr, &addrSize, NULL, NULL);\r
250         if(clientSocket == INVALID_SOCKET) {\r
251                 LogSocketError(TEXT("Accept"));\r
252                 return false;\r
253         }\r
254 \r
255         SocketInfoPtr pClientSocket(new SocketInfo(clientSocket, this));\r
256 \r
257         //Determine if we can handle one more client\r
258         if(socketInfoCollection_.Size() >= CASPAR_MAXIMUM_SOCKET_CLIENTS) {\r
259                 CASPAR_LOG(error) << "Could not accept ) << too many connections).";\r
260                 return true;\r
261         }\r
262 \r
263         if(WSAEventSelect(pClientSocket->socket_, pClientSocket->event_, FD_READ | FD_WRITE | FD_CLOSE) == SOCKET_ERROR) {\r
264                 LogSocketError(TEXT("Accept (failed create event for new client)"));\r
265                 return false;\r
266         }\r
267 \r
268         TCHAR addressBuffer[32];\r
269         MultiByteToWideChar(CP_ACP, 0, inet_ntoa(clientAddr.sin_addr), -1, addressBuffer, 32);\r
270         pClientSocket->host_ = addressBuffer;\r
271 \r
272         socketInfoCollection_.AddSocketInfo(pClientSocket);\r
273 \r
274         CASPAR_LOG(info) << "Accepted connection from " << pClientSocket->host_.c_str();\r
275 \r
276         return true;\r
277 }\r
278 \r
279 bool ConvertMultiByteToWideChar(UINT codePage, char* pSource, int sourceLength, std::vector<wchar_t>& wideBuffer, int& countLeftovers)\r
280 {\r
281         if(codePage == CP_UTF8) {\r
282                 countLeftovers = 0;\r
283                 //check from the end of pSource for ev. uncompleted UTF-8 byte sequence\r
284                 if(pSource[sourceLength-1] & 0x80) {\r
285                         //The last byte is part of a multibyte sequence. If the sequence is not complete, we need to save the partial sequence\r
286                         int bytesToCheck = std::min(4, sourceLength);   //a sequence contains a maximum of 4 bytes\r
287                         int currentLeftoverIndex = sourceLength-1;\r
288                         for(; bytesToCheck > 0; --bytesToCheck, --currentLeftoverIndex) {\r
289                                 ++countLeftovers;\r
290                                 if(pSource[currentLeftoverIndex] & 0x80) {\r
291                                         if(pSource[currentLeftoverIndex] & 0x40) { //The two high-bits are set, this is the "header"\r
292                                                 int expectedSequenceLength = 2;\r
293                                                 if(pSource[currentLeftoverIndex] & 0x20)\r
294                                                         ++expectedSequenceLength;\r
295                                                 if(pSource[currentLeftoverIndex] & 0x10)\r
296                                                         ++expectedSequenceLength;\r
297 \r
298                                                 if(countLeftovers < expectedSequenceLength) {\r
299                                                         //The sequence is incomplete. Leave the leftovers to be interpreted with the next call\r
300                                                         break;\r
301                                                 }\r
302                                                 //The sequence is complete, there are no leftovers. \r
303                                                 //...OR...\r
304                                                 //error. Let the conversion-function take the hit.\r
305                                                 countLeftovers = 0;\r
306                                                 break;\r
307                                         }\r
308                                 }\r
309                                 else {\r
310                                         //error. Let the conversion-function take the hit.\r
311                                         countLeftovers = 0;\r
312                                         break;\r
313                                 }\r
314                         }\r
315                         if(countLeftovers == 4) {\r
316                                 //error. Let the conversion-function take the hit.\r
317                                 countLeftovers = 0;\r
318                         }\r
319                 }\r
320         }\r
321 \r
322         int charsWritten = 0;\r
323         int sourceBytesToProcess = sourceLength-countLeftovers;\r
324         int wideBufferCapacity = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, NULL, NULL);\r
325         if(wideBufferCapacity > 0) \r
326         {\r
327                 wideBuffer.resize(wideBufferCapacity);\r
328                 charsWritten = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, &wideBuffer[0], wideBuffer.size());\r
329         }\r
330         //copy the leftovers to the front of the buffer\r
331         if(countLeftovers > 0) {\r
332                 memcpy(pSource, &(pSource[sourceBytesToProcess]), countLeftovers);\r
333         }\r
334 \r
335         wideBuffer.resize(charsWritten);\r
336         return (charsWritten > 0);\r
337 }\r
338 \r
339 //////////////////////////////\r
340 // AsyncEventServer::OnRead\r
341 // PARAMS: ...\r
342 // COMMENT: Called when something arrives on the socket that has to be read\r
343 bool AsyncEventServer::OnRead(SocketInfoPtr& pSI) {\r
344         int recvResult = SOCKET_ERROR;\r
345 \r
346         int maxRecvLength = sizeof(pSI->recvBuffer_)-pSI->recvLeftoverOffset_;\r
347         recvResult = recv(pSI->socket_, pSI->recvBuffer_+pSI->recvLeftoverOffset_, maxRecvLength, 0);\r
348         while(recvResult != SOCKET_ERROR) {\r
349                 if(recvResult == 0) {\r
350                         CASPAR_LOG(info) << "Client " << pSI->host_.c_str() << TEXT(" disconnected");\r
351 \r
352                         socketInfoCollection_.RemoveSocketInfo(pSI);\r
353                         return true;\r
354                 }\r
355 \r
356                 //Convert to widechar\r
357                 if(ConvertMultiByteToWideChar(pProtocolStrategy_->GetCodepage(), pSI->recvBuffer_, recvResult + pSI->recvLeftoverOffset_, pSI->wideRecvBuffer_, pSI->recvLeftoverOffset_))              \r
358                         pProtocolStrategy_->Parse(&pSI->wideRecvBuffer_[0], pSI->wideRecvBuffer_.size(), pSI);          \r
359                 else                    \r
360                         CASPAR_LOG(error) << "Read from " << pSI->host_.c_str() << TEXT(" failed, could not convert command to UNICODE");\r
361                         \r
362                 maxRecvLength = sizeof(pSI->recvBuffer_)-pSI->recvLeftoverOffset_;\r
363                 recvResult = recv(pSI->socket_, pSI->recvBuffer_+pSI->recvLeftoverOffset_, maxRecvLength, 0);\r
364         }\r
365 \r
366         if(recvResult == SOCKET_ERROR) {\r
367                 int errorCode = WSAGetLastError();\r
368                 if(errorCode == WSAEWOULDBLOCK)\r
369                         return true;\r
370                 else {\r
371                         LogSocketError(TEXT("Read"), errorCode);\r
372                         OnError(pSI->event_, errorCode);\r
373                 }\r
374         }\r
375 \r
376         return false;\r
377 }\r
378 \r
379 //////////////////////////////\r
380 // AsyncEventServer::OnWrite\r
381 // PARAMS: ...\r
382 // COMMENT: Called when the socket is ready to send more data\r
383 void AsyncEventServer::OnWrite(SocketInfoPtr& pSI) {\r
384         DoSend(*pSI);   \r
385 }\r
386 \r
387 bool ConvertWideCharToMultiByte(UINT codePage, const std::wstring& wideString, std::vector<char>& destBuffer)\r
388 {\r
389         int bytesWritten = 0;\r
390         int multibyteBufferCapacity = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), 0, 0, NULL, NULL);\r
391         if(multibyteBufferCapacity > 0) \r
392         {\r
393                 destBuffer.resize(multibyteBufferCapacity);\r
394                 bytesWritten = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), &destBuffer[0], destBuffer.size(), NULL, NULL);\r
395         }\r
396         destBuffer.resize(bytesWritten);\r
397         return (bytesWritten > 0);\r
398 }\r
399 \r
400 void AsyncEventServer::DoSend(SocketInfo& socketInfo) {\r
401         //Locks the socketInfo-object so that no one else tampers with the sendqueue at the same time\r
402         tbb::mutex::scoped_lock lock(mutex_);\r
403 \r
404         while(!socketInfo.sendQueue_.empty() || socketInfo.currentlySending_.size() > 0) {\r
405                 if(socketInfo.currentlySending_.size() == 0) {\r
406                         //Read the next string in the queue and convert to UTF-8\r
407                         if(!ConvertWideCharToMultiByte(pProtocolStrategy_->GetCodepage(), socketInfo.sendQueue_.front(), socketInfo.currentlySending_))\r
408                         {\r
409                                 CASPAR_LOG(error) << "Send to " << socketInfo.host_.c_str() << TEXT(" failed, could not convert response to UTF-8");\r
410                         }\r
411                         socketInfo.currentlySendingOffset_ = 0;\r
412                 }\r
413 \r
414                 if(socketInfo.currentlySending_.size() > 0) {\r
415                         int bytesToSend = static_cast<int>(socketInfo.currentlySending_.size()-socketInfo.currentlySendingOffset_);\r
416                         int sentBytes = send(socketInfo.socket_, &socketInfo.currentlySending_[0] + socketInfo.currentlySendingOffset_, bytesToSend, 0);\r
417                         if(sentBytes == SOCKET_ERROR) {\r
418                                 int errorCode = WSAGetLastError();\r
419                                 if(errorCode == WSAEWOULDBLOCK) {\r
420                                         CASPAR_LOG(debug) << "Send to " << socketInfo.host_.c_str() << TEXT(" would block, sending later");\r
421                                         break;\r
422                                 }\r
423                                 else {\r
424                                         LogSocketError(TEXT("Send"), errorCode);\r
425                                         OnError(socketInfo.event_, errorCode);\r
426 \r
427                                         socketInfo.currentlySending_.resize(0);\r
428                                         socketInfo.currentlySendingOffset_ = 0;\r
429                                         socketInfo.sendQueue_.pop();\r
430                                         break;\r
431                                 }\r
432                         }\r
433                         else {\r
434                                 if(sentBytes == bytesToSend) {\r
435                                         \r
436                                         if(sentBytes < 512)\r
437                                         {\r
438                                                 boost::replace_all(socketInfo.sendQueue_.front(), L"\n", L"\\n");\r
439                                                 boost::replace_all(socketInfo.sendQueue_.front(), L"\r", L"\\r");\r
440                                                 CASPAR_LOG(info) << L"Sent message to " << socketInfo.host_.c_str() << L": " << socketInfo.sendQueue_.front().c_str();\r
441                                         }\r
442                                         else\r
443                                                 CASPAR_LOG(info) << "Sent more than 512 bytes to " << socketInfo.host_.c_str();\r
444 \r
445                                         socketInfo.currentlySending_.resize(0);\r
446                                         socketInfo.currentlySendingOffset_ = 0;\r
447                                         socketInfo.sendQueue_.pop();\r
448                                 }\r
449                                 else {\r
450                                         socketInfo.currentlySendingOffset_ += sentBytes;\r
451                                         CASPAR_LOG(info) << "Sent partial message to " << socketInfo.host_.c_str();\r
452                                 }\r
453                         }\r
454                 }\r
455                 else\r
456                         socketInfo.sendQueue_.pop();\r
457         }\r
458 }\r
459 \r
460 //////////////////////////////\r
461 // AsyncEventServer::OnClose\r
462 // PARAMS: ...\r
463 // COMMENT: Called when a client disconnects / is disconnected\r
464 void AsyncEventServer::OnClose(SocketInfoPtr& pSI) {\r
465         CASPAR_LOG(info) << "Client " << pSI->host_.c_str() << TEXT(" was disconnected");\r
466 \r
467         socketInfoCollection_.RemoveSocketInfo(pSI);\r
468 }\r
469 \r
470 //////////////////////////////\r
471 // AsyncEventServer::OnError\r
472 // PARAMS: ...\r
473 // COMMENT: Called when an errorcode is received\r
474 void AsyncEventServer::OnError(HANDLE waitEvent, int errorCode) {\r
475         if(errorCode == WSAENETDOWN || errorCode == WSAECONNABORTED || errorCode == WSAECONNRESET || errorCode == WSAESHUTDOWN || errorCode == WSAETIMEDOUT || errorCode == WSAENOTCONN || errorCode == WSAENETRESET) {\r
476                 SocketInfoPtr pSocketInfo;\r
477                 if(socketInfoCollection_.FindSocketInfo(waitEvent, pSocketInfo)) {\r
478                         CASPAR_LOG(info) << "Client " << pSocketInfo->host_.c_str() << TEXT(" was disconnected, Errorcode ") << errorCode;\r
479                 }\r
480 \r
481                 socketInfoCollection_.RemoveSocketInfo(waitEvent);\r
482         }\r
483 }\r
484 \r
485 //////////////////////////////\r
486 // AsyncEventServer::DisconnectClient\r
487 // PARAMS: ...\r
488 // COMMENT: The client is removed from the actual client-list when an FD_CLOSE notification is received\r
489 void AsyncEventServer::DisconnectClient(SocketInfo& socketInfo) {\r
490         int result = shutdown(socketInfo.socket_, SD_SEND);\r
491         if(result == SOCKET_ERROR)\r
492                 OnError(socketInfo.event_, result);\r
493 }\r
494 \r
495 //////////////////////////////\r
496 // AsyncEventServer::LogSocketError\r
497 void AsyncEventServer::LogSocketError(const TCHAR* pStr, int socketError) {\r
498         if(socketError == 0)\r
499                 socketError = WSAGetLastError();\r
500 \r
501         CASPAR_LOG(error) << "Failed to " << pStr << TEXT(" Errorcode: ") << socketError;\r
502 }\r
503 \r
504 \r
505 //////////////////////////////\r
506 //  SocketInfoCollection\r
507 //////////////////////////////\r
508 \r
509 AsyncEventServer::SocketInfoCollection::SocketInfoCollection() : bDirty_(false) {\r
510 }\r
511 \r
512 AsyncEventServer::SocketInfoCollection::~SocketInfoCollection() {\r
513 }\r
514 \r
515 bool AsyncEventServer::SocketInfoCollection::AddSocketInfo(SocketInfoPtr& pSocketInfo) {\r
516         tbb::mutex::scoped_lock lock(mutex_);\r
517 \r
518         waitEvents_.resize(waitEvents_.size()+1);\r
519         bool bSuccess = socketInfoMap_.insert(SocketInfoMap::value_type(pSocketInfo->event_, pSocketInfo)).second;\r
520         if(bSuccess) {\r
521                 waitEvents_[waitEvents_.size()-1] = pSocketInfo->event_;\r
522                 bDirty_ = true;\r
523         }\r
524 \r
525         return bSuccess;\r
526 }\r
527 \r
528 void AsyncEventServer::SocketInfoCollection::RemoveSocketInfo(SocketInfoPtr& pSocketInfo) {\r
529         if(pSocketInfo != 0) {\r
530                 RemoveSocketInfo(pSocketInfo->event_);\r
531         }\r
532 }\r
533 void AsyncEventServer::SocketInfoCollection::RemoveSocketInfo(HANDLE waitEvent) {\r
534         tbb::mutex::scoped_lock lock(mutex_);\r
535 \r
536         //Find instance\r
537         SocketInfoPtr pSocketInfo;\r
538         SocketInfoMap::iterator it = socketInfoMap_.find(waitEvent);\r
539         SocketInfoMap::iterator end = socketInfoMap_.end();\r
540         if(it != end)\r
541                 pSocketInfo = it->second;\r
542 \r
543         if(pSocketInfo) {\r
544                 pSocketInfo->pServer_ = NULL;\r
545 \r
546                 socketInfoMap_.erase(waitEvent);\r
547 \r
548                 HandleVector::iterator it = std::find(waitEvents_.begin(), waitEvents_.end(), waitEvent);\r
549                 if(it != waitEvents_.end()) {\r
550                         std::swap((*it), waitEvents_.back());\r
551                         waitEvents_.resize(waitEvents_.size()-1);\r
552 \r
553                         bDirty_ = true;\r
554                 }\r
555         }\r
556         if(onSocketInfoRemoved)\r
557                 onSocketInfoRemoved(pSocketInfo);\r
558 }\r
559 \r
560 bool AsyncEventServer::SocketInfoCollection::FindSocketInfo(HANDLE key, SocketInfoPtr& pResult) {\r
561         tbb::mutex::scoped_lock lock(mutex_);\r
562 \r
563         SocketInfoMap::iterator it = socketInfoMap_.find(key);\r
564         SocketInfoMap::iterator end = socketInfoMap_.end();\r
565         if(it != end)\r
566                 pResult = it->second;\r
567 \r
568         return (it != end);\r
569 }\r
570 \r
571 void AsyncEventServer::SocketInfoCollection::CopyCollectionToArray(HANDLE* pDest, int maxCount) {\r
572         tbb::mutex::scoped_lock lock(mutex_);\r
573 \r
574         memcpy(pDest, &(waitEvents_[0]), std::min( maxCount, static_cast<int>(waitEvents_.size()) ) * sizeof(HANDLE) );\r
575 }\r
576 \r
577 void AsyncEventServer::SocketInfoCollection::Clear() {\r
578         tbb::mutex::scoped_lock lock(mutex_);\r
579 \r
580         socketInfoMap_.clear();\r
581         waitEvents_.clear();\r
582 }\r
583 \r
584 }       //namespace IO\r
585 }       //namespace caspar