]> git.sesse.net Git - casparcg/blobdiff - protocol/util/AsyncEventServer.cpp
2.1.0: protocol: Refactored. Uses boost::locale.
[casparcg] / protocol / util / AsyncEventServer.cpp
index 7eee0639427efce43cc005ba0e26baed86a85d95..ee2214b8bf79a5310519e26c535d56c53367ee1d 100644 (file)
@@ -1,26 +1,23 @@
 /*\r
-* copyright (c) 2010 Sveriges Television AB <info@casparcg.com>\r
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
 *\r
-*  This file is part of CasparCG.\r
+* This file is part of CasparCG (www.casparcg.com).\r
 *\r
-*    CasparCG is free software: you can redistribute it and/or modify\r
-*    it under the terms of the GNU General Public License as published by\r
-*    the Free Software Foundation, either version 3 of the License, or\r
-*    (at your option) any later version.\r
+* CasparCG is free software: you can redistribute it and/or modify\r
+* it under the terms of the GNU General Public License as published by\r
+* the Free Software Foundation, either version 3 of the License, or\r
+* (at your option) any later version.\r
 *\r
-*    CasparCG is distributed in the hope that it will be useful,\r
-*    but WITHOUT ANY WARRANTY; without even the implied warranty of\r
-*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
-*    GNU General Public License for more details.\r
-\r
-*    You should have received a copy of the GNU General Public License\r
-*    along with CasparCG.  If not, see <http://www.gnu.org/licenses/>.\r
+* CasparCG is distributed in the hope that it will be useful,\r
+* but WITHOUT ANY WARRANTY; without even the implied warranty of\r
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
+* GNU General Public License for more details.\r
+*\r
+* You should have received a copy of the GNU General Public License\r
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
 *\r
+* Author: Robert Nagy, ronag89@gmail.com\r
 */\r
\r
-// AsyncEventServer.cpp: implementation of the AsyncEventServer class.\r
-//\r
-//////////////////////////////////////////////////////////////////////\r
 \r
 #include "..\stdafx.h"\r
 \r
 #include <boost/asio.hpp>\r
 #include <boost/thread.hpp>\r
 #include <boost/lexical_cast.hpp>\r
