2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Robert Nagy, ronag89@gmail.com
22 #include "..\stdafx.h"
24 #include "AsyncEventServer.h"
32 #include <boost/asio.hpp>
33 #include <boost/thread.hpp>
34 #include <boost/lexical_cast.hpp>
36 #include <tbb\mutex.h>
38 using boost::asio::ip::tcp;
40 namespace caspar { namespace IO {
// Set of shared pointers to connection objects. In this file it is the
// registry that accepted connections are inserted into and that a connection
// erases itself from.
44 typedef std::set<spl::shared_ptr<connection>> connection_set;
// One TCP client connection: owns the socket handle, reads into a fixed
// buffer, hands received bytes to a protocol strategy and writes replies back.
// NOTE(review): this listing appears to have lost short lines (braces, access
// specifiers, some one-line statements and function headers). The comments
// below describe only what is visible; anything about missing lines is marked
// as an assumption.
46 class connection : public spl::enable_shared_from_this<connection>
// Socket on which the asynchronous reads and writes below are issued.
48 const spl::shared_ptr<tcp::socket> socket_;
// Shared registry of connections; this connection erases itself from it.
49 const spl::shared_ptr<connection_set> connection_set_;
// Display name built in the constructor from the socket's local endpoint
// ("address:port"), or "no-address" when the socket is not open.
50 const std::wstring name_;
// Factory used by protocol() to create the protocol strategy.
51 protocol_strategy_factory<char>::ptr protocol_factory_;
// Protocol strategy that parses received data; assigned inside protocol().
52 std::shared_ptr<protocol_strategy<char>> protocol_;
// Receive buffer handed to async_read_some (32768 bytes).
54 std::array<char, 32768> data_;
// Shared-ownership handles appended by bind_to_lifecycle().
55 std::vector<std::shared_ptr<void>> lifecycle_bound_items_;
// client_connection<char> implementation that refers to the connection
// through a weak_ptr and forwards each call to it.
// NOTE(review): in the visible lines the result of lock() is dereferenced
// without a null check. lock() returns an empty pointer once the connection
// has been destroyed — confirm that cannot happen while a holder is in use.
57 class connection_holder : public client_connection<char>
// Non-owning reference to the connection this holder forwards to.
59 std::weak_ptr<connection> connection_;
// Stores the weak reference to the connection.
61 explicit connection_holder(std::weak_ptr<connection> conn) : connection_(conn)
// Forwards the data to connection::send().
64 virtual void send(std::basic_string<char>&& data)
66 auto conn = connection_.lock();
67 conn->send(std::move(data));
// Disconnect request; only the lock() call is visible in this listing.
69 virtual void disconnect()
71 auto conn = connection_.lock();
// Printable name; only the lock() call is visible in this listing.
74 virtual std::wstring print() const
76 auto conn = connection_.lock();
// Forwards the lifecycle-bound object to connection::bind_to_lifecycle().
80 virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)
82 auto conn = connection_.lock();
83 return conn->bind_to_lifecycle(lifecycle_bound);
// Factory function: allocates a connection with `new` and wraps it in an
// spl::shared_ptr. The statements after the allocation are not visible here.
// NOTE(review): `protocol` is a const reference, so std::move(protocol)
// cannot actually move from it; the constructor takes all three arguments by
// const reference anyway.
88 static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const protocol_strategy_factory<char>::ptr& protocol, spl::shared_ptr<connection_set> connection_set)
90 spl::shared_ptr<connection> con(new connection(std::move(socket), std::move(protocol), std::move(connection_set)));
// NOTE(review): the enclosing function header is not visible; presumably this
// is the destructor, given the message text.
97 CASPAR_LOG(info) << print() << L" connection destroyed.";
// Returns the connection name wrapped in square brackets, e.g. "[name]".
100 std::wstring print() const
102 return L"[" + name_ + L"]";
// Returns the socket's local endpoint address as a string when the socket is
// open, otherwise the literal "no-address".
// NOTE(review): this reads local_endpoint(), not remote_endpoint() — confirm
// that the local address is what callers expect.
105 const std::string ipv4_address() const
107 return socket_->is_open() ? socket_->local_endpoint().address().to_string() : "no-address";
// Queues the data for asynchronous transmission via write_some().
112 virtual void send(std::string&& data)
114 write_some(std::move(data));
// Disconnect entry point; its body is not visible in this listing.
117 virtual void disconnect()
// Appends the given handle to lifecycle_bound_items_, so this connection
// holds shared ownership of the bound object.
121 void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)
123 lifecycle_bound_items_.push_back(lifecycle_bound);
// NOTE(review): the header of the function containing the next three
// statements is not visible. Visible effects: remove this connection from the
// shared registry, log the current exception (presumably inside a handler
// that is not shown), and log the disconnect.
130 connection_set_->erase(shared_from_this());
137 CASPAR_LOG_CURRENT_EXCEPTION();
140 CASPAR_LOG(info) << print() << L" Disconnected.";
// Constructor: stores the registry and protocol factory, builds name_ from
// the socket's local endpoint, and logs the connect.
// Members are initialized in declaration order (socket_, connection_set_,
// name_, ...), not in the order written here; name_ reads socket_, which is
// declared first. The initializer for socket_ is not visible in this listing.
144 connection(const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set)
146 , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
147 , connection_set_(connection_set)
148 , protocol_factory_(protocol_factory)
150 CASPAR_LOG(info) << print() << L" Connected.";
// Accessor for the protocol strategy. The visible statement creates it from
// protocol_factory_, passing a connection_holder that refers back to this
// connection. Any guard around the assignment and the return statement are
// not visible here.
153 protocol_strategy<char>& protocol()
156 protocol_ = protocol_factory_->create(spl::make_shared<connection_holder>(shared_from_this()));
// Completion handler for async_read_some.
// error: result of the read; bytes_transferred: number of bytes placed in data_.
161 void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
// Copy exactly the received bytes out of the receive buffer.
167 std::string data(data_.begin(), data_.begin() + bytes_transferred);
169 CASPAR_LOG(trace) << print() << L" Received: " << u16(data);
// Hand the received bytes to the protocol strategy.
171 protocol().parse(data);
// Logs the in-flight exception; the surrounding try/catch lines are not
// visible in this listing.
175 CASPAR_LOG_CURRENT_EXCEPTION();
// Errors other than a cancelled operation take this branch; its body is not
// visible here.
180 else if (error != boost::asio::error::operation_aborted)
// Completion handler for async_write_some. `data` is the buffer that was
// sent. bytes_transferred is not used in the visible lines.
184 void handle_write(const spl::shared_ptr<std::string>& data, const boost::system::error_code& error, size_t bytes_transferred)
// Log the payload at trace level, or a placeholder when it is 512 bytes or more.
187 CASPAR_LOG(trace) << print() << L" Sent: " << (data->size() < 512 ? u16(*data) : L"more than 512 bytes.");
// Errors other than a cancelled operation take this branch; its body is not
// visible here.
188 else if (error != boost::asio::error::operation_aborted)
// Starts an asynchronous read into data_. shared_from_this() is bound into
// the handler, so the handler holds shared ownership of this connection until
// it runs. The enclosing function header is not visible in this listing.
194 socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
// Starts an asynchronous write of `data`.
197 void write_some(std::string&& data)
// Move the payload into a shared string that is bound into the handler, so
// the buffer stays alive until the write completes.
199 auto str = spl::make_shared<std::string>(std::move(data));
// NOTE(review): async_write_some may transmit only part of the buffer, and no
// visible code resends the remainder — confirm whether boost::asio::async_write
// is needed here.
200 socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
// Private implementation of AsyncEventServer: owns the io_service, the
// acceptor, the thread that runs the io_service, the connection registry and
// the list of lifecycle factories.
// NOTE(review): this listing appears to have lost short lines (braces, some
// function headers and one-line statements). The comments below describe only
// what is visible.
204 struct AsyncEventServer::implementation
// I/O service that the acceptor and all accepted sockets are created on.
206 boost::asio::io_service service_;
// Listening acceptor.
207 tcp::acceptor acceptor_;
// Protocol factory passed to every new connection.
208 protocol_strategy_factory<char>::ptr protocol_factory_;
// Registry of accepted connections, shared with each connection.
209 spl::shared_ptr<connection_set> connection_set_;
// Thread that executes service_.run().
210 boost::thread thread_;
// Factories invoked once per accepted connection; accessed under mutex_.
// NOTE(review): the declaration of mutex_ is not visible in this listing.
211 std::vector<lifecycle_factory_t> lifecycle_factories_;
// Constructor: opens the acceptor on an IPv4 endpoint at `port`, stores the
// protocol factory and starts the thread running io_service::run.
// NOTE(review): members are initialized in declaration order, so thread_
// starts running io_service::run before the constructor body executes. The
// body is not visible here; confirm that work is queued before run() can
// return for lack of work.
214 implementation(const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
215 : acceptor_(service_, tcp::endpoint(tcp::v4(), port))
216 , protocol_factory_(protocol)
217 , thread_(std::bind(&boost::asio::io_service::run, &service_))
// Logs the in-flight exception; the enclosing function and the try/catch
// lines are not visible in this listing.
230 CASPAR_LOG_CURRENT_EXCEPTION();
// Takes a copy of the registry and iterates over the copy. The loop body is
// not visible; presumably the copy lets the body modify the original set
// while iterating — confirm.
235 auto connections = *connection_set_;
236 BOOST_FOREACH(auto& connection, connections)
// Allocates a fresh socket on service_ and starts an asynchronous accept into
// it. `this` is bound as a raw pointer, so this object must outlive the
// pending accept. The enclosing function header is not visible here.
245 spl::shared_ptr<tcp::socket> socket(new tcp::socket(service_));
246 acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1));
// Completion handler for async_accept. `socket` is the socket the accept was
// started on. How `error` is checked is not visible in this listing.
249 void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error)
// Branch taken when the acceptor has been closed; its body is not visible.
251 if (!acceptor_.is_open())
// Wrap the accepted socket in a connection and add it to the registry.
256 auto conn = connection::create(socket, protocol_factory_, connection_set_);
257 connection_set_->insert(conn);
// Hold mutex_ while reading lifecycle_factories_.
260 tbb::mutex::scoped_lock lock(mutex_);
// Call every registered factory with the connection's address string and bind
// each result to the connection.
262 BOOST_FOREACH(auto& lifecycle_factory, lifecycle_factories_)
264 auto lifecycle_bound = lifecycle_factory(conn->ipv4_address());
265 conn->bind_to_lifecycle(lifecycle_bound);
// Registers a factory that handle_accept() invokes for each accepted
// connection. Takes mutex_ before appending to lifecycle_factories_.
272 void add_client_lifecycle_event_factory(const lifecycle_factory_t& factory)
274 tbb::mutex::scoped_lock lock(mutex_);
275 lifecycle_factories_.push_back(factory);
// Constructs the server by creating its implementation object with the given
// protocol factory and port and storing it in impl_.
279 AsyncEventServer::AsyncEventServer(
280 const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
281 : impl_(new implementation(protocol, port)) {}
// Destructor with an empty body; it is defined in this file, where
// `implementation` is a complete type.
283 AsyncEventServer::~AsyncEventServer() {}
// Forwards the factory to the implementation object's method of the same name.
284 void AsyncEventServer::add_client_lifecycle_event_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_event_factory(factory); }