2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
\r
4 * This file is part of CasparCG (www.casparcg.com).
\r
6 * CasparCG is free software: you can redistribute it and/or modify
\r
7 * it under the terms of the GNU General Public License as published by
\r
8 * the Free Software Foundation, either version 3 of the License, or
\r
9 * (at your option) any later version.
\r
11 * CasparCG is distributed in the hope that it will be useful,
\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
\r
14 * GNU General Public License for more details.
\r
16 * You should have received a copy of the GNU General Public License
\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
\r
19 * Author: Robert Nagy, ronag89@gmail.com
\r
22 #include "..\stdafx.h"
\r
24 #include "AsyncEventServer.h"
\r
26 #include "ProtocolStrategy.h"
\r
28 #include <algorithm>
\r
34 #include <boost/asio.hpp>
\r
35 #include <boost/thread.hpp>
\r
36 #include <boost/lexical_cast.hpp>
\r
37 #include <boost/locale.hpp>
\r
38 #include <boost/algorithm/string/split.hpp>
\r
40 using boost::asio::ip::tcp;
\r
42 namespace caspar { namespace IO {
\r
// Set of live connections, held by shared pointer. The server implementation
// inserts into it on accept, and a connection erases itself from it (both
// visible further down in this file).
46 typedef std::set<spl::shared_ptr<connection>> connection_set;
\r
// One accepted TCP client. Reads bytes asynchronously, splits them into
// "\r\n"-terminated commands and hands each command to the protocol strategy;
// outgoing text is converted to the protocol's codepage and written
// asynchronously.
// NOTE(review): this listing is incomplete - the embedded line numbers skip,
// so braces, access specifiers and several statements are not visible. The
// comments below describe only what the visible lines show.
48 class connection : public spl::enable_shared_from_this<connection>, public ClientInfo
\r
// TCP socket this connection reads from and writes to.
50 spl::shared_ptr<tcp::socket> socket_;
\r
// Label used by print(); computed once in the constructor.
51 const std::wstring name_;
\r
// Fixed-size receive area handed to async_read_some.
53 std::array<char, 32768> data_;
\r
// Received bytes that have not yet been consumed as complete commands.
55 std::string buffer_;
\r
// Supplies the codepage (GetCodepage) and consumes commands (Parse).
57 const spl::shared_ptr<IProtocolStrategy> protocol_;
\r
// Shared set of connections; an erase of shared_from_this() is visible below.
58 spl::shared_ptr<connection_set> connection_set_;
\r
// Factory: heap-allocates a connection and wraps it in a shared pointer.
// The rest of the body (embedded lines 64-66) is not visible in this listing.
61 static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const ProtocolStrategyPtr& protocol, spl::shared_ptr<connection_set> connection_set)
\r
// NOTE(review): the constructor declared further down takes all three
// arguments by const reference, so these std::move calls do not move anything.
63 spl::shared_ptr<connection> con(new connection(std::move(socket), std::move(protocol), std::move(connection_set)));
\r
// Returns name_ wrapped in square brackets; used as the prefix of log lines.
68 std::wstring print() const
\r
70 return L"[" + name_ + L"]";
\r
// Sends text to the client. Body not visible in this listing; presumably it
// forwards to write_some - TODO confirm.
75 virtual void Send(const std::wstring& data)
\r
// Closes the connection. Body not visible in this listing.
80 virtual void Disconnect()
\r
// Removes this connection from the shared set.
// NOTE(review): the header of the function containing the next two statements
// is not visible (embedded lines 81-88 are missing).
89 connection_set_->erase(shared_from_this());
\r
91 CASPAR_LOG(info) << print() << L" Disconnected.";
\r
// Constructor. The initializer at embedded line 96 is not visible; presumably
// it initializes soc_ket's member counterpart (socket_) - TODO confirm.
95 connection(const spl::shared_ptr<tcp::socket>& socket, const ProtocolStrategyPtr& protocol, const spl::shared_ptr<connection_set>& connection_set)
\r
// name_ becomes "<address>:<port>" of the socket's local endpoint when the
// socket is open, otherwise L"no-address". It reads socket_, which is declared
// before name_ and is therefore initialized first.
// NOTE(review): local_endpoint() is this side of the socket, so the label may
// be identical for every client; confirm whether remote_endpoint() was intended.
97 , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
\r
98 , protocol_(protocol)
\r
99 , connection_set_(connection_set)
\r
101 CASPAR_LOG(info) << print() << L" Connected.";
\r
// Completion handler for async_read_some (bound below at embedded line 149).
// error: result of the read; bytes_transferred: number of bytes now in data_.
// The surrounding if/try/catch lines are not visible in this listing.
104 void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
\r
110 CASPAR_LOG(trace) << print() << L" Received: " << u16(std::string(data_.begin(), data_.begin() + bytes_transferred));
\r
// Append only the bytes actually received to the pending data.
112 buffer_.insert(buffer_.end(), data_.begin(), data_.begin() + bytes_transferred);
\r
114 std::vector<std::string> split;
\r
// Split the pending data on every "\r\n".
115 boost::iter_split(split, buffer_, boost::algorithm::first_finder("\r\n"));
\r
// The last piece is whatever followed the final "\r\n" (an unterminated
// command, possibly empty); keep it for the next read.
// NOTE(review): embedded line 118 is missing; presumably it removes the
// moved-from last element before the loop - TODO confirm.
117 buffer_ = std::move(split.back());
\r
// NOTE(review): `auto cmd` copies each string on every iteration.
120 BOOST_FOREACH(auto cmd, split)
\r
// Convert from the protocol's codepage to wide chars and restore the
// terminator that the split removed.
122 auto u16cmd = boost::locale::conv::to_utf<wchar_t>(cmd, protocol_->GetCodepage()) + L"\r\n";
\r
123 protocol_->Parse(u16cmd.data(), static_cast<int>(u16cmd.size()), shared_from_this());
\r
// Logs the in-flight exception; the try/catch lines themselves are not visible.
128 CASPAR_LOG_CURRENT_EXCEPTION();
\r
// Branch for errors other than a cancelled operation; the action taken is not
// visible in this listing.
133 else if (error != boost::asio::error::operation_aborted)
\r
// Completion handler for async_write_some (bound in write_some). `data` is the
// payload that was handed to the write.
139 void handle_write(const spl::shared_ptr<std::string>& data, const boost::system::error_code& error, size_t bytes_transferred)
\r
// Logs the payload itself when it is shorter than 512 bytes, otherwise a fixed
// message. The condition guarding this line is not visible.
142 CASPAR_LOG(trace) << print() << L" Sent: " << (data->size() < 512 ? u16(std::string(data->begin(), data->end())) : L"more than 512 bytes.");
\r
// Branch for errors other than a cancelled operation; action not visible.
143 else if (error != boost::asio::error::operation_aborted)
\r
// Starts an asynchronous read into data_ with handle_read as the completion
// handler. Binding shared_from_this() keeps this object alive until the
// handler runs. The header of the enclosing function is not visible.
149 socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
\r
// Converts `data` to the protocol's codepage and starts an asynchronous write.
152 void write_some(const std::wstring& data)
\r
// The converted bytes live in a shared string that is bound into the handler,
// so the buffer stays alive until handle_write runs.
154 auto str = spl::make_shared<std::string>(boost::locale::conv::from_utf<wchar_t>(data, protocol_->GetCodepage()));
\r
// NOTE(review): async_write_some may transfer fewer bytes than requested, and
// no visible line in handle_write resubmits the remainder; confirm whether
// boost::asio::async_write (which writes the whole buffer) was intended.
155 socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
\r
// Private implementation of AsyncEventServer: owns the io_service, the
// listening acceptor, the set of live connections and the thread that runs the
// io_service.
// NOTE(review): this listing is incomplete - the embedded line numbers skip,
// so braces, the constructor body and the header of the teardown code are not
// visible. The comments below describe only what the visible lines show.
159 struct AsyncEventServer::implementation
\r
// Declared first, so it is constructed before the acceptor that uses it.
161 boost::asio::io_service service_;
\r
162 tcp::acceptor acceptor_;
\r
// Protocol strategy passed to every connection created in handle_accept.
163 spl::shared_ptr<IProtocolStrategy> protocol_;
\r
// Shared with every connection; no initializer for it is visible in the
// constructor below.
164 spl::shared_ptr<connection_set> connection_set_;
\r
// Declared last, so it starts after the other members are constructed.
165 boost::thread thread_;
\r
// Opens the acceptor on an IPv4 endpoint at `port` and starts a thread that
// runs service_.run(). The constructor body is not visible in this listing.
167 implementation(const spl::shared_ptr<IProtocolStrategy>& protocol, unsigned short port)
\r
168 : acceptor_(service_, tcp::endpoint(tcp::v4(), port))
\r
169 , protocol_(protocol)
\r
// NOTE(review): the thread starts here, in the initializer list. If the first
// asynchronous operation is only queued later (e.g. start_accept() in the
// constructor body, not visible), io_service::run() may find no work and
// return immediately - TODO confirm ordering or hold an io_service::work.
170 , thread_(std::bind(&boost::asio::io_service::run, &service_))
\r
// Teardown code; the header of the enclosing function is not visible
// (embedded lines 171-182 are missing).
// Iterates over a copy of the set, presumably because stop() modifies
// *connection_set_ while the loop is running - TODO confirm.
183 auto connections = *connection_set_;
\r
184 BOOST_FOREACH(auto& connection, connections)
\r
188 connection->stop();
\r
// Log the in-flight exception; the try/catch lines themselves are not visible.
192 CASPAR_LOG_CURRENT_EXCEPTION();
\r
199 CASPAR_LOG_CURRENT_EXCEPTION();
\r
// Creates a fresh socket on service_ and queues an asynchronous accept for it.
205 void start_accept()
\r
207 spl::shared_ptr<tcp::socket> socket(new tcp::socket(service_));
\r
// The socket is bound into the handler so it stays alive until the accept
// completes.
// NOTE(review): the handler captures a raw `this`, so this object must outlive
// any pending accept - confirm the teardown order.
208 acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1));
\r
// Completion handler for async_accept. `socket` is the socket passed to
// async_accept; `error` is the result of the accept.
211 void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error)
\r
// Guard for a closed acceptor; the statement it controls is not visible
// (presumably an early return - TODO confirm).
213 if (!acceptor_.is_open())
\r
// Wrap the accepted socket in a connection and register it in the shared set.
// NOTE(review): no visible line checks `error` before this; embedded lines
// 214-216 are missing - TODO confirm.
217 connection_set_->insert(connection::create(socket, protocol_, connection_set_));
\r
// Constructs the server by allocating its implementation with the given
// protocol strategy and TCP port. impl_ is declared outside this listing.
223 AsyncEventServer::AsyncEventServer(const spl::shared_ptr<IProtocolStrategy>& protocol, unsigned short port) : impl_(new implementation(protocol, port)){}
\r
// Destructor with an empty body; any cleanup presumably happens when impl_ is
// destroyed - TODO confirm against impl_'s declaration.
224 AsyncEventServer::~AsyncEventServer(){}
\r