+#include <boost/locale.hpp>\r
+#include <boost/algorithm/string/split.hpp>\r
 \r
 using boost::asio::ip::tcp;\r
 \r
 namespace caspar { namespace IO {\r
        \r
-bool ConvertWideCharToMultiByte(UINT codePage, const std::wstring& wideString, std::vector<char>& destBuffer)\r
-{\r
-       int bytesWritten = 0;\r
-       int multibyteBufferCapacity = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), 0, 0, NULL, NULL);\r
-       if(multibyteBufferCapacity > 0) \r
-       {\r
-               destBuffer.resize(multibyteBufferCapacity);\r
-               bytesWritten = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), destBuffer.data(), static_cast<int>(destBuffer.size()), NULL, NULL);\r
-       }\r
-       destBuffer.resize(bytesWritten);\r
-       return (bytesWritten > 0);\r
-}\r
-\r
-bool ConvertMultiByteToWideChar(UINT codePage, char* pSource, int sourceLength, std::vector<wchar_t>& wideBuffer, int& countLeftovers)\r
-{\r
-       if(codePage == CP_UTF8) {\r
-               countLeftovers = 0;\r
-               //check from the end of pSource for ev. uncompleted UTF-8 byte sequence\r
-               if(pSource[sourceLength-1] & 0x80) {\r
-                       //The last byte is part of a multibyte sequence. If the sequence is not complete, we need to save the partial sequence\r
-                       int bytesToCheck = std::min<int>(4, sourceLength);      //a sequence contains a maximum of 4 bytes\r
-                       int currentLeftoverIndex = sourceLength-1;\r
-                       for(; bytesToCheck > 0; --bytesToCheck, --currentLeftoverIndex) {\r
-                               ++countLeftovers;\r
-                               if(pSource[currentLeftoverIndex] & 0x80) {\r
-                                       if(pSource[currentLeftoverIndex] & 0x40) { //The two high-bits are set, this is the "header"\r
-                                               int expectedSequenceLength = 2;\r
-                                               if(pSource[currentLeftoverIndex] & 0x20)\r
-                                                       ++expectedSequenceLength;\r
-                                               if(pSource[currentLeftoverIndex] & 0x10)\r
-                                                       ++expectedSequenceLength;\r
-\r
-                                               if(countLeftovers < expectedSequenceLength) {\r
-                                                       //The sequence is incomplete. Leave the leftovers to be interpreted with the next call\r
-                                                       break;\r
-                                               }\r
-                                               //The sequence is complete, there are no leftovers. \r
-                                               //...OR...\r
-                                               //error. Let the conversion-function take the hit.\r
-                                               countLeftovers = 0;\r
-                                               break;\r
-                                       }\r
-                               }\r
-                               else {\r
-                                       //error. Let the conversion-function take the hit.\r
-                                       countLeftovers = 0;\r
-                                       break;\r
-                               }\r
-                       }\r
-                       if(countLeftovers == 4) {\r
-                               //error. Let the conversion-function take the hit.\r
-                               countLeftovers = 0;\r
-                       }\r
-               }\r
-       }\r
-\r
-       int charsWritten = 0;\r
-       int sourceBytesToProcess = sourceLength-countLeftovers;\r
-       int wideBufferCapacity = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, NULL, NULL);\r
-       if(wideBufferCapacity > 0) \r
-       {\r
-               wideBuffer.resize(wideBufferCapacity);\r
-               charsWritten = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, wideBuffer.data(), static_cast<int>(wideBuffer.size()));\r
-       }\r
-       //copy the leftovers to the front of the buffer\r
-       if(countLeftovers > 0) {\r
-               memcpy(pSource, &(pSource[sourceBytesToProcess]), countLeftovers);\r
-       }\r
-\r
-       wideBuffer.resize(charsWritten);\r
-       return (charsWritten > 0);\r
-}\r
-\r
 class connection;\r
 \r
 typedef std::set<spl::shared_ptr<connection>> connection_set;\r
 \r
 class connection : public spl::enable_shared_from_this<connection>, public ClientInfo\r
 {    \r
-    spl::shared_ptr<tcp::socket>               socket_; \r
-       const std::wstring                                      name_;\r
+    spl::shared_ptr<tcp::socket>                               socket_; \r
+       const std::wstring                                                      name_;\r
 \r
-       std::array<char, 8192>                          data_;\r
+       std::array<char, 32768>                                         data_;\r
 \r
-       std::vector<char>                                       buffer_;\r
+       std::string                                                                     buffer_;\r
 \r
        const spl::shared_ptr<IProtocolStrategy>        protocol_;\r
        spl::shared_ptr<connection_set>                         connection_set_;\r
@@ -134,7 +60,7 @@ class connection : public spl::enable_shared_from_this<connection>, public Clien
 public:\r
     static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const ProtocolStrategyPtr& protocol, spl::shared_ptr<connection_set> connection_set)\r
        {\r
-               spl::shared_ptr<connection> con(new connection(socket, protocol, connection_set));\r
+               spl::shared_ptr<connection> con(new connection(std::move(socket), std::move(protocol), std::move(connection_set)));\r
                con->read_some();\r
                return con;\r
     }\r
@@ -161,7 +87,6 @@ public:
        void stop()\r
        {\r
                connection_set_->erase(shared_from_this());\r
-               socket_->shutdown(boost::asio::socket_base::shutdown_both);\r
                socket_->close();\r
                CASPAR_LOG(info) << print() << L" Disconnected.";\r
        }\r
