]> git.sesse.net Git - casparcg/commitdiff
* AsyncEventServer: implemented send-queue and protected socket->async_write_some...
authorniklaspandersson <niklas.p.andersson@svt.se>
Wed, 14 Aug 2013 10:56:23 +0000 (12:56 +0200)
committerniklaspandersson <niklas.p.andersson@svt.se>
Wed, 14 Aug 2013 10:56:23 +0000 (12:56 +0200)
protocol/util/AsyncEventServer.cpp

index 59de842688aff457f778caa5beb4cef01a78ed9f..d7225d5b148d82b78e8d6dfabcb50c2e2ab65fd6 100644 (file)
@@ -35,6 +35,7 @@
 
 #include <tbb\mutex.h>
 #include <tbb\concurrent_hash_map.h>
+#include <tbb\concurrent_queue.h>
 
 using boost::asio::ip::tcp;
 
@@ -47,6 +48,7 @@ typedef std::set<spl::shared_ptr<connection>> connection_set;
 class connection : public spl::enable_shared_from_this<connection>
 {   
        typedef tbb::concurrent_hash_map<std::wstring, std::shared_ptr<void>> lifecycle_map_type;
+       typedef tbb::concurrent_queue<std::string>      send_queue;
 
     const spl::shared_ptr<tcp::socket>                         socket_; 
        boost::asio::io_service&                                                service_;
@@ -56,8 +58,9 @@ class connection : public spl::enable_shared_from_this<connection>
        std::shared_ptr<protocol_strategy<char>>                protocol_;
 
        std::array<char, 32768>                                                 data_;
-       //std::map<std::wstring, std::shared_ptr<void>> lifecycle_bound_objects_;
-       lifecycle_map_type lifecycle_bound_objects_;
+       lifecycle_map_type                                                              lifecycle_bound_objects_;
+       send_queue                                                                              send_queue_;
+       bool                                                                                    is_writing_;
 
        class connection_holder : public client_connection<char>
        {
@@ -68,7 +71,6 @@ class connection : public spl::enable_shared_from_this<connection>
 
                virtual void send(std::basic_string<char>&& data)
                {
-                       //TODO: need to implement a send-queue
                        auto conn = connection_.lock();
                        conn->send(std::move(data));
                }
@@ -115,7 +117,8 @@ public:
        
        virtual void send(std::string&& data)
        {
-               write_some(std::move(data));
+               send_queue_.push(std::move(data));
+               service_.dispatch([=] { do_write(); });
        }
 
        virtual void disconnect()
@@ -143,7 +146,19 @@ public:
 
        /**************/
 private:
-       void stop()
+       void do_write() //always called from the asio-service-thread
+       {
+               if(!is_writing_)
+               {
+                       std::string data;
+                       if(send_queue_.try_pop(data))
+                       {
+                               write_some(std::move(data));
+                       }
+               }
+       }
+
+       void stop()     //always called from the asio-service-thread
        {
                connection_set_->erase(shared_from_this());
                try
@@ -169,11 +184,12 @@ private:
                , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
                , connection_set_(connection_set)
                , protocol_factory_(protocol_factory)
+               , is_writing_(false)
        {
                CASPAR_LOG(info) << print() << L" Connected.";
     }
 
-       protocol_strategy<char>& protocol()
+       protocol_strategy<char>& protocol()     //always called from the asio-service-thread
        {
                if (!protocol_)
                        protocol_ = protocol_factory_->create(spl::make_shared<connection_holder>(shared_from_this()));
@@ -181,7 +197,7 @@ private:
                return *protocol_;
        }
                        
-    void handle_read(const boost::system::error_code& error, size_t bytes_transferred) 
+    void handle_read(const boost::system::error_code& error, size_t bytes_transferred)         //always called from the asio-service-thread
        {               
                if(!error)
                {
@@ -204,23 +220,34 @@ private:
                        stop();         
     }
 
-    void handle_write(const spl::shared_ptr<std::string>& data, const boost::system::error_code& error, size_t bytes_transferred)
+    void handle_write(const spl::shared_ptr<std::string>& str, const boost::system::error_code& error, size_t bytes_transferred)       //always called from the asio-service-thread
        {
                if(!error)
                {
-                       CASPAR_LOG(trace) << print() << L" Sent: " << (data->size() < 512 ? u16(*data) : L"more than 512 bytes.");              
+                       CASPAR_LOG(trace) << print() << L" Sent: " << (str->size() < 512 ? u16(*str) : L"more than 512 bytes.");
+                       if(bytes_transferred != str->size())
+                       {
+                               str->assign(str->substr(bytes_transferred));
+                               socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
+                       }
+                       else
+                       {
+                               is_writing_ = false;
+                               do_write();
+                       }
                }
                else if (error != boost::asio::error::operation_aborted)                
                        stop();
     }
 
-       void read_some()
+       void read_some()        //always called from the asio-service-thread
        {
                socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
        }
        
-       void write_some(std::string&& data)
+       void write_some(std::string&& data)     //always called from the asio-service-thread
        {
+               is_writing_ = true;
                auto str = spl::make_shared<std::string>(std::move(data));
                socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
        }