#include <tbb\mutex.h>
#include <tbb\concurrent_hash_map.h>
+#include <tbb\concurrent_queue.h>
using boost::asio::ip::tcp;
class connection : public spl::enable_shared_from_this<connection>
{
typedef tbb::concurrent_hash_map<std::wstring, std::shared_ptr<void>> lifecycle_map_type;
+ typedef tbb::concurrent_queue<std::string> send_queue;
const spl::shared_ptr<tcp::socket> socket_;
boost::asio::io_service& service_;
std::shared_ptr<protocol_strategy<char>> protocol_;
std::array<char, 32768> data_;
- //std::map<std::wstring, std::shared_ptr<void>> lifecycle_bound_objects_;
- lifecycle_map_type lifecycle_bound_objects_;
+ lifecycle_map_type lifecycle_bound_objects_;
+ send_queue send_queue_;
+ bool is_writing_;
class connection_holder : public client_connection<char>
{
virtual void send(std::basic_string<char>&& data)
{
- //TODO: need to implement a send-queue
auto conn = connection_.lock();
conn->send(std::move(data));
}
virtual void send(std::string&& data)
{
- write_some(std::move(data));
+ send_queue_.push(std::move(data));
+ service_.dispatch([=] { do_write(); });
}
virtual void disconnect()
/**************/
private:
- void stop()
+ void do_write() //always called from the asio-service-thread
+ {
+ if(!is_writing_)
+ {
+ std::string data;
+ if(send_queue_.try_pop(data))
+ {
+ write_some(std::move(data));
+ }
+ }
+ }
+
+ void stop() //always called from the asio-service-thread
{
connection_set_->erase(shared_from_this());
try
, name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
, connection_set_(connection_set)
, protocol_factory_(protocol_factory)
+ , is_writing_(false)
{
CASPAR_LOG(info) << print() << L" Connected.";
}
- protocol_strategy<char>& protocol()
+ protocol_strategy<char>& protocol() //always called from the asio-service-thread
{
if (!protocol_)
protocol_ = protocol_factory_->create(spl::make_shared<connection_holder>(shared_from_this()));
return *protocol_;
}
- void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
+ void handle_read(const boost::system::error_code& error, size_t bytes_transferred) //always called from the asio-service-thread
{
if(!error)
{
stop();
}
- void handle_write(const spl::shared_ptr<std::string>& data, const boost::system::error_code& error, size_t bytes_transferred)
+ void handle_write(const spl::shared_ptr<std::string>& str, const boost::system::error_code& error, size_t bytes_transferred) //always called from the asio-service-thread
{
if(!error)
{
- CASPAR_LOG(trace) << print() << L" Sent: " << (data->size() < 512 ? u16(*data) : L"more than 512 bytes.");
+ CASPAR_LOG(trace) << print() << L" Sent: " << (str->size() < 512 ? u16(*str) : L"more than 512 bytes.");
+ if(bytes_transferred != str->size())
+ {
+ str->assign(str->substr(bytes_transferred));
+ socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
+ }
+ else
+ {
+ is_writing_ = false;
+ do_write();
+ }
}
else if (error != boost::asio::error::operation_aborted)
stop();
}
- void read_some()
+ void read_some() //always called from the asio-service-thread
{
socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}
- void write_some(std::string&& data)
+ void write_some(std::string&& data) //always called from the asio-service-thread
{
+ is_writing_ = true;
auto str = spl::make_shared<std::string>(std::move(data));
socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
}