const spl::shared_ptr<tcp::socket> socket_;
std::shared_ptr<boost::asio::io_service> service_;
+ const std::wstring listen_port_;
const spl::shared_ptr<connection_set> connection_set_;
- const std::wstring name_;
protocol_strategy_factory<char>::ptr protocol_factory_;
std::shared_ptr<protocol_strategy<char>> protocol_;
conn->disconnect();
}
- std::wstring print() const override
- {
- auto conn = connection_.lock();
-
- if (conn)
- return conn->print();
- else
- return L"[destroyed-connection]";
- }
-
std::wstring address() const override
{
auto conn = connection_.lock();
if (conn)
- return conn->address();
+ return conn->ipv4_address();
else
return L"[destroyed-connection]";
}
std::wstring print() const
{
- return L"[" + name_ + L"]";
+ return L"async_event_server[:" + listen_port_ + L"]";
}
std::wstring address() const
return u16(socket_->local_endpoint().address().to_string());
}
- virtual void send(std::string&& data)
+ std::wstring ipv4_address() const
+ {
+ return socket_->is_open() ? u16(socket_->remote_endpoint().address().to_string()) : L"no-address";
+ }
+
+ void send(std::string&& data)
{
send_queue_.push(std::move(data));
- service_->dispatch([=] { do_write(); });
+ auto self = shared_from_this();
+ service_->dispatch([=] { self->do_write(); });
}
- virtual void disconnect()
+ void disconnect()
{
- service_->dispatch([=] { stop(); });
+ auto self = shared_from_this();
+ service_->dispatch([=] { self->stop(); });
}
void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound)
return std::shared_ptr<void>();
}
- /**************/
private:
void do_write() //always called from the asio-service-thread
{
void stop() //always called from the asio-service-thread
{
connection_set_->erase(shared_from_this());
+
+ CASPAR_LOG(info) << print() << L" Client " << ipv4_address() << L" disconnected (" << connection_set_->size() << L" connections).";
+
try
{
+ socket_->cancel();
socket_->close();
}
catch(...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
-
- CASPAR_LOG(info) << print() << L" Disconnected.";
- }
-
- const std::string ipv4_address() const
- {
- return socket_->is_open() ? socket_->remote_endpoint().address().to_string() : "no-address";
}
connection(const std::shared_ptr<boost::asio::io_service>& service, const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set)
: socket_(socket)
, service_(service)
- , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
+ , listen_port_(socket_->is_open() ? boost::lexical_cast<std::wstring>(socket_->local_endpoint().port()) : L"no-port")
, connection_set_(connection_set)
, protocol_factory_(protocol_factory)
, is_writing_(false)
{
- CASPAR_LOG(info) << print() << L" Connected.";
+ CASPAR_LOG(info) << print() << L" Accepted connection from " << ipv4_address() << L" (" << (connection_set_->size() + 1) << L" connections).";
}
protocol_strategy<char>& protocol() //always called from the asio-service-thread
{
std::string data(data_.begin(), data_.begin() + bytes_transferred);
- CASPAR_LOG(trace) << print() << L" Received: " << u16(data);
-
protocol().parse(data);
}
catch(...)
{
if(!error)
{
- CASPAR_LOG(trace) << print() << L" Sent: " << (str->size() < 512 ? u16(*str) : L"more than 512 bytes.");
if(bytes_transferred != str->size())
{
str->assign(str->substr(bytes_transferred));
friend struct AsyncEventServer::implementation;
};
-struct AsyncEventServer::implementation
+struct AsyncEventServer::implementation : public spl::enable_shared_from_this<implementation>
{
std::shared_ptr<boost::asio::io_service> service_;
tcp::acceptor acceptor_;
, acceptor_(*service_, tcp::endpoint(tcp::v4(), port))
, protocol_factory_(protocol)
{
- start_accept();
}
~implementation()
for (auto& lifecycle_factory : lifecycle_factories_)
{
- auto lifecycle_bound = lifecycle_factory(conn->ipv4_address());
+ auto lifecycle_bound = lifecycle_factory(u8(conn->ipv4_address()));
conn->add_lifecycle_bound_object(lifecycle_bound.first, lifecycle_bound.second);
}
}
void add_client_lifecycle_object_factory(const lifecycle_factory_t& factory)
{
- service_->post([=]{ lifecycle_factories_.push_back(factory); });
+ auto self = shared_from_this();
+ service_->post([=]{ self->lifecycle_factories_.push_back(factory); });
}
};
AsyncEventServer::AsyncEventServer(
std::shared_ptr<boost::asio::io_service> service, const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
- : impl_(new implementation(std::move(service), protocol, port)) {}
+ : impl_(new implementation(std::move(service), protocol, port))
+{
+ impl_->start_accept();
+}
AsyncEventServer::~AsyncEventServer() {}
void AsyncEventServer::add_client_lifecycle_object_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_object_factory(factory); }
: client_(client)\r
, codepage_(codepage)\r
{\r
- CASPAR_LOG(info) << "from_unicode_client_connection created.";\r
+ CASPAR_LOG(trace) << "from_unicode_client_connection created.";\r
}\r
~from_unicode_client_connection()\r
{\r
- CASPAR_LOG(info) << "from_unicode_client_connection destroyed.";\r
+ CASPAR_LOG(trace) << "from_unicode_client_connection destroyed.";\r
}\r
\r
void send(std::basic_string<wchar_t>&& data) override\r
{\r
boost::replace_all(data, L"\n", L"\\n");\r
boost::replace_all(data, L"\r", L"\\r");\r
- CASPAR_LOG(info) << L"Sent message to " << client_->print() << L":" << data;\r
+ CASPAR_LOG(info) << L"Sent message to " << client_->address() << L":" << data;\r
}\r
else\r
- CASPAR_LOG(info) << L"Sent more than 512 bytes to " << client_->print();\r
+ CASPAR_LOG(info) << L"Sent more than 512 bytes to " << client_->address();\r
}\r
\r
void disconnect() override\r
client_->disconnect();\r
}\r
\r
- std::wstring print() const override\r
- {\r
- return client_->print();\r
- }\r
-\r
std::wstring address() const override\r
{\r
return client_->address();\r
: strategy_(strategy)\r
, client_info_(client_connection)\r
{\r
- CASPAR_LOG(info) << "legacy_strategy_adapter created.";\r
+ CASPAR_LOG(trace) << "legacy_strategy_adapter created.";\r
}\r
~legacy_strategy_adapter()\r
{\r
- CASPAR_LOG(info) << "legacy_strategy_adapter destroyed.";\r
+ CASPAR_LOG(trace) << "legacy_strategy_adapter destroyed.";\r
}\r
\r
void parse(const std::basic_string<wchar_t>& data) override\r