]> git.sesse.net Git - casparcg/blob - protocol/util/AsyncEventServer.cpp
* Merged RESUME command.
[casparcg] / protocol / util / AsyncEventServer.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #include "../StdAfx.h"
23
24 #include "AsyncEventServer.h"
25
26 #include <algorithm>
27 #include <array>
28 #include <string>
29 #include <set>
30 #include <memory>
31 #include <functional>
32
33 #include <boost/asio.hpp>
34 #include <boost/thread.hpp>
35 #include <boost/lexical_cast.hpp>
36
37 #include <tbb/mutex.h>
38 #include <tbb/concurrent_hash_map.h>
39 #include <tbb/concurrent_queue.h>
40
41 using boost::asio::ip::tcp;
42
43 namespace caspar { namespace IO {
44         
45 class connection;
46
47 typedef std::set<spl::shared_ptr<connection>> connection_set;
48
49 class connection : public spl::enable_shared_from_this<connection>
50 {   
51         typedef tbb::concurrent_hash_map<std::wstring, std::shared_ptr<void>> lifecycle_map_type;
52         typedef tbb::concurrent_queue<std::string>      send_queue;
53
54     const spl::shared_ptr<tcp::socket>                          socket_; 
55         boost::asio::io_service&                                                service_;
56         const spl::shared_ptr<connection_set>                   connection_set_;
57         const std::wstring                                                              name_;
58         protocol_strategy_factory<char>::ptr                    protocol_factory_;
59         std::shared_ptr<protocol_strategy<char>>                protocol_;
60
61         std::array<char, 32768>                                                 data_;
62         lifecycle_map_type                                                              lifecycle_bound_objects_;
63         send_queue                                                                              send_queue_;
64         bool                                                                                    is_writing_;
65
66         class connection_holder : public client_connection<char>
67         {
68                 std::weak_ptr<connection> connection_;
69         public:
70                 explicit connection_holder(std::weak_ptr<connection> conn) : connection_(std::move(conn))
71                 {}
72
73                 void send(std::basic_string<char>&& data) override
74                 {
75                         auto conn = connection_.lock();
76
77                         if (conn)
78                                 conn->send(std::move(data));
79                 }
80
81                 void disconnect() override
82                 {
83                         auto conn = connection_.lock();
84
85                         if (conn)
86                                 conn->disconnect();
87                 }
88
89                 std::wstring print() const override
90                 {
91                         auto conn = connection_.lock();
92
93                         if (conn)
94                                 return conn->print();
95                         else
96                                 return L"[destroyed-connection]";
97                 }
98
99                 std::wstring address() const override
100                 {
101                         auto conn = connection_.lock();
102
103                         if (conn)
104                                 return conn->address();
105                         else
106                                 return L"[destroyed-connection]";
107                 }
108
109                 void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound) override
110                 {
111                         auto conn = connection_.lock();
112
113                         if (conn)
114                                 return conn->add_lifecycle_bound_object(key, lifecycle_bound);
115                 }
116
117                 std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key) override
118                 {
119                         auto conn = connection_.lock();
120
121                         if (conn)
122                                 return conn->remove_lifecycle_bound_object(key);
123                         else
124                                 return std::shared_ptr<void>();
125                 }
126         };
127
128 public:
129     static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const protocol_strategy_factory<char>::ptr& protocol, spl::shared_ptr<connection_set> connection_set)
130         {
131                 spl::shared_ptr<connection> con(new connection(std::move(socket), std::move(protocol), std::move(connection_set)));
132                 con->read_some();
133                 return con;
134     }
135
136         ~connection()
137         {
138                 CASPAR_LOG(info) << print() << L" connection destroyed.";
139         }
140
141         std::wstring print() const
142         {
143                 return L"[" + name_ + L"]";
144         }
145
146         std::wstring address() const
147         {
148                 return u16(socket_->local_endpoint().address().to_string());
149         }
150         
151         virtual void send(std::string&& data)
152         {
153                 send_queue_.push(std::move(data));
154                 service_.dispatch([=] { do_write(); });
155         }
156
157         virtual void disconnect()
158         {
159                 service_.dispatch([=] { stop(); });
160         }
161
162         void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound)
163         {
164                 //thread-safe tbb_concurrent_hash_map
165                 lifecycle_bound_objects_.insert(std::pair<std::wstring, std::shared_ptr<void>>(key, lifecycle_bound));
166         }
167         std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key)
168         {
169                 //thread-safe tbb_concurrent_hash_map
170                 lifecycle_map_type::const_accessor acc;
171                 if(lifecycle_bound_objects_.find(acc, key))
172                 {
173                         auto result = acc->second;
174                         lifecycle_bound_objects_.erase(acc);
175                         return result;
176                 }
177                 return std::shared_ptr<void>();
178         }
179
180         /**************/
181 private:
182         void do_write() //always called from the asio-service-thread
183         {
184                 if(!is_writing_)
185                 {
186                         std::string data;
187                         if(send_queue_.try_pop(data))
188                         {
189                                 write_some(std::move(data));
190                         }
191                 }
192         }
193
194         void stop()     //always called from the asio-service-thread
195         {
196                 connection_set_->erase(shared_from_this());
197                 try
198                 {
199                         socket_->close();
200                 }
201                 catch(...)
202                 {
203                         CASPAR_LOG_CURRENT_EXCEPTION();
204                 }
205                 
206                 CASPAR_LOG(info) << print() << L" Disconnected.";
207         }
208
209         const std::string ipv4_address() const
210         {
211                 return socket_->is_open() ? socket_->remote_endpoint().address().to_string() : "no-address";
212         }
213
214     connection(const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set) 
215                 : socket_(socket)
216                 , service_(socket->get_io_service())
217                 , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
218                 , connection_set_(connection_set)
219                 , protocol_factory_(protocol_factory)
220                 , is_writing_(false)
221         {
222                 CASPAR_LOG(info) << print() << L" Connected.";
223     }
224
225         protocol_strategy<char>& protocol()     //always called from the asio-service-thread
226         {
227                 if (!protocol_)
228                         protocol_ = protocol_factory_->create(spl::make_shared<connection_holder>(shared_from_this()));
229
230                 return *protocol_;
231         }
232                         
233     void handle_read(const boost::system::error_code& error, size_t bytes_transferred)  //always called from the asio-service-thread
234         {               
235                 if(!error)
236                 {
237                         try
238                         {
239                                 std::string data(data_.begin(), data_.begin() + bytes_transferred);
240
241                                 CASPAR_LOG(trace) << print() << L" Received: " << u16(data);
242
243                                 protocol().parse(data);
244                         }
245                         catch(...)
246                         {
247                                 CASPAR_LOG_CURRENT_EXCEPTION();
248                         }
249                         
250                         read_some();
251                 }  
252                 else if (error != boost::asio::error::operation_aborted)
253                         stop();         
254     }
255
256     void handle_write(const spl::shared_ptr<std::string>& str, const boost::system::error_code& error, size_t bytes_transferred)        //always called from the asio-service-thread
257         {
258                 if(!error)
259                 {
260                         CASPAR_LOG(trace) << print() << L" Sent: " << (str->size() < 512 ? u16(*str) : L"more than 512 bytes.");
261                         if(bytes_transferred != str->size())
262                         {
263                                 str->assign(str->substr(bytes_transferred));
264                                 socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
265                         }
266                         else
267                         {
268                                 is_writing_ = false;
269                                 do_write();
270                         }
271                 }
272                 else if (error != boost::asio::error::operation_aborted)                
273                         stop();
274     }
275
276         void read_some()        //always called from the asio-service-thread
277         {
278                 socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
279         }
280         
281         void write_some(std::string&& data)     //always called from the asio-service-thread
282         {
283                 is_writing_ = true;
284                 auto str = spl::make_shared<std::string>(std::move(data));
285                 socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
286         }
287
288         friend struct AsyncEventServer::implementation;
289 };
290
291 struct AsyncEventServer::implementation
292 {
293         boost::asio::io_service                                 service_;
294         tcp::acceptor                                                   acceptor_;
295         protocol_strategy_factory<char>::ptr    protocol_factory_;
296         spl::shared_ptr<connection_set>                 connection_set_;
297         boost::thread                                                   thread_;
298         std::vector<lifecycle_factory_t>                lifecycle_factories_;
299         tbb::mutex mutex_;
300
301         implementation(const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
302                 : acceptor_(service_, tcp::endpoint(tcp::v4(), port))
303                 , protocol_factory_(protocol)
304                 , thread_([&] { service_.run(); })
305         {
306                 start_accept();
307         }
308
309         ~implementation()
310         {
311                 try
312                 {
313                         acceptor_.close();                      
314                 }
315                 catch(...)
316                 {
317                         CASPAR_LOG_CURRENT_EXCEPTION();
318                 }
319
320                 service_.post([=]
321                 {
322                         auto connections = *connection_set_;
323                         for (auto& connection : connections)
324                                 connection->stop();                             
325                 });
326
327                 thread_.join();
328         }
329                 
330     void start_accept() 
331         {
332                 spl::shared_ptr<tcp::socket> socket(new tcp::socket(service_));
333                 acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1));
334     }
335
336         void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error) 
337         {
338                 if (!acceptor_.is_open())
339                         return;
340                 
341         if (!error)
342                 {
343                         auto conn = connection::create(socket, protocol_factory_, connection_set_);
344                         connection_set_->insert(conn);
345
346                         for (auto& lifecycle_factory : lifecycle_factories_)
347                         {
348                                 auto lifecycle_bound = lifecycle_factory(conn->ipv4_address());
349                                 conn->add_lifecycle_bound_object(lifecycle_bound.first, lifecycle_bound.second);
350                         }
351                 }
352                 start_accept();
353     }
354
355         void add_client_lifecycle_object_factory(const lifecycle_factory_t& factory)
356         {
357                 service_.post([=]{ lifecycle_factories_.push_back(factory); });
358         }
359 };
360
361 AsyncEventServer::AsyncEventServer(
362                 const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
363         : impl_(new implementation(protocol, port)) {}
364
365 AsyncEventServer::~AsyncEventServer() {}
366 void AsyncEventServer::add_client_lifecycle_object_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_object_factory(factory); }
367
368 }}