]> git.sesse.net Git - casparcg/blob - protocol/util/AsyncEventServer.cpp
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / protocol / util / AsyncEventServer.cpp
1 /*\r
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
3 *\r
4 * This file is part of CasparCG (www.casparcg.com).\r
5 *\r
6 * CasparCG is free software: you can redistribute it and/or modify\r
7 * it under the terms of the GNU General Public License as published by\r
8 * the Free Software Foundation, either version 3 of the License, or\r
9 * (at your option) any later version.\r
10 *\r
11 * CasparCG is distributed in the hope that it will be useful,\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
14 * GNU General Public License for more details.\r
15 *\r
16 * You should have received a copy of the GNU General Public License\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
18 *\r
19 * Author: Robert Nagy, ronag89@gmail.com\r
20 */\r
21 \r
22 #include "..\stdafx.h"\r
23 \r
24 #include "AsyncEventServer.h"\r
25 \r
26 #include "ProtocolStrategy.h"\r
27 \r
28 #include <algorithm>\r
29 #include <array>\r
30 #include <string>\r
31 #include <set>\r
32 #include <vector>\r
33 \r
34 #include <boost/asio.hpp>\r
35 #include <boost/thread.hpp>\r
36 #include <boost/lexical_cast.hpp>\r
37 #include <boost/locale.hpp>\r
38 #include <boost/algorithm/string/split.hpp>\r
39 \r
40 using boost::asio::ip::tcp;\r
41 \r
42 namespace caspar { namespace IO {\r
43         \r
44 class connection;\r
45 \r
46 typedef std::set<spl::shared_ptr<connection>> connection_set;\r
47 \r
48 class connection : public spl::enable_shared_from_this<connection>, public ClientInfo\r
49 {    \r
50     spl::shared_ptr<tcp::socket>                                socket_; \r
51         const std::wstring                                                      name_;\r
52 \r
53         std::array<char, 32768>                                         data_;\r
54 \r
55         std::string                                                                     buffer_;\r
56 \r
57         const spl::shared_ptr<IProtocolStrategy>        protocol_;\r
58         spl::shared_ptr<connection_set>                         connection_set_;\r
59 \r
60 public:\r
61     static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const ProtocolStrategyPtr& protocol, spl::shared_ptr<connection_set> connection_set)\r
62         {\r
63                 spl::shared_ptr<connection> con(new connection(std::move(socket), std::move(protocol), std::move(connection_set)));\r
64                 con->read_some();\r
65                 return con;\r
66     }\r
67 \r
68         std::wstring print() const\r
69         {\r
70                 return L"[" + name_ + L"]";\r
71         }\r
72         \r
73         /* ClientInfo */\r
74         \r
75         virtual void Send(const std::wstring& data)\r
76         {\r
77                 write_some(data);\r
78         }\r
79 \r
80         virtual void Disconnect()\r
81         {\r
82                 stop();\r
83         }\r
84         \r
85         /**************/\r
86         \r
87         void stop()\r
88         {\r
89                 connection_set_->erase(shared_from_this());\r
90                 socket_->close();\r
91                 CASPAR_LOG(info) << print() << L" Disconnected.";\r
92         }\r
93 \r
94 private:\r
95     connection(const spl::shared_ptr<tcp::socket>& socket, const ProtocolStrategyPtr& protocol, const spl::shared_ptr<connection_set>& connection_set) \r
96                 : socket_(socket)\r
97                 , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))\r
98                 , protocol_(protocol)\r
99                 , connection_set_(connection_set)\r
100         {\r
101                 CASPAR_LOG(info) << print() << L" Connected.";\r
102     }\r
103                         \r
104     void handle_read(const boost::system::error_code& error, size_t bytes_transferred) \r
105         {               \r
106                 if(!error)\r
107                 {\r
108                         try\r
109                         {\r
110                                 CASPAR_LOG(trace) << print() << L" Received: " << u16(std::string(data_.begin(), data_.begin() + bytes_transferred));\r
111 \r
112                                 buffer_.insert(buffer_.end(), data_.begin(), data_.begin() + bytes_transferred);\r
113                                 \r
114                                 std::vector<std::string> split;\r
115                                 boost::iter_split(split, buffer_, boost::algorithm::first_finder("\r\n"));\r
116                                 \r
117                                 buffer_ = std::move(split.back());\r
118                                 split.pop_back();\r
119 \r
120                                 BOOST_FOREACH(auto cmd, split)\r
121                                 {\r
122                                         auto u16cmd = boost::locale::conv::to_utf<wchar_t>(cmd, protocol_->GetCodepage()) + L"\r\n";\r
123                                         protocol_->Parse(u16cmd.data(), static_cast<int>(u16cmd.size()), shared_from_this());\r
124                                 }\r
125                         }\r
126                         catch(...)\r
127                         {\r
128                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
129                         }\r
130                         \r
131                         read_some();\r
132                 }  \r
133                 else if (error != boost::asio::error::operation_aborted)\r
134                         stop();         \r
135                 else\r
136                         read_some();\r
137     }\r
138 \r
139     void handle_write(const spl::shared_ptr<std::string>& data, const boost::system::error_code& error, size_t bytes_transferred)\r
140         {\r
141                 if(!error)                      \r
142                         CASPAR_LOG(trace) << print() << L" Sent: " << (data->size() < 512 ? u16(std::string(data->begin(), data->end())) : L"more than 512 bytes.");            \r
143                 else if (error != boost::asio::error::operation_aborted)                \r
144                         stop();         \r
145     }\r
146 \r
147         void read_some()\r
148         {\r
149                 socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));\r
150         }\r
151         \r
152         void write_some(const std::wstring& data)\r
153         {\r
154                 auto str = spl::make_shared<std::string>(boost::locale::conv::from_utf<wchar_t>(data, protocol_->GetCodepage()));\r
155                 socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));\r
156         }\r
157 };\r
158 \r
159 struct AsyncEventServer::implementation\r
160 {\r
161         boost::asio::io_service                                 service_;\r
162         tcp::acceptor                                                   acceptor_;\r
163         spl::shared_ptr<IProtocolStrategy>              protocol_;\r
164         spl::shared_ptr<connection_set>                 connection_set_;\r
165         boost::thread                                                   thread_;\r
166 \r
167         implementation(const spl::shared_ptr<IProtocolStrategy>& protocol, unsigned short port)\r
168                 : acceptor_(service_, tcp::endpoint(tcp::v4(), port))\r
169                 , protocol_(protocol)\r
170                 , thread_(std::bind(&boost::asio::io_service::run, &service_))\r
171         {\r
172                 start_accept();\r
173         }\r
174 \r
175         ~implementation()\r
176         {\r
177                 try\r
178                 {\r
179                         acceptor_.close();\r
180 \r
181                         service_.post([=]\r
182                         {\r
183                                 auto connections = *connection_set_;\r
184                                 BOOST_FOREACH(auto& connection, connections)\r
185                                 {\r
186                                         try\r
187                                         {\r
188                                                 connection->stop();\r
189                                         }\r
190                                         catch(...)\r
191                                         {\r
192                                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
193                                         }\r
194                                 }\r
195                         });\r
196                 }\r
197                 catch(...)\r
198                 {\r
199                         CASPAR_LOG_CURRENT_EXCEPTION();\r
200                 }\r
201 \r
202                 thread_.join();\r
203         }\r
204                 \r
205     void start_accept() \r
206         {\r
207                 spl::shared_ptr<tcp::socket> socket(new tcp::socket(service_));\r
208                 acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1));\r
209     }\r
210 \r
211         void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error) \r
212         {\r
213                 if (!acceptor_.is_open())\r
214                         return;\r
215                 \r
216         if (!error)             \r
217                         connection_set_->insert(connection::create(socket, protocol_, connection_set_));\r
218 \r
219                 start_accept();\r
220     }\r
221 };\r
222 \r
223 AsyncEventServer::AsyncEventServer(const spl::shared_ptr<IProtocolStrategy>& protocol, unsigned short port) : impl_(new implementation(protocol, port)){}\r
224 AsyncEventServer::~AsyncEventServer(){}\r
225 }}