]> git.sesse.net Git - casparcg/blob - protocol/util/AsyncEventServer.cpp
Misc modifications to fix problems found by static code analysis and some simplificat...
[casparcg] / protocol / util / AsyncEventServer.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #include "..\stdafx.h"
23
24 #include "AsyncEventServer.h"
25
26 #include <algorithm>
27 #include <array>
28 #include <string>
29 #include <set>
30 #include <memory>
31
32 #include <boost/asio.hpp>
33 #include <boost/thread.hpp>
34 #include <boost/lexical_cast.hpp>
35
36 #include <tbb\mutex.h>
37 #include <tbb\concurrent_hash_map.h>
38 #include <tbb\concurrent_queue.h>
39
40 using boost::asio::ip::tcp;
41
42 namespace caspar { namespace IO {
43         
44 class connection;
45
46 typedef std::set<spl::shared_ptr<connection>> connection_set;
47
48 class connection : public spl::enable_shared_from_this<connection>
49 {   
50         typedef tbb::concurrent_hash_map<std::wstring, std::shared_ptr<void>> lifecycle_map_type;
51         typedef tbb::concurrent_queue<std::string>      send_queue;
52
53     const spl::shared_ptr<tcp::socket>                          socket_; 
54         boost::asio::io_service&                                                service_;
55         const spl::shared_ptr<connection_set>                   connection_set_;
56         const std::wstring                                                              name_;
57         protocol_strategy_factory<char>::ptr                    protocol_factory_;
58         std::shared_ptr<protocol_strategy<char>>                protocol_;
59
60         std::array<char, 32768>                                                 data_;
61         lifecycle_map_type                                                              lifecycle_bound_objects_;
62         send_queue                                                                              send_queue_;
63         bool                                                                                    is_writing_;
64
65         class connection_holder : public client_connection<char>
66         {
67                 std::weak_ptr<connection> connection_;
68         public:
69                 explicit connection_holder(std::weak_ptr<connection> conn) : connection_(conn)
70                 {}
71
72                 virtual void send(std::basic_string<char>&& data)
73                 {
74                         auto conn = connection_.lock();
75                         conn->send(std::move(data));
76                 }
77                 virtual void disconnect()
78                 {
79                         auto conn = connection_.lock();
80                         conn->disconnect();
81                 }
82                 virtual std::wstring print() const
83                 {
84                         auto conn = connection_.lock();
85                         return conn->print();
86                 }
87
88                 virtual void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound)
89                 {
90                         auto conn = connection_.lock();
91                         return conn->add_lifecycle_bound_object(key, lifecycle_bound);
92                 }
93                 virtual std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key)
94                 {
95                         auto conn = connection_.lock();
96                         return conn->remove_lifecycle_bound_object(key);
97                 }
98         };
99
100 public:
101     static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const protocol_strategy_factory<char>::ptr& protocol, spl::shared_ptr<connection_set> connection_set)
102         {
103                 spl::shared_ptr<connection> con(new connection(std::move(socket), std::move(protocol), std::move(connection_set)));
104                 con->read_some();
105                 return con;
106     }
107
108         ~connection()
109         {
110                 CASPAR_LOG(info) << print() << L" connection destroyed.";
111         }
112
113         std::wstring print() const
114         {
115                 return L"[" + name_ + L"]";
116         }
117         
118         virtual void send(std::string&& data)
119         {
120                 send_queue_.push(std::move(data));
121                 service_.dispatch([=] { do_write(); });
122         }
123
124         virtual void disconnect()
125         {
126                 service_.dispatch([=] { stop(); });
127         }
128
129         void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound)
130         {
131                 //thread-safe tbb_concurrent_hash_map
132                 lifecycle_bound_objects_.insert(std::pair<std::wstring, std::shared_ptr<void>>(key, lifecycle_bound));
133         }
134         std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key)
135         {
136                 //thread-safe tbb_concurrent_hash_map
137                 lifecycle_map_type::const_accessor acc;
138                 if(lifecycle_bound_objects_.find(acc, key))
139                 {
140                         auto result = acc->second;
141                         lifecycle_bound_objects_.erase(acc);
142                         return result;
143                 }
144                 return std::shared_ptr<void>();
145         }
146
147         /**************/
148 private:
149         void do_write() //always called from the asio-service-thread
150         {
151                 if(!is_writing_)
152                 {
153                         std::string data;
154                         if(send_queue_.try_pop(data))
155                         {
156                                 write_some(std::move(data));
157                         }
158                 }
159         }
160
161         void stop()     //always called from the asio-service-thread
162         {
163                 connection_set_->erase(shared_from_this());
164                 try
165                 {
166                         socket_->close();
167                 }
168                 catch(...)
169                 {
170                         CASPAR_LOG_CURRENT_EXCEPTION();
171                 }
172                 
173                 CASPAR_LOG(info) << print() << L" Disconnected.";
174         }
175
176         const std::string ipv4_address() const
177         {
178                 return socket_->is_open() ? socket_->local_endpoint().address().to_string() : "no-address";
179         }
180
181     connection(const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set) 
182                 : socket_(socket)
183                 , service_(socket->get_io_service())
184                 , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
185                 , connection_set_(connection_set)
186                 , protocol_factory_(protocol_factory)
187                 , is_writing_(false)
188         {
189                 CASPAR_LOG(info) << print() << L" Connected.";
190     }
191
192         protocol_strategy<char>& protocol()     //always called from the asio-service-thread
193         {
194                 if (!protocol_)
195                         protocol_ = protocol_factory_->create(spl::make_shared<connection_holder>(shared_from_this()));
196
197                 return *protocol_;
198         }
199                         
200     void handle_read(const boost::system::error_code& error, size_t bytes_transferred)  //always called from the asio-service-thread
201         {               
202                 if(!error)
203                 {
204                         try
205                         {
206                                 std::string data(data_.begin(), data_.begin() + bytes_transferred);
207
208                                 CASPAR_LOG(trace) << print() << L" Received: " << u16(data);
209
210                                 protocol().parse(data);
211                         }
212                         catch(...)
213                         {
214                                 CASPAR_LOG_CURRENT_EXCEPTION();
215                         }
216                         
217                         read_some();
218                 }  
219                 else if (error != boost::asio::error::operation_aborted)
220                         stop();         
221     }
222
223     void handle_write(const spl::shared_ptr<std::string>& str, const boost::system::error_code& error, size_t bytes_transferred)        //always called from the asio-service-thread
224         {
225                 if(!error)
226                 {
227                         CASPAR_LOG(trace) << print() << L" Sent: " << (str->size() < 512 ? u16(*str) : L"more than 512 bytes.");
228                         if(bytes_transferred != str->size())
229                         {
230                                 str->assign(str->substr(bytes_transferred));
231                                 socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
232                         }
233                         else
234                         {
235                                 is_writing_ = false;
236                                 do_write();
237                         }
238                 }
239                 else if (error != boost::asio::error::operation_aborted)                
240                         stop();
241     }
242
243         void read_some()        //always called from the asio-service-thread
244         {
245                 socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
246         }
247         
248         void write_some(std::string&& data)     //always called from the asio-service-thread
249         {
250                 is_writing_ = true;
251                 auto str = spl::make_shared<std::string>(std::move(data));
252                 socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
253         }
254
255         friend struct AsyncEventServer::implementation;
256 };
257
258 struct AsyncEventServer::implementation
259 {
260         boost::asio::io_service                                 service_;
261         tcp::acceptor                                                   acceptor_;
262         protocol_strategy_factory<char>::ptr    protocol_factory_;
263         spl::shared_ptr<connection_set>                 connection_set_;
264         boost::thread                                                   thread_;
265         std::vector<lifecycle_factory_t>                lifecycle_factories_;
266         tbb::mutex mutex_;
267
268         implementation(const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
269                 : acceptor_(service_, tcp::endpoint(tcp::v4(), port))
270                 , protocol_factory_(protocol)
271                 , thread_(std::bind(&boost::asio::io_service::run, &service_))
272         {
273                 start_accept();
274         }
275
276         ~implementation()
277         {
278                 try
279                 {
280                         acceptor_.close();                      
281                 }
282                 catch(...)
283                 {
284                         CASPAR_LOG_CURRENT_EXCEPTION();
285                 }
286
287                 service_.post([=]
288                 {
289                         auto connections = *connection_set_;
290                         BOOST_FOREACH(auto& connection, connections)
291                                 connection->stop();                             
292                 });
293
294                 thread_.join();
295         }
296                 
297     void start_accept() 
298         {
299                 spl::shared_ptr<tcp::socket> socket(new tcp::socket(service_));
300                 acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1));
301     }
302
303         void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error) 
304         {
305                 if (!acceptor_.is_open())
306                         return;
307                 
308         if (!error)
309                 {
310                         auto conn = connection::create(socket, protocol_factory_, connection_set_);
311                         connection_set_->insert(conn);
312
313                         BOOST_FOREACH(auto& lifecycle_factory, lifecycle_factories_)
314                         {
315                                 auto lifecycle_bound = lifecycle_factory(conn->ipv4_address());
316                                 conn->add_lifecycle_bound_object(lifecycle_bound.first, lifecycle_bound.second);
317                         }
318                 }
319                 start_accept();
320     }
321
322         void add_client_lifecycle_object_factory(const lifecycle_factory_t& factory)
323         {
324                 service_.post([=]{ lifecycle_factories_.push_back(factory); });
325         }
326 };
327
328 AsyncEventServer::AsyncEventServer(
329                 const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
330         : impl_(new implementation(protocol, port)) {}
331
332 AsyncEventServer::~AsyncEventServer() {}
333 void AsyncEventServer::add_client_lifecycle_object_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_object_factory(factory); }
334
335 }}