]> git.sesse.net Git - casparcg/blob - protocol/util/AsyncEventServer.cpp
Merged CLK changes from trunk, and separated delimiter message splitting and codepage...
[casparcg] / protocol / util / AsyncEventServer.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #include "..\stdafx.h"
23
24 #include "AsyncEventServer.h"
25
26 #include <algorithm>
27 #include <array>
28 #include <string>
29 #include <set>
30 #include <memory>
31
32 #include <boost/asio.hpp>
33 #include <boost/thread.hpp>
34 #include <boost/lexical_cast.hpp>
35
36 using boost::asio::ip::tcp;
37
38 namespace caspar { namespace IO {
39         
40 class connection;
41
42 typedef std::set<spl::shared_ptr<connection>> connection_set;
43
44 class connection : public spl::enable_shared_from_this<connection>, public client_connection<char>
45 {    
46     const spl::shared_ptr<tcp::socket>                  socket_; 
47         const spl::shared_ptr<connection_set>           connection_set_;
48         const std::wstring                                                      name_;
49         protocol_strategy_factory<char>::ptr            protocol_factory_;
50         std::shared_ptr<protocol_strategy<char>>        protocol_;
51
52         std::array<char, 32768>                                         data_;
53
54 public:
55     static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const protocol_strategy_factory<char>::ptr& protocol, spl::shared_ptr<connection_set> connection_set)
56         {
57                 spl::shared_ptr<connection> con(new connection(std::move(socket), std::move(protocol), std::move(connection_set)));
58                 con->read_some();
59                 return con;
60     }
61
62         std::wstring print() const
63         {
64                 return L"[" + name_ + L"]";
65         }
66         
67         /* ClientInfo */
68         
69         virtual void send(std::string&& data)
70         {
71                 write_some(std::move(data));
72         }
73
74         virtual void disconnect()
75         {
76                 stop();
77         }
78         
79         /**************/
80         
81         void stop()
82         {
83                 connection_set_->erase(shared_from_this());
84                 try
85                 {
86                         socket_->close();
87                 }
88                 catch(...)
89                 {
90                         CASPAR_LOG_CURRENT_EXCEPTION();
91                 }
92                 CASPAR_LOG(info) << print() << L" Disconnected.";
93         }
94
95 private:
96     connection(const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol, const spl::shared_ptr<connection_set>& connection_set) 
97                 : socket_(socket)
98                 , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
99                 , connection_set_(connection_set)
100                 , protocol_factory_(protocol)
101         {
102                 CASPAR_LOG(info) << print() << L" Connected.";
103     }
104
105         protocol_strategy<char>& protocol()
106         {
107                 if (!protocol_)
108                         protocol_ = protocol_factory_->create(shared_from_this());
109
110                 return *protocol_;
111         }
112                         
113     void handle_read(const boost::system::error_code& error, size_t bytes_transferred) 
114         {               
115                 if(!error)
116                 {
117                         try
118                         {
119                                 std::string data(data_.begin(), data_.begin() + bytes_transferred);
120
121                                 CASPAR_LOG(trace) << print() << L" Received: " << u16(data);
122
123                                 protocol().parse(data);
124                         }
125                         catch(...)
126                         {
127                                 CASPAR_LOG_CURRENT_EXCEPTION();
128                         }
129                         
130                         read_some();
131                 }  
132                 else if (error != boost::asio::error::operation_aborted)
133                         stop();         
134     }
135
136     void handle_write(const spl::shared_ptr<std::string>& data, const boost::system::error_code& error, size_t bytes_transferred)
137         {
138                 if(!error)                      
139                         CASPAR_LOG(trace) << print() << L" Sent: " << (data->size() < 512 ? u16(*data) : L"more than 512 bytes.");              
140                 else if (error != boost::asio::error::operation_aborted)                
141                         stop();         
142     }
143
144         void read_some()
145         {
146                 socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
147         }
148         
149         void write_some(std::string&& data)
150         {
151                 auto str = spl::make_shared<std::string>(std::move(data));
152                 socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
153         }
154 };
155
156 struct AsyncEventServer::implementation
157 {
158         boost::asio::io_service                                 service_;
159         tcp::acceptor                                                   acceptor_;
160         protocol_strategy_factory<char>::ptr    protocol_;
161         spl::shared_ptr<connection_set>                 connection_set_;
162         boost::thread                                                   thread_;
163
164         implementation(const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
165                 : acceptor_(service_, tcp::endpoint(tcp::v4(), port))
166                 , protocol_(protocol)
167                 , thread_(std::bind(&boost::asio::io_service::run, &service_))
168         {
169                 start_accept();
170         }
171
172         ~implementation()
173         {
174                 try
175                 {
176                         acceptor_.close();                      
177                 }
178                 catch(...)
179                 {
180                         CASPAR_LOG_CURRENT_EXCEPTION();
181                 }
182
183                 service_.post([=]
184                 {
185                         auto connections = *connection_set_;
186                         BOOST_FOREACH(auto& connection, connections)
187                                 connection->stop();                             
188                 });
189
190                 thread_.join();
191         }
192                 
193     void start_accept() 
194         {
195                 spl::shared_ptr<tcp::socket> socket(new tcp::socket(service_));
196                 acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1));
197     }
198
199         void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error) 
200         {
201                 if (!acceptor_.is_open())
202                         return;
203                 
204         if (!error)             
205                         connection_set_->insert(connection::create(socket, protocol_, connection_set_));
206
207                 start_accept();
208     }
209 };
210
211 AsyncEventServer::AsyncEventServer(
212                 const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
213         : impl_(new implementation(protocol, port))
214 {
215 }
216
217 AsyncEventServer::~AsyncEventServer()
218 {
219 }
220
221 }}