using boost::asio::ip::tcp;
namespace caspar { namespace IO {
-
+
class connection;
typedef std::set<spl::shared_ptr<connection>> connection_set;
class connection : public spl::enable_shared_from_this<connection>
-{
+{
typedef tbb::concurrent_hash_map<std::wstring, std::shared_ptr<void>> lifecycle_map_type;
typedef tbb::concurrent_queue<std::string> send_queue;
- const spl::shared_ptr<tcp::socket> socket_;
+ const spl::shared_ptr<tcp::socket> socket_;
std::shared_ptr<boost::asio::io_service> service_;
const std::wstring listen_port_;
const spl::shared_ptr<connection_set> connection_set_;
{
return u16(socket_->local_endpoint().address().to_string());
}
-
+
std::wstring ipv4_address() const
{
return socket_->is_open() ? u16(socket_->remote_endpoint().address().to_string()) : L"no-address";
CASPAR_LOG(info) << print() << L" Client " << ipv4_address() << L" disconnected (" << connection_set_->size() << L" connections).";
- try
- {
- socket_->cancel();
- socket_->close();
- }
- catch(...)
- {
- CASPAR_LOG_CURRENT_EXCEPTION();
- }
+ boost::system::error_code ec;
+ socket_->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec);
+ socket_->close(ec);
}
- connection(const std::shared_ptr<boost::asio::io_service>& service, const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set)
+ connection(const std::shared_ptr<boost::asio::io_service>& service, const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set)
: socket_(socket)
, service_(service)
, listen_port_(socket_->is_open() ? boost::lexical_cast<std::wstring>(socket_->local_endpoint().port()) : L"no-port")
{
CASPAR_LOG(info) << print() << L" Accepted connection from " << ipv4_address() << L" (" << (connection_set_->size() + 1) << L" connections).";
}
-
+
void handle_read(const boost::system::error_code& error, size_t bytes_transferred) //always called from the asio-service-thread
- {
+ {
if(!error)
{
try
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
-
+
read_some();
- }
+ }
else if (error != boost::asio::error::operation_aborted)
- stop();
+ stop();
}
void handle_write(const spl::shared_ptr<std::string>& str, const boost::system::error_code& error, size_t bytes_transferred) //always called from the asio-service-thread
do_write();
}
}
- else if (error != boost::asio::error::operation_aborted && socket_->is_open())
+ else if (error != boost::asio::error::operation_aborted && socket_->is_open())
stop();
}
{
socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}
-
+
void write_some(std::string&& data) //always called from the asio-service-thread
{
is_writing_ = true;
connection->stop();
});
}
-
- void start_accept()
+
+ void start_accept()
{
spl::shared_ptr<tcp::socket> socket(new tcp::socket(*service_));
acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, shared_from_this(), socket, std::placeholders::_1));
}
- void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error)
+ void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error)
{
if (!acceptor_.is_open())
return;
-
+
if (!error)
{
boost::system::error_code ec;
if (ec)
CASPAR_LOG(warning) << print() << L" Failed to enable TCP keep-alive on socket";
-
+
auto conn = connection::create(service_, socket, protocol_factory_, connection_set_);
connection_set_->insert(conn);