]> git.sesse.net Git - casparcg/blob - protocol/osc/server.cpp
set svn:eol-style native on .h and .cpp files
[casparcg] / protocol / osc / server.cpp
1 #include "..\stdafx.h"
2
3 #include "server.h"
4
5 #include "oscpack/oscOutboundPacketStream.h"
6
7 #include <algorithm>
8 #include <array>
9 #include <string>
10 #include <set>
11 #include <regex>
12 #include <vector>
13
14 #include <boost/optional.hpp>
15 #include <boost/foreach.hpp>
16 #include <boost/asio.hpp>
17 #include <boost/thread.hpp>
18 #include <boost/lexical_cast.hpp>
19 #include <boost/algorithm/string/iter_find.hpp>
20 #include <boost/algorithm/string/finder.hpp>
21 #include <boost/algorithm/string/predicate.hpp>
22 #include <boost/thread/mutex.hpp>
23
24 using namespace boost::asio::ip;
25
26 namespace caspar { namespace protocol { namespace osc {
27         
28 template<typename T>
29 struct param_visitor : public boost::static_visitor<void>
30 {
31         T& o;
32
33         param_visitor(T& o)
34                 : o(o)
35         {
36         }               
37                 
38         void operator()(const bool value)                                       {o << value;}
39         void operator()(const int32_t value)                            {o << value;}
40         void operator()(const uint32_t value)                           {o << value;}
41         void operator()(const int64_t value)                            {o << value;}
42         void operator()(const uint64_t value)                           {o << value;}
43         void operator()(const float value)                                      {o << value;}
44         void operator()(const double value)                                     {o << static_cast<float>(value);}
45         void operator()(const std::string& value)                       {o << value.c_str();}
46         void operator()(const std::wstring& value)                      {o << u8(value).c_str();}
47         void operator()(const std::vector<int8_t>& value)       {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
48         void operator()(const monitor::duration& value)
49         {
50                 o << boost::chrono::duration_cast<boost::chrono::duration<double, boost::ratio<1, 1>>>(value).count();
51         }
52 };
53
54 std::vector<char> write_osc_event(const monitor::event& e)
55 {
56         std::array<char, 4096> buffer;
57         ::osc::OutboundPacketStream o(buffer.data(), static_cast<unsigned long>(buffer.size()));
58
59         o       << ::osc::BeginMessage(e.path().str().c_str());
60                                 
61         param_visitor<decltype(o)> pd_visitor(o);
62         BOOST_FOREACH(auto param, e.params())
63                 boost::apply_visitor(pd_visitor, param);
64                                 
65         o       << ::osc::EndMessage;
66                 
67         return std::vector<char>(o.Data(), o.Data() + o.Size());
68 }
69
70 class connection;
71
72 typedef std::set<spl::shared_ptr<connection>> connection_set;
73
74 class connection : public spl::enable_shared_from_this<connection>
75 {    
76     const spl::shared_ptr<tcp::socket>          socket_; 
77         const spl::shared_ptr<connection_set>   connection_set_;
78
79         boost::optional<std::regex>                             regex_;
80         std::array<char, 32768>                                 data_;
81         std::string                                                             input_;
82
83 public:
84     static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, spl::shared_ptr<connection_set> connection_set)
85         {
86                 auto con = spl::shared_ptr<connection>(new connection(std::move(socket), std::move(connection_set)));
87                 con->read_some();
88                 return con;
89     }
90         
91         void stop()
92         {
93                 connection_set_->erase(shared_from_this());
94                 try
95                 {
96                         socket_->close();
97                 }
98                 catch(...)
99                 {
100                         CASPAR_LOG_CURRENT_EXCEPTION();
101                 }
102                 CASPAR_LOG(info) << print() << L" Disconnected.";
103         }
104                 
105         std::wstring print() const
106         {
107                 return L"osc[" + (socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address") + L"]";
108         }
109                 
110         void on_next(const monitor::event& e)
111         {       
112                 if(regex_ && std::regex_search(e.path().str(), *regex_))
113                         return; 
114
115                 auto data_ptr = spl::make_shared<std::vector<char>>(write_osc_event(e));
116                 int32_t size = static_cast<int32_t>(data_ptr->size());
117                 char* size_ptr = reinterpret_cast<char*>(&size);
118
119                 data_ptr->insert(data_ptr->begin(), size_ptr, size_ptr + sizeof(int32_t));
120                 socket_->async_write_some(boost::asio::buffer(*data_ptr), std::bind(&connection::handle_write, shared_from_this(), data_ptr, std::placeholders::_1, std::placeholders::_2));    
121         }
122         
123 private:
124     connection(spl::shared_ptr<tcp::socket> socket, spl::shared_ptr<connection_set> connection_set) 
125                 : socket_(std::move(socket))
126                 , connection_set_(std::move(connection_set))
127         {
128                 CASPAR_LOG(info) << print() << L" Connected.";
129     }
130                                         
131     void handle_read(const boost::system::error_code& error, size_t bytes_transferred) 
132         {               
133                 if(!error)
134                 {
135                         try
136                         {
137                                 on_read(std::string(data_.begin(), data_.begin() + bytes_transferred));
138                         }
139                         catch(...)
140                         {
141                                 CASPAR_LOG_CURRENT_EXCEPTION();
142                         }
143                         
144                         read_some();
145                 }  
146                 else if (error != boost::asio::error::operation_aborted)                
147                         stop();         
148     }
149
150     void handle_write(const spl::shared_ptr<std::vector<char>>& data, const boost::system::error_code& error, size_t bytes_transferred)
151         {
152                 if(!error)                      
153                 {
154                 }
155                 else if (error != boost::asio::error::operation_aborted)
156                         stop();         
157     }
158
159         void read_some()
160         {
161                 socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
162         }
163                 
164         void on_read(std::string str)
165         {
166                 input_ += str;
167
168                 std::vector<std::string> split;
169                 boost::iter_split(split, input_, boost::algorithm::first_finder("\r\n"));
170                 
171                 input_ = split.back();
172                 split.pop_back();       
173                 
174                 if(split.empty())
175                         return;
176
177                 if(split.back() == ".*")
178                         regex_.reset();
179                 else
180                         regex_ = std::regex(split.back());
181         }
182 };
183
184 class tcp_observer : public reactive::observer<monitor::event>
185 {
186         boost::asio::io_service                 service_;
187         tcp::acceptor                                   acceptor_;
188         spl::shared_ptr<connection_set> connection_set_;
189         boost::thread                                   thread_;
190         
191 public:
192         tcp_observer(unsigned short port)
193                 : acceptor_(service_, tcp::endpoint(tcp::v4(), port))
194                 , thread_(std::bind(&boost::asio::io_service::run, &service_))
195         {
196                 start_accept(); 
197         }
198
199         ~tcp_observer()
200         {               
201                 try
202                 {
203                         acceptor_.close();
204                 }
205                 catch(...)
206                 {
207                         CASPAR_LOG_CURRENT_EXCEPTION();
208                 }
209
210                 service_.post([=]
211                 {
212                         auto connections = *connection_set_;
213                         BOOST_FOREACH(auto& connection, connections)
214                                 connection->stop();                             
215                 });
216
217                 thread_.join();
218         }
219         
220         void on_next(const monitor::event& e) override
221         {
222                 service_.post([=]
223                 {                       
224                         BOOST_FOREACH(auto& connection, *connection_set_)
225                                 connection->on_next(e);
226                 });             
227         }       
228 private:                
229     void start_accept() 
230         {
231                 auto socket = spl::make_shared<tcp::socket>(service_);
232                 acceptor_.async_accept(*socket, std::bind(&tcp_observer::handle_accept, this, socket, std::placeholders::_1));
233     }
234
235         void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error) 
236         {
237                 if (!acceptor_.is_open())
238                         return;
239
240         if (!error)             
241                         connection_set_->insert(connection::create(socket, connection_set_));
242         
243                 start_accept();
244     }
245 };
246
247 server::server(unsigned short port) 
248         : impl_(new tcp_observer(port)){}
249 void server::on_next(const monitor::event& e){impl_->on_next(e);}
250
251 }}}