git.sesse.net Git - casparcg/blob - protocol/util/AsyncEventServer.cpp
set svn:eol-style native on .h and .cpp files
[casparcg] / protocol / util / AsyncEventServer.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #include "..\stdafx.h"
23
24 #include "AsyncEventServer.h"
25
26 #include "ProtocolStrategy.h"
27
28 #include <algorithm>
29 #include <array>
30 #include <string>
31 #include <set>
32 #include <vector>
33
34 #include <boost/asio.hpp>
35 #include <boost/thread.hpp>
36 #include <boost/lexical_cast.hpp>
37 #include <boost/locale.hpp>
38 #include <boost/algorithm/string/split.hpp>
39
40 using boost::asio::ip::tcp;
41
42 namespace caspar { namespace IO {
43         
44 class connection;
45
46 typedef std::set<spl::shared_ptr<connection>> connection_set;
47
48 class connection : public spl::enable_shared_from_this<connection>, public ClientInfo
49 {    
50     const spl::shared_ptr<tcp::socket>                  socket_; 
51         const spl::shared_ptr<connection_set>           connection_set_;
52         const std::wstring                                                      name_;
53         const spl::shared_ptr<IProtocolStrategy>        protocol_;
54
55         std::array<char, 32768>                                         data_;
56         std::string                                                                     input_;
57
58 public:
59     static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const ProtocolStrategyPtr& protocol, spl::shared_ptr<connection_set> connection_set)
60         {
61                 spl::shared_ptr<connection> con(new connection(std::move(socket), std::move(protocol), std::move(connection_set)));
62                 con->read_some();
63                 return con;
64     }
65
66         std::wstring print() const
67         {
68                 return L"[" + name_ + L"]";
69         }
70         
71         /* ClientInfo */
72         
73         virtual void Send(const std::wstring& data)
74         {
75                 write_some(data);
76         }
77
78         virtual void Disconnect()
79         {
80                 stop();
81         }
82         
83         /**************/
84         
85         void stop()
86         {
87                 connection_set_->erase(shared_from_this());
88                 try
89                 {
90                         socket_->close();
91                 }
92                 catch(...)
93                 {
94                         CASPAR_LOG_CURRENT_EXCEPTION();
95                 }
96                 CASPAR_LOG(info) << print() << L" Disconnected.";
97         }
98
99 private:
100     connection(const spl::shared_ptr<tcp::socket>& socket, const ProtocolStrategyPtr& protocol, const spl::shared_ptr<connection_set>& connection_set) 
101                 : socket_(socket)
102                 , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
103                 , connection_set_(connection_set)
104                 , protocol_(protocol)
105         {
106                 CASPAR_LOG(info) << print() << L" Connected.";
107     }
108                         
109     void handle_read(const boost::system::error_code& error, size_t bytes_transferred) 
110         {               
111                 if(!error)
112                 {
113                         try
114                         {
115                                 CASPAR_LOG(trace) << print() << L" Received: " << u16(std::string(data_.begin(), data_.begin() + bytes_transferred));
116
117                                 input_.append(data_.begin(), data_.begin() + bytes_transferred);
118                                 
119                                 std::vector<std::string> split;
120                                 boost::iter_split(split, input_, boost::algorithm::first_finder("\r\n"));
121                                 
122                                 input_ = std::move(split.back());
123                                 split.pop_back();
124
125                                 BOOST_FOREACH(auto cmd, split)
126                                 {
127                                         auto u16cmd = boost::locale::conv::to_utf<wchar_t>(cmd, protocol_->GetCodepage()) + L"\r\n";
128                                         protocol_->Parse(u16cmd.data(), static_cast<int>(u16cmd.size()), shared_from_this());
129                                 }
130                         }
131                         catch(...)
132                         {
133                                 CASPAR_LOG_CURRENT_EXCEPTION();
134                         }
135                         
136                         read_some();
137                 }  
138                 else if (error != boost::asio::error::operation_aborted)
139                         stop();         
140     }
141
142     void handle_write(const spl::shared_ptr<std::string>& data, const boost::system::error_code& error, size_t bytes_transferred)
143         {
144                 if(!error)                      
145                         CASPAR_LOG(trace) << print() << L" Sent: " << (data->size() < 512 ? u16(*data) : L"more than 512 bytes.");              
146                 else if (error != boost::asio::error::operation_aborted)                
147                         stop();         
148     }
149
150         void read_some()
151         {
152                 socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
153         }
154         
155         void write_some(const std::wstring& data)
156         {
157                 auto str = spl::make_shared<std::string>(boost::locale::conv::from_utf<wchar_t>(data, protocol_->GetCodepage()));
158                 socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
159         }
160 };
161
162 struct AsyncEventServer::implementation
163 {
164         boost::asio::io_service                                 service_;
165         tcp::acceptor                                                   acceptor_;
166         spl::shared_ptr<IProtocolStrategy>              protocol_;
167         spl::shared_ptr<connection_set>                 connection_set_;
168         boost::thread                                                   thread_;
169
170         implementation(const spl::shared_ptr<IProtocolStrategy>& protocol, unsigned short port)
171                 : acceptor_(service_, tcp::endpoint(tcp::v4(), port))
172                 , protocol_(protocol)
173                 , thread_(std::bind(&boost::asio::io_service::run, &service_))
174         {
175                 start_accept();
176         }
177
178         ~implementation()
179         {
180                 try
181                 {
182                         acceptor_.close();                      
183                 }
184                 catch(...)
185                 {
186                         CASPAR_LOG_CURRENT_EXCEPTION();
187                 }
188
189                 service_.post([=]
190                 {
191                         auto connections = *connection_set_;
192                         BOOST_FOREACH(auto& connection, connections)
193                                 connection->stop();                             
194                 });
195
196                 thread_.join();
197         }
198                 
199     void start_accept() 
200         {
201                 spl::shared_ptr<tcp::socket> socket(new tcp::socket(service_));
202                 acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1));
203     }
204
205         void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error) 
206         {
207                 if (!acceptor_.is_open())
208                         return;
209                 
210         if (!error)             
211                         connection_set_->insert(connection::create(socket, protocol_, connection_set_));
212
213                 start_accept();
214     }
215 };
216
217 AsyncEventServer::AsyncEventServer(const spl::shared_ptr<IProtocolStrategy>& protocol, unsigned short port) : impl_(new implementation(protocol, port)){}
218 AsyncEventServer::~AsyncEventServer(){}
219 }}