]> git.sesse.net Git - casparcg/blob - protocol/util/AsyncEventServer.cpp
* implemented lifecycle-bound functions in caspar::io::connection
[casparcg] / protocol / util / AsyncEventServer.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #include "..\stdafx.h"
23
24 #include "AsyncEventServer.h"
25
26 #include <algorithm>
27 #include <array>
28 #include <string>
29 #include <set>
30 #include <memory>
31
32 #include <boost/asio.hpp>
33 #include <boost/thread.hpp>
34 #include <boost/lexical_cast.hpp>
35
36 #include <tbb\mutex.h>
37
38 using boost::asio::ip::tcp;
39
40 namespace caspar { namespace IO {
41         
42 class connection;
43
44 typedef std::set<spl::shared_ptr<connection>> connection_set;
45
46 class connection : public spl::enable_shared_from_this<connection>
47 {   
48     const spl::shared_ptr<tcp::socket>                  socket_; 
49         const spl::shared_ptr<connection_set>           connection_set_;
50         const std::wstring                                                      name_;
51         protocol_strategy_factory<char>::ptr            protocol_factory_;
52         std::shared_ptr<protocol_strategy<char>>        protocol_;
53
54         std::array<char, 32768>                                         data_;
55         std::vector<std::shared_ptr<void>>                      lifecycle_bound_items_;
56
57         class connection_holder : public client_connection<char>
58         {
59                 std::weak_ptr<connection> connection_;
60         public:
61                 explicit connection_holder(std::weak_ptr<connection> conn) : connection_(conn)
62                 {}
63
64                 virtual void send(std::basic_string<char>&& data)
65                 {
66                         auto conn = connection_.lock();
67                         conn->send(std::move(data));
68                 }
69                 virtual void disconnect()
70                 {
71                         auto conn = connection_.lock();
72                         conn->disconnect();
73                 }
74                 virtual std::wstring print() const
75                 {
76                         auto conn = connection_.lock();
77                         return conn->print();
78                 }
79
80                 virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)
81                 {
82                         auto conn = connection_.lock();
83                         return conn->bind_to_lifecycle(lifecycle_bound);
84                 }
85         };
86
87 public:
88     static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const protocol_strategy_factory<char>::ptr& protocol, spl::shared_ptr<connection_set> connection_set)
89         {
90                 spl::shared_ptr<connection> con(new connection(std::move(socket), std::move(protocol), std::move(connection_set)));
91                 con->read_some();
92                 return con;
93     }
94
95         ~connection()
96         {
97                 CASPAR_LOG(info) << print() << L" connection destroyed.";
98         }
99
100         std::wstring print() const
101         {
102                 return L"[" + name_ + L"]";
103         }
104
105         const std::string ipv4_address() const
106         {
107                 return socket_->is_open() ? socket_->local_endpoint().address().to_string() : "no-address";
108         }
109         
110         /* ClientInfo */
111         
112         virtual void send(std::string&& data)
113         {
114                 write_some(std::move(data));
115         }
116
117         virtual void disconnect()
118         {
119                 stop();
120         }
121         void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)
122         {
123                 lifecycle_bound_items_.push_back(lifecycle_bound);
124         }
125
126         /**************/
127         
128         void stop()
129         {
130                 connection_set_->erase(shared_from_this());
131                 try
132                 {
133                         socket_->close();
134                 }
135                 catch(...)
136                 {
137                         CASPAR_LOG_CURRENT_EXCEPTION();
138                 }
139                 
140                 CASPAR_LOG(info) << print() << L" Disconnected.";
141         }
142
143 private:
144     connection(const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set) 
145                 : socket_(socket)
146                 , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
147                 , connection_set_(connection_set)
148                 , protocol_factory_(protocol_factory)
149         {
150                 CASPAR_LOG(info) << print() << L" Connected.";
151     }
152
153         protocol_strategy<char>& protocol()
154         {
155                 if (!protocol_)
156                         protocol_ = protocol_factory_->create(spl::make_shared<connection_holder>(shared_from_this()));
157
158                 return *protocol_;
159         }
160                         
161     void handle_read(const boost::system::error_code& error, size_t bytes_transferred) 
162         {               
163                 if(!error)
164                 {
165                         try
166                         {
167                                 std::string data(data_.begin(), data_.begin() + bytes_transferred);
168
169                                 CASPAR_LOG(trace) << print() << L" Received: " << u16(data);
170
171                                 protocol().parse(data);
172                         }
173                         catch(...)
174                         {
175                                 CASPAR_LOG_CURRENT_EXCEPTION();
176                         }
177                         
178                         read_some();
179                 }  
180                 else if (error != boost::asio::error::operation_aborted)
181                         stop();         
182     }
183
184     void handle_write(const spl::shared_ptr<std::string>& data, const boost::system::error_code& error, size_t bytes_transferred)
185         {
186                 if(!error)                      
187                         CASPAR_LOG(trace) << print() << L" Sent: " << (data->size() < 512 ? u16(*data) : L"more than 512 bytes.");              
188                 else if (error != boost::asio::error::operation_aborted)                
189                         stop();         
190     }
191
192         void read_some()
193         {
194                 socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
195         }
196         
197         void write_some(std::string&& data)
198         {
199                 auto str = spl::make_shared<std::string>(std::move(data));
200                 socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
201         }
202 };
203
204 struct AsyncEventServer::implementation
205 {
206         boost::asio::io_service                                 service_;
207         tcp::acceptor                                                   acceptor_;
208         protocol_strategy_factory<char>::ptr    protocol_factory_;
209         spl::shared_ptr<connection_set>                 connection_set_;
210         boost::thread                                                   thread_;
211         std::vector<lifecycle_factory_t>                lifecycle_factories_;
212         tbb::mutex mutex_;
213
214         implementation(const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
215                 : acceptor_(service_, tcp::endpoint(tcp::v4(), port))
216                 , protocol_factory_(protocol)
217                 , thread_(std::bind(&boost::asio::io_service::run, &service_))
218         {
219                 start_accept();
220         }
221
222         ~implementation()
223         {
224                 try
225                 {
226                         acceptor_.close();                      
227                 }
228                 catch(...)
229                 {
230                         CASPAR_LOG_CURRENT_EXCEPTION();
231                 }
232
233                 service_.post([=]
234                 {
235                         auto connections = *connection_set_;
236                         BOOST_FOREACH(auto& connection, connections)
237                                 connection->stop();                             
238                 });
239
240                 thread_.join();
241         }
242                 
243     void start_accept() 
244         {
245                 spl::shared_ptr<tcp::socket> socket(new tcp::socket(service_));
246                 acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1));
247     }
248
249         void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error) 
250         {
251                 if (!acceptor_.is_open())
252                         return;
253                 
254         if (!error)
255                 {
256                         auto conn = connection::create(socket, protocol_factory_, connection_set_);
257                         connection_set_->insert(conn);
258
259                         {
260                                 tbb::mutex::scoped_lock lock(mutex_);
261
262                                 BOOST_FOREACH(auto& lifecycle_factory, lifecycle_factories_)
263                                 {
264                                         auto lifecycle_bound = lifecycle_factory(conn->ipv4_address());
265                                         conn->bind_to_lifecycle(lifecycle_bound);
266                                 }
267                         }
268                 }
269                 start_accept();
270     }
271
272         void add_client_lifecycle_event_factory(const lifecycle_factory_t& factory)
273         {
274                 tbb::mutex::scoped_lock lock(mutex_);
275                 lifecycle_factories_.push_back(factory);
276         }
277 };
278
279 AsyncEventServer::AsyncEventServer(
280                 const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
281         : impl_(new implementation(protocol, port)) {}
282
283 AsyncEventServer::~AsyncEventServer() {}
284 void AsyncEventServer::add_client_lifecycle_event_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_event_factory(factory); }
285
286 }}