@@ -185,13 +110,18 @@ private:
                                CASPAR_LOG(trace) << print() << L" Received: " << u16(std::string(data_.begin(), data_.begin() + bytes_transferred));\r
 \r
                                buffer_.insert(buffer_.end(), data_.begin(), data_.begin() + bytes_transferred);\r
-               \r
-                               std::vector<wchar_t> str;\r
-                               int left_overs;\r
-                               ConvertMultiByteToWideChar(protocol_->GetCodepage(), buffer_.data(), static_cast<int>(buffer_.size()), str, left_overs);\r
-                               buffer_.resize(left_overs);\r
-               \r
-                               protocol_->Parse(str.data(), static_cast<int>(str.size()), shared_from_this());\r
+                               \r
+                               std::vector<std::string> split;\r
+                               boost::iter_split(split, buffer_, boost::algorithm::first_finder("\r\n"));\r
+                               \r
+                               buffer_ = std::move(split.back());\r
+                               split.pop_back();\r
+\r
+                               BOOST_FOREACH(auto cmd, split)\r
+                               {\r
+                                       auto u16cmd = boost::locale::conv::to_utf<wchar_t>(cmd, protocol_->GetCodepage()) + L"\r\n";\r
+                                       protocol_->Parse(u16cmd.data(), static_cast<int>(u16cmd.size()), shared_from_this());\r
+                               }\r
                        }\r
                        catch(...)\r
                        {\r
@@ -206,14 +136,11 @@ private:
                        read_some();\r
     }\r
 \r
-    void handle_write(const spl::shared_ptr<std::vector<char>>& data, const boost::system::error_code& error, size_t bytes_transferred)\r
+    void handle_write(const spl::shared_ptr<std::string>& data, const boost::system::error_code& error, size_t bytes_transferred)\r
        {\r
                if(!error)                      \r
                {\r
-                       if(data->size() < 512)\r
-                               CASPAR_LOG(trace) << print() << L" Sent: " << u16(std::string(data->begin(), data->end()));\r
-                       else\r
-                               CASPAR_LOG(trace) << print() << L" Sent more than 512 bytes.";\r
+                       CASPAR_LOG(trace) << print() << L" Sent: " << (data->size() < 512 ? u16(std::string(data->begin(), data->end())) : L"more than 512 bytes.");\r
                }\r
                else if (error != boost::asio::error::operation_aborted)                \r
                        stop();         \r
@@ -226,10 +153,8 @@ private:
        \r
        void write_some(const std::wstring& data)\r
        {\r
-               spl::shared_ptr<std::vector<char>> str;\r
-               ConvertWideCharToMultiByte(protocol_->GetCodepage(), data, *str);\r
-\r
-               socket_->async_write_some(boost::asio::buffer(*str), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));\r
+               auto str = spl::make_shared<std::string>(boost::locale::conv::from_utf<wchar_t>(data, protocol_->GetCodepage()));\r
+               socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));\r
        }\r
 };\r
 \r
@@ -251,13 +176,30 @@ struct AsyncEventServer::implementation
 \r
        ~implementation()\r
        {\r
-               acceptor_.close();\r
+               try\r
+               {\r
+                       acceptor_.close();\r
 \r
-               service_.post([=]\r
+                       service_.post([=]\r
+                       {\r
+                               auto connections = *connection_set_;\r
+                               BOOST_FOREACH(auto& connection, connections)\r
+                               {\r
+                                       try\r
+                                       {\r
+                                               connection->stop();\r
+                                       }\r
+                                       catch(...)\r
+                                       {\r
+                                               CASPAR_LOG_CURRENT_EXCEPTION();\r
+                                       }\r
+                               }\r
+                       });\r
+               }\r
+               catch(...)\r
                {\r
-                       BOOST_FOREACH(auto& connection, *connection_set_)\r
-                               connection->stop();\r
-               });\r
+                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+               }\r
 \r
                thread_.join();\r
        }\r