X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=protocol%2Futil%2FAsyncEventServer.cpp;h=cdfe538345206bbf36b72a7f881c66b269bebd9c;hb=7f606328a16a14e6e1190a440caa2cb2e619127b;hp=386826d83561926ce1ecf11031b709c60b9c2388;hpb=33e70c3ae0ca930c637f2b961df60f38a1ce187c;p=casparcg diff --git a/protocol/util/AsyncEventServer.cpp b/protocol/util/AsyncEventServer.cpp index 386826d83..cdfe53834 100644 --- a/protocol/util/AsyncEventServer.cpp +++ b/protocol/util/AsyncEventServer.cpp @@ -31,7 +31,6 @@ #include #include -#include #include #include @@ -41,20 +40,20 @@ using boost::asio::ip::tcp; namespace caspar { namespace IO { - + class connection; typedef std::set> connection_set; class connection : public spl::enable_shared_from_this -{ +{ typedef tbb::concurrent_hash_map> lifecycle_map_type; typedef tbb::concurrent_queue send_queue; - const spl::shared_ptr socket_; - boost::asio::io_service& service_; + const spl::shared_ptr socket_; + std::shared_ptr service_; + const std::wstring listen_port_; const spl::shared_ptr connection_set_; - const std::wstring name_; protocol_strategy_factory::ptr protocol_factory_; std::shared_ptr> protocol_; @@ -86,22 +85,12 @@ class connection : public spl::enable_shared_from_this conn->disconnect(); } - std::wstring print() const override - { - auto conn = connection_.lock(); - - if (conn) - return conn->print(); - else - return L"[destroyed-connection]"; - } - std::wstring address() const override { auto conn = connection_.lock(); if (conn) - return conn->address(); + return conn->ipv4_address(); else return L"[destroyed-connection]"; } @@ -126,37 +115,50 @@ class connection : public spl::enable_shared_from_this }; public: - static spl::shared_ptr create(spl::shared_ptr socket, const protocol_strategy_factory::ptr& protocol, spl::shared_ptr connection_set) + static spl::shared_ptr create(std::shared_ptr service, spl::shared_ptr socket, const protocol_strategy_factory::ptr& protocol, spl::shared_ptr connection_set) { - spl::shared_ptr con(new connection(std::move(socket), std::move(protocol), std::move(connection_set))); + spl::shared_ptr con(new connection(std::move(service), std::move(socket), std::move(protocol), std::move(connection_set))); + con->init(); con->read_some(); return con; } + void init() + { + protocol_ = protocol_factory_->create(spl::make_shared(shared_from_this())); + } + ~connection() { - CASPAR_LOG(info) << print() << L" connection destroyed."; + CASPAR_LOG(debug) << print() << L" connection destroyed."; } std::wstring print() const { - return L"[" + name_ + L"]"; + return L"async_event_server[:" + listen_port_ + L"]"; } std::wstring address() const { return u16(socket_->local_endpoint().address().to_string()); } - - virtual void send(std::string&& data) + + std::wstring ipv4_address() const + { + return socket_->is_open() ? u16(socket_->remote_endpoint().address().to_string()) : L"no-address"; + } + + void send(std::string&& data) { send_queue_.push(std::move(data)); - service_.dispatch([=] { do_write(); }); + auto self = shared_from_this(); + service_->dispatch([=] { self->do_write(); }); } - virtual void disconnect() + void disconnect() { - service_.dispatch([=] { stop(); }); + auto self = shared_from_this(); + service_->dispatch([=] { self->stop(); }); } void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr& lifecycle_bound) @@ -177,7 +179,6 @@ public: return std::shared_ptr(); } - /**************/ private: void do_write() //always called from the asio-service-thread { @@ -194,70 +195,50 @@ private: void stop() //always called from the asio-service-thread { connection_set_->erase(shared_from_this()); - try - { - socket_->close(); - } - catch(...) - { - CASPAR_LOG_CURRENT_EXCEPTION(); - } - - CASPAR_LOG(info) << print() << L" Disconnected."; - } - const std::string ipv4_address() const - { - return socket_->is_open() ? socket_->remote_endpoint().address().to_string() : "no-address"; + CASPAR_LOG(info) << print() << L" Client " << ipv4_address() << L" disconnected (" << connection_set_->size() << L" connections)."; + + boost::system::error_code ec; + socket_->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec); + socket_->close(ec); } - connection(const spl::shared_ptr& socket, const protocol_strategy_factory::ptr& protocol_factory, const spl::shared_ptr& connection_set) + connection(const std::shared_ptr& service, const spl::shared_ptr& socket, const protocol_strategy_factory::ptr& protocol_factory, const spl::shared_ptr& connection_set) : socket_(socket) - , service_(socket->get_io_service()) - , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast(socket_->local_endpoint().port())) : L"no-address")) + , service_(service) + , listen_port_(socket_->is_open() ? boost::lexical_cast(socket_->local_endpoint().port()) : L"no-port") , connection_set_(connection_set) , protocol_factory_(protocol_factory) , is_writing_(false) { - CASPAR_LOG(info) << print() << L" Connected."; + CASPAR_LOG(info) << print() << L" Accepted connection from " << ipv4_address() << L" (" << (connection_set_->size() + 1) << L" connections)."; } - protocol_strategy& protocol() //always called from the asio-service-thread - { - if (!protocol_) - protocol_ = protocol_factory_->create(spl::make_shared(shared_from_this())); - - return *protocol_; - } - void handle_read(const boost::system::error_code& error, size_t bytes_transferred) //always called from the asio-service-thread - { + { if(!error) { try { std::string data(data_.begin(), data_.begin() + bytes_transferred); - CASPAR_LOG(trace) << print() << L" Received: " << u16(data); - - protocol().parse(data); + protocol_->parse(data); } catch(...) { CASPAR_LOG_CURRENT_EXCEPTION(); } - + read_some(); - } + } else if (error != boost::asio::error::operation_aborted) - stop(); + stop(); } void handle_write(const spl::shared_ptr& str, const boost::system::error_code& error, size_t bytes_transferred) //always called from the asio-service-thread { if(!error) { - CASPAR_LOG(trace) << print() << L" Sent: " << (str->size() < 512 ? u16(*str) : L"more than 512 bytes."); if(bytes_transferred != str->size()) { str->assign(str->substr(bytes_transferred)); @@ -269,7 +250,7 @@ private: do_write(); } } - else if (error != boost::asio::error::operation_aborted) + else if (error != boost::asio::error::operation_aborted && socket_->is_open()) stop(); } @@ -277,7 +258,7 @@ private: { socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } - + void write_some(std::string&& data) //always called from the asio-service-thread { is_writing_ = true; @@ -288,81 +269,102 @@ private: friend struct AsyncEventServer::implementation; }; -struct AsyncEventServer::implementation +struct AsyncEventServer::implementation : public spl::enable_shared_from_this { - boost::asio::io_service service_; - tcp::acceptor acceptor_; - protocol_strategy_factory::ptr protocol_factory_; - spl::shared_ptr connection_set_; - boost::thread thread_; - std::vector lifecycle_factories_; - tbb::mutex mutex_; - - implementation(const protocol_strategy_factory::ptr& protocol, unsigned short port) - : acceptor_(service_, tcp::endpoint(tcp::v4(), port)) + std::shared_ptr service_; + tcp::acceptor acceptor_; + protocol_strategy_factory::ptr protocol_factory_; + spl::shared_ptr connection_set_; + std::vector lifecycle_factories_; + tbb::mutex mutex_; + + implementation(std::shared_ptr service, const protocol_strategy_factory::ptr& protocol, unsigned short port) + : service_(std::move(service)) + , acceptor_(*service_, tcp::endpoint(tcp::v4(), port)) , protocol_factory_(protocol) - , thread_([&] { service_.run(); }) { - start_accept(); } - ~implementation() + void stop() { try { - acceptor_.close(); + acceptor_.cancel(); + acceptor_.close(); } - catch(...) + catch (...) { CASPAR_LOG_CURRENT_EXCEPTION(); } + } + + ~implementation() + { + auto conns_set = connection_set_; - service_.post([=] + service_->post([conns_set] { - auto connections = *connection_set_; + auto connections = *conns_set; for (auto& connection : connections) - connection->stop(); + connection->stop(); }); - - thread_.join(); } - - void start_accept() + + void start_accept() { - spl::shared_ptr socket(new tcp::socket(service_)); - acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1)); + spl::shared_ptr socket(new tcp::socket(*service_)); + acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, shared_from_this(), socket, std::placeholders::_1)); } - void handle_accept(const spl::shared_ptr& socket, const boost::system::error_code& error) + void handle_accept(const spl::shared_ptr& socket, const boost::system::error_code& error) { if (!acceptor_.is_open()) return; - + if (!error) { - auto conn = connection::create(socket, protocol_factory_, connection_set_); + boost::system::error_code ec; + socket->set_option(boost::asio::socket_base::keep_alive(true), ec); + + if (ec) + CASPAR_LOG(warning) << print() << L" Failed to enable TCP keep-alive on socket"; + + auto conn = connection::create(service_, socket, protocol_factory_, connection_set_); connection_set_->insert(conn); for (auto& lifecycle_factory : lifecycle_factories_) { - auto lifecycle_bound = lifecycle_factory(conn->ipv4_address()); + auto lifecycle_bound = lifecycle_factory(u8(conn->ipv4_address())); conn->add_lifecycle_bound_object(lifecycle_bound.first, lifecycle_bound.second); } } start_accept(); } + std::wstring print() const + { + return L"async_event_server[:" + boost::lexical_cast(acceptor_.local_endpoint().port()) + L"]"; + } + void add_client_lifecycle_object_factory(const lifecycle_factory_t& factory) { - service_.post([=]{ lifecycle_factories_.push_back(factory); }); + auto self = shared_from_this(); + service_->post([=]{ self->lifecycle_factories_.push_back(factory); }); } }; AsyncEventServer::AsyncEventServer( - const protocol_strategy_factory::ptr& protocol, unsigned short port) - : impl_(new implementation(protocol, port)) {} + std::shared_ptr service, const protocol_strategy_factory::ptr& protocol, unsigned short port) + : impl_(new implementation(std::move(service), protocol, port)) +{ + impl_->start_accept(); +} + +AsyncEventServer::~AsyncEventServer() +{ + impl_->stop(); +} -AsyncEventServer::~AsyncEventServer() {} void AsyncEventServer::add_client_lifecycle_object_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_object_factory(factory); } }}