]> git.sesse.net Git - casparcg/blob - protocol/util/AsyncEventServer.cpp
2.1.0: protocol: Refactored. Uses boost::locale.
[casparcg] / protocol / util / AsyncEventServer.cpp
1 /*\r
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
3 *\r
4 * This file is part of CasparCG (www.casparcg.com).\r
5 *\r
6 * CasparCG is free software: you can redistribute it and/or modify\r
7 * it under the terms of the GNU General Public License as published by\r
8 * the Free Software Foundation, either version 3 of the License, or\r
9 * (at your option) any later version.\r
10 *\r
11 * CasparCG is distributed in the hope that it will be useful,\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
14 * GNU General Public License for more details.\r
15 *\r
16 * You should have received a copy of the GNU General Public License\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
18 *\r
19 * Author: Robert Nagy, ronag89@gmail.com\r
20 */\r
21 \r
22 #include "..\stdafx.h"\r
23 \r
24 #include "AsyncEventServer.h"\r
25 \r
26 #include "ProtocolStrategy.h"\r
27 \r
28 #include <algorithm>\r
29 #include <array>\r
30 #include <string>\r
31 #include <set>\r
32 #include <vector>\r
33 \r
34 #include <boost/asio.hpp>\r
35 #include <boost/thread.hpp>\r
36 #include <boost/lexical_cast.hpp>\r
37 #include <boost/locale.hpp>\r
38 #include <boost/algorithm/string/split.hpp>\r
39 \r
40 using boost::asio::ip::tcp;\r
41 \r
42 namespace caspar { namespace IO {\r
43         \r
44 class connection;\r
45 \r
46 typedef std::set<spl::shared_ptr<connection>> connection_set;\r
47 \r
48 class connection : public spl::enable_shared_from_this<connection>, public ClientInfo\r
49 {    \r
50     spl::shared_ptr<tcp::socket>                                socket_; \r
51         const std::wstring                                                      name_;\r
52 \r
53         std::array<char, 32768>                                         data_;\r
54 \r
55         std::string                                                                     buffer_;\r
56 \r
57         const spl::shared_ptr<IProtocolStrategy>        protocol_;\r
58         spl::shared_ptr<connection_set>                         connection_set_;\r
59 \r
60 public:\r
61     static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const ProtocolStrategyPtr& protocol, spl::shared_ptr<connection_set> connection_set)\r
62         {\r
63                 spl::shared_ptr<connection> con(new connection(std::move(socket), std::move(protocol), std::move(connection_set)));\r
64                 con->read_some();\r
65                 return con;\r
66     }\r
67 \r
68         std::wstring print() const\r
69         {\r
70                 return L"[" + name_ + L"]";\r
71         }\r
72         \r
73         /* ClientInfo */\r
74         \r
75         virtual void Send(const std::wstring& data)\r
76         {\r
77                 write_some(data);\r
78         }\r
79 \r
80         virtual void Disconnect()\r
81         {\r
82                 stop();\r
83         }\r
84         \r
85         /**************/\r
86         \r
87         void stop()\r
88         {\r
89                 connection_set_->erase(shared_from_this());\r
90                 socket_->close();\r
91                 CASPAR_LOG(info) << print() << L" Disconnected.";\r
92         }\r
93 \r
94 private:\r
95     connection(const spl::shared_ptr<tcp::socket>& socket, const ProtocolStrategyPtr& protocol, const spl::shared_ptr<connection_set>& connection_set) \r
96                 : socket_(socket)\r
97                 , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))\r
98                 , protocol_(protocol)\r
99                 , connection_set_(connection_set)\r
100         {\r
101                 CASPAR_LOG(info) << print() << L" Connected.";\r
102     }\r
103                         \r
104     void handle_read(const boost::system::error_code& error, size_t bytes_transferred) \r
105         {               \r
106                 if(!error)\r
107                 {\r
108                         try\r
109                         {\r
110                                 CASPAR_LOG(trace) << print() << L" Received: " << u16(std::string(data_.begin(), data_.begin() + bytes_transferred));\r
111 \r
112                                 buffer_.insert(buffer_.end(), data_.begin(), data_.begin() + bytes_transferred);\r
113                                 \r
114                                 std::vector<std::string> split;\r
115                                 boost::iter_split(split, buffer_, boost::algorithm::first_finder("\r\n"));\r
116                                 \r
117                                 buffer_ = std::move(split.back());\r
118                                 split.pop_back();\r
119 \r
120                                 BOOST_FOREACH(auto cmd, split)\r
121                                 {\r
122                                         auto u16cmd = boost::locale::conv::to_utf<wchar_t>(cmd, protocol_->GetCodepage()) + L"\r\n";\r
123                                         protocol_->Parse(u16cmd.data(), static_cast<int>(u16cmd.size()), shared_from_this());\r
124                                 }\r
125                         }\r
126                         catch(...)\r
127                         {\r
128                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
129                         }\r
130                         \r
131                         read_some();\r
132                 }  \r
133                 else if (error != boost::asio::error::operation_aborted)\r
134                         stop();         \r
135                 else\r
136                         read_some();\r
137     }\r
138 \r
139     void handle_write(const spl::shared_ptr<std::string>& data, const boost::system::error_code& error, size_t bytes_transferred)\r
140         {\r
141                 if(!error)                      \r
142                 {\r
143                         CASPAR_LOG(trace) << print() << L" Sent: " << (data->size() < 512 ? u16(std::string(data->begin(), data->end())) : L"more than 512 bytes.");\r
144                 }\r
145                 else if (error != boost::asio::error::operation_aborted)                \r
146                         stop();         \r
147     }\r
148 \r
149         void read_some()\r
150         {\r
151                 socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));\r
152         }\r
153         \r
154         void write_some(const std::wstring& data)\r
155         {\r
156                 auto str = spl::make_shared<std::string>(boost::locale::conv::from_utf<wchar_t>(data, protocol_->GetCodepage()));\r
157                 socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));\r
158         }\r
159 };\r
160 \r
161 struct AsyncEventServer::implementation\r
162 {\r
163         boost::asio::io_service                                 service_;\r
164         tcp::acceptor                                                   acceptor_;\r
165         spl::shared_ptr<IProtocolStrategy>              protocol_;\r
166         spl::shared_ptr<connection_set>                 connection_set_;\r
167         boost::thread                                                   thread_;\r
168 \r
169         implementation(const spl::shared_ptr<IProtocolStrategy>& protocol, unsigned short port)\r
170                 : acceptor_(service_, tcp::endpoint(tcp::v4(), port))\r
171                 , protocol_(protocol)\r
172                 , thread_(std::bind(&boost::asio::io_service::run, &service_))\r
173         {\r
174                 start_accept();\r
175         }\r
176 \r
177         ~implementation()\r
178         {\r
179                 try\r
180                 {\r
181                         acceptor_.close();\r
182 \r
183                         service_.post([=]\r
184                         {\r
185                                 auto connections = *connection_set_;\r
186                                 BOOST_FOREACH(auto& connection, connections)\r
187                                 {\r
188                                         try\r
189                                         {\r
190                                                 connection->stop();\r
191                                         }\r
192                                         catch(...)\r
193                                         {\r
194                                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
195                                         }\r
196                                 }\r
197                         });\r
198                 }\r
199                 catch(...)\r
200                 {\r
201                         CASPAR_LOG_CURRENT_EXCEPTION();\r
202                 }\r
203 \r
204                 thread_.join();\r
205         }\r
206                 \r
207     void start_accept() \r
208         {\r
209                 spl::shared_ptr<tcp::socket> socket(new tcp::socket(service_));\r
210                 acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1));\r
211     }\r
212 \r
213         void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error) \r
214         {\r
215                 if (!acceptor_.is_open())\r
216                         return;\r
217                 \r
218         if (!error)             \r
219                         connection_set_->insert(connection::create(socket, protocol_, connection_set_));\r
220 \r
221                 start_accept();\r
222     }\r
223 };\r
224 \r
225 AsyncEventServer::AsyncEventServer(const spl::shared_ptr<IProtocolStrategy>& protocol, unsigned short port) : impl_(new implementation(protocol, port)){}\r
226 AsyncEventServer::~AsyncEventServer(){}\r
227 }}