2 * copyright (c) 2010 Sveriges Television AB <info@casparcg.com>
\r
4 * This file is part of CasparCG.
\r
6 * CasparCG is free software: you can redistribute it and/or modify
\r
7 * it under the terms of the GNU General Public License as published by
\r
8 * the Free Software Foundation, either version 3 of the License, or
\r
9 * (at your option) any later version.
\r
11 * CasparCG is distributed in the hope that it will be useful,
\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
\r
14 * GNU General Public License for more details.
\r
16 * You should have received a copy of the GNU General Public License
\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
\r
21 // AsyncEventServer.cpp: implementation of the AsyncEventServer class.
\r
23 //////////////////////////////////////////////////////////////////////
\r
25 #include "..\stdafx.h"
\r
27 #include "AsyncEventServer.h"
\r
29 #include "ProtocolStrategy.h"
\r
31 #include <algorithm>
\r
37 #include <boost/asio.hpp>
\r
38 #include <boost/thread.hpp>
\r
39 #include <boost/lexical_cast.hpp>
\r
41 using boost::asio::ip::tcp;
\r
43 namespace caspar { namespace IO {
\r
45 bool ConvertWideCharToMultiByte(UINT codePage, const std::wstring& wideString, std::vector<char>& destBuffer)
\r
47 int bytesWritten = 0;
\r
48 int multibyteBufferCapacity = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), 0, 0, NULL, NULL);
\r
49 if(multibyteBufferCapacity > 0)
\r
51 destBuffer.resize(multibyteBufferCapacity);
\r
52 bytesWritten = WideCharToMultiByte(codePage, 0, wideString.c_str(), static_cast<int>(wideString.length()), destBuffer.data(), static_cast<int>(destBuffer.size()), NULL, NULL);
\r
54 destBuffer.resize(bytesWritten);
\r
55 return (bytesWritten > 0);
\r
58 bool ConvertMultiByteToWideChar(UINT codePage, char* pSource, int sourceLength, std::vector<wchar_t>& wideBuffer, int& countLeftovers)
\r
60 if(codePage == CP_UTF8) {
\r
62 //check from the end of pSource for ev. uncompleted UTF-8 byte sequence
\r
63 if(pSource[sourceLength-1] & 0x80) {
\r
64 //The last byte is part of a multibyte sequence. If the sequence is not complete, we need to save the partial sequence
\r
65 int bytesToCheck = std::min<int>(4, sourceLength); //a sequence contains a maximum of 4 bytes
\r
66 int currentLeftoverIndex = sourceLength-1;
\r
67 for(; bytesToCheck > 0; --bytesToCheck, --currentLeftoverIndex) {
\r
69 if(pSource[currentLeftoverIndex] & 0x80) {
\r
70 if(pSource[currentLeftoverIndex] & 0x40) { //The two high-bits are set, this is the "header"
\r
71 int expectedSequenceLength = 2;
\r
72 if(pSource[currentLeftoverIndex] & 0x20)
\r
73 ++expectedSequenceLength;
\r
74 if(pSource[currentLeftoverIndex] & 0x10)
\r
75 ++expectedSequenceLength;
\r
77 if(countLeftovers < expectedSequenceLength) {
\r
78 //The sequence is incomplete. Leave the leftovers to be interpreted with the next call
\r
81 //The sequence is complete, there are no leftovers.
\r
83 //error. Let the conversion-function take the hit.
\r
89 //error. Let the conversion-function take the hit.
\r
94 if(countLeftovers == 4) {
\r
95 //error. Let the conversion-function take the hit.
\r
101 int charsWritten = 0;
\r
102 int sourceBytesToProcess = sourceLength-countLeftovers;
\r
103 int wideBufferCapacity = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, NULL, NULL);
\r
104 if(wideBufferCapacity > 0)
\r
106 wideBuffer.resize(wideBufferCapacity);
\r
107 charsWritten = MultiByteToWideChar(codePage, 0, pSource, sourceBytesToProcess, wideBuffer.data(), static_cast<int>(wideBuffer.size()));
\r
109 //copy the leftovers to the front of the buffer
\r
110 if(countLeftovers > 0) {
\r
111 memcpy(pSource, &(pSource[sourceBytesToProcess]), countLeftovers);
\r
114 wideBuffer.resize(charsWritten);
\r
115 return (charsWritten > 0);
\r
120 typedef std::set<spl::shared_ptr<connection>> connection_set;
\r
122 class connection : public spl::enable_shared_from_this<connection>, public ClientInfo
\r
124 spl::shared_ptr<tcp::socket> socket_;
\r
125 const std::wstring name_;
\r
127 std::array<char, 8192> data_;
\r
129 std::vector<char> buffer_;
\r
131 const spl::shared_ptr<IProtocolStrategy> protocol_;
\r
132 spl::shared_ptr<connection_set> connection_set_;
\r
135 static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const ProtocolStrategyPtr& protocol, spl::shared_ptr<connection_set> connection_set)
\r
137 spl::shared_ptr<connection> con(new connection(socket, protocol, connection_set));
\r
142 std::wstring print() const
\r
144 return L"[" + name_ + L"]";
\r
149 virtual void Send(const std::wstring& data)
\r
154 virtual void Disconnect()
\r
156 connection_set_->erase(shared_from_this());
\r
163 socket_->shutdown(boost::asio::socket_base::shutdown_both);
\r
165 CASPAR_LOG(info) << print() << L" Disconnected.";
\r
169 connection(const spl::shared_ptr<tcp::socket>& socket, const ProtocolStrategyPtr& protocol, const spl::shared_ptr<connection_set>& connection_set)
\r
171 , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
\r
172 , protocol_(protocol)
\r
173 , connection_set_(connection_set)
\r
175 CASPAR_LOG(info) << print() << L" Connected.";
\r
178 void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
\r
184 CASPAR_LOG(trace) << print() << L" Received: " << u16(std::string(data_.begin(), data_.begin() + bytes_transferred));
\r
186 buffer_.insert(buffer_.end(), data_.begin(), data_.begin() + bytes_transferred);
\r
188 std::vector<wchar_t> str;
\r
190 ConvertMultiByteToWideChar(protocol_->GetCodepage(), buffer_.data(), static_cast<int>(buffer_.size()), str, left_overs);
\r
191 buffer_.resize(left_overs);
\r
193 protocol_->Parse(str.data(), static_cast<int>(str.size()), shared_from_this());
\r
197 CASPAR_LOG_CURRENT_EXCEPTION();
\r
202 else if (error != boost::asio::error::operation_aborted)
\r
204 connection_set_->erase(shared_from_this());
\r
211 void handle_write(const spl::shared_ptr<std::vector<char>>& data, const boost::system::error_code& error, size_t bytes_transferred)
\r
215 CASPAR_LOG(trace) << print() << L" Sent: " << u16(std::string(data->begin(), data->end()));
\r
217 else if (error != boost::asio::error::operation_aborted)
\r
219 connection_set_->erase(shared_from_this());
\r
226 socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
\r
229 void write_some(const std::wstring& data)
\r
231 spl::shared_ptr<std::vector<char>> str;
\r
232 ConvertWideCharToMultiByte(protocol_->GetCodepage(), data, *str);
\r
234 socket_->async_write_some(boost::asio::buffer(*str), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
\r
238 struct AsyncEventServer::implementation
\r
240 boost::asio::io_service service_;
\r
241 tcp::acceptor acceptor_;
\r
242 spl::shared_ptr<IProtocolStrategy> protocol_;
\r
243 spl::shared_ptr<connection_set> connection_set_;
\r
244 boost::thread thread_;
\r
246 implementation(const spl::shared_ptr<IProtocolStrategy>& protocol, unsigned short port)
\r
247 : acceptor_(service_, tcp::endpoint(tcp::v4(), port))
\r
248 , protocol_(protocol)
\r
249 , thread_(std::bind(&boost::asio::io_service::run, &service_))
\r
260 BOOST_FOREACH(auto& connection, *connection_set_)
\r
261 connection->stop();
\r
267 void start_accept()
\r
269 spl::shared_ptr<tcp::socket> socket(new tcp::socket(service_));
\r
270 acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1));
\r
273 void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error)
\r
275 if (!acceptor_.is_open())
\r
279 connection_set_->insert(connection::create(socket, protocol_, connection_set_));
\r
285 AsyncEventServer::AsyncEventServer(const spl::shared_ptr<IProtocolStrategy>& protocol, unsigned short port) : impl_(new implementation(protocol, port)){}
\r
286 AsyncEventServer::~AsyncEventServer(){}
\r