]> git.sesse.net Git - casparcg/blobdiff - protocol/util/AsyncEventServer.cpp
[AsyncEventServer] Fixed bug where server expected to be the one closing the socket...
[casparcg] / protocol / util / AsyncEventServer.cpp
index 8348c56b97f4754d7bbaf3ddd992643e6b7b77c9..cdfe538345206bbf36b72a7f881c66b269bebd9c 100644 (file)
 using boost::asio::ip::tcp;
 
 namespace caspar { namespace IO {
-       
+
 class connection;
 
 typedef std::set<spl::shared_ptr<connection>> connection_set;
 
 class connection : public spl::enable_shared_from_this<connection>
-{   
+{
        typedef tbb::concurrent_hash_map<std::wstring, std::shared_ptr<void>> lifecycle_map_type;
        typedef tbb::concurrent_queue<std::string>      send_queue;
 
-    const spl::shared_ptr<tcp::socket>                         socket_; 
+    const spl::shared_ptr<tcp::socket>                         socket_;
        std::shared_ptr<boost::asio::io_service>                service_;
        const std::wstring                                                              listen_port_;
        const spl::shared_ptr<connection_set>                   connection_set_;
@@ -142,7 +142,7 @@ public:
        {
                return u16(socket_->local_endpoint().address().to_string());
        }
-       
+
        std::wstring ipv4_address() const
        {
                return socket_->is_open() ? u16(socket_->remote_endpoint().address().to_string()) : L"no-address";
@@ -198,18 +198,12 @@ private:
 
                CASPAR_LOG(info) << print() << L" Client " << ipv4_address() << L" disconnected (" << connection_set_->size() << L" connections).";
 
-               try
-               {
-                       socket_->cancel();
-                       socket_->close();
-               }
-               catch(...)
-               {
-                       CASPAR_LOG_CURRENT_EXCEPTION();
-               }
+               boost::system::error_code ec;
+               socket_->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec);
+               socket_->close(ec);
        }
 
-    connection(const std::shared_ptr<boost::asio::io_service>& service, const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set) 
+    connection(const std::shared_ptr<boost::asio::io_service>& service, const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set)
                : socket_(socket)
                , service_(service)
                , listen_port_(socket_->is_open() ? boost::lexical_cast<std::wstring>(socket_->local_endpoint().port()) : L"no-port")
@@ -219,9 +213,9 @@ private:
        {
                CASPAR_LOG(info) << print() << L" Accepted connection from " << ipv4_address() << L" (" << (connection_set_->size() + 1) << L" connections).";
     }
-                       
+
     void handle_read(const boost::system::error_code& error, size_t bytes_transferred)         //always called from the asio-service-thread
-       {               
+       {
                if(!error)
                {
                        try
@@ -234,11 +228,11 @@ private:
                        {
                                CASPAR_LOG_CURRENT_EXCEPTION();
                        }
-                       
+
                        read_some();
-               }  
+               }
                else if (error != boost::asio::error::operation_aborted)
-                       stop();         
+                       stop();
     }
 
     void handle_write(const spl::shared_ptr<std::string>& str, const boost::system::error_code& error, size_t bytes_transferred)       //always called from the asio-service-thread
@@ -256,7 +250,7 @@ private:
                                do_write();
                        }
                }
-               else if (error != boost::asio::error::operation_aborted && socket_->is_open())          
+               else if (error != boost::asio::error::operation_aborted && socket_->is_open())
                        stop();
     }
 
@@ -264,7 +258,7 @@ private:
        {
                socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
        }
-       
+
        void write_some(std::string&& data)     //always called from the asio-service-thread
        {
                is_writing_ = true;
@@ -315,20 +309,26 @@ struct AsyncEventServer::implementation : public spl::enable_shared_from_this<im
                                connection->stop();
                });
        }
-               
-       void start_accept() 
+
+       void start_accept()
        {
                spl::shared_ptr<tcp::socket> socket(new tcp::socket(*service_));
                acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, shared_from_this(), socket, std::placeholders::_1));
     }
 
-       void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error) 
+       void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error)
        {
                if (!acceptor_.is_open())
                        return;
-               
+
         if (!error)
                {
+                       boost::system::error_code ec;
+                       socket->set_option(boost::asio::socket_base::keep_alive(true), ec);
+
+                       if (ec)
+                               CASPAR_LOG(warning) << print() << L" Failed to enable TCP keep-alive on socket";
+
                        auto conn = connection::create(service_, socket, protocol_factory_, connection_set_);
                        connection_set_->insert(conn);
 
@@ -341,6 +341,11 @@ struct AsyncEventServer::implementation : public spl::enable_shared_from_this<im
                start_accept();
     }
 
+       std::wstring print() const
+       {
+               return L"async_event_server[:" + boost::lexical_cast<std::wstring>(acceptor_.local_endpoint().port()) + L"]";
+       }
+
        void add_client_lifecycle_object_factory(const lifecycle_factory_t& factory)
        {
                auto self = shared_from_this();