5 #include "oscpack/oscOutboundPacketStream.h"
14 #include <boost/optional.hpp>
15 #include <boost/foreach.hpp>
16 #include <boost/asio.hpp>
17 #include <boost/thread.hpp>
18 #include <boost/lexical_cast.hpp>
19 #include <boost/algorithm/string/iter_find.hpp>
20 #include <boost/algorithm/string/finder.hpp>
21 #include <boost/algorithm/string/predicate.hpp>
22 #include <boost/thread/mutex.hpp>
24 using namespace boost::asio::ip;
26 namespace caspar { namespace protocol { namespace osc {
29 struct param_visitor : public boost::static_visitor<void>
38 void operator()(const bool value) {o << value;}
39 void operator()(const int32_t value) {o << value;}
40 void operator()(const uint32_t value) {o << value;}
41 void operator()(const int64_t value) {o << value;}
42 void operator()(const uint64_t value) {o << value;}
43 void operator()(const float value) {o << value;}
44 void operator()(const double value) {o << static_cast<float>(value);}
45 void operator()(const std::string& value) {o << value.c_str();}
46 void operator()(const std::wstring& value) {o << u8(value).c_str();}
47 void operator()(const std::vector<int8_t>& value) {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
48 void operator()(const monitor::duration& value)
50 o << boost::chrono::duration_cast<boost::chrono::duration<double, boost::ratio<1, 1>>>(value).count();
54 std::vector<char> write_osc_event(const monitor::event& e)
56 std::array<char, 4096> buffer;
57 ::osc::OutboundPacketStream o(buffer.data(), static_cast<unsigned long>(buffer.size()));
59 o << ::osc::BeginMessage(e.path().str().c_str());
61 param_visitor<decltype(o)> pd_visitor(o);
62 BOOST_FOREACH(auto param, e.params())
63 boost::apply_visitor(pd_visitor, param);
65 o << ::osc::EndMessage;
67 return std::vector<char>(o.Data(), o.Data() + o.Size());
72 typedef std::set<spl::shared_ptr<connection>> connection_set;
74 class connection : public spl::enable_shared_from_this<connection>
76 const spl::shared_ptr<tcp::socket> socket_;
77 const spl::shared_ptr<connection_set> connection_set_;
79 boost::optional<std::regex> regex_;
80 std::array<char, 32768> data_;
84 static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, spl::shared_ptr<connection_set> connection_set)
86 auto con = spl::shared_ptr<connection>(new connection(std::move(socket), std::move(connection_set)));
93 connection_set_->erase(shared_from_this());
100 CASPAR_LOG_CURRENT_EXCEPTION();
102 CASPAR_LOG(info) << print() << L" Disconnected.";
105 std::wstring print() const
107 return L"osc[" + (socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address") + L"]";
110 void on_next(const monitor::event& e)
112 if(regex_ && std::regex_search(e.path().str(), *regex_))
115 auto data_ptr = spl::make_shared<std::vector<char>>(write_osc_event(e));
116 int32_t size = static_cast<int32_t>(data_ptr->size());
117 char* size_ptr = reinterpret_cast<char*>(&size);
119 data_ptr->insert(data_ptr->begin(), size_ptr, size_ptr + sizeof(int32_t));
120 socket_->async_write_some(boost::asio::buffer(*data_ptr), std::bind(&connection::handle_write, shared_from_this(), data_ptr, std::placeholders::_1, std::placeholders::_2));
124 connection(spl::shared_ptr<tcp::socket> socket, spl::shared_ptr<connection_set> connection_set)
125 : socket_(std::move(socket))
126 , connection_set_(std::move(connection_set))
128 CASPAR_LOG(info) << print() << L" Connected.";
131 void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
137 on_read(std::string(data_.begin(), data_.begin() + bytes_transferred));
141 CASPAR_LOG_CURRENT_EXCEPTION();
146 else if (error != boost::asio::error::operation_aborted)
150 void handle_write(const spl::shared_ptr<std::vector<char>>& data, const boost::system::error_code& error, size_t bytes_transferred)
155 else if (error != boost::asio::error::operation_aborted)
161 socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
164 void on_read(std::string str)
168 std::vector<std::string> split;
169 boost::iter_split(split, input_, boost::algorithm::first_finder("\r\n"));
171 input_ = split.back();
177 if(split.back() == ".*")
180 regex_ = std::regex(split.back());
184 class tcp_observer : public reactive::observer<monitor::event>
186 boost::asio::io_service service_;
187 tcp::acceptor acceptor_;
188 spl::shared_ptr<connection_set> connection_set_;
189 boost::thread thread_;
192 tcp_observer(unsigned short port)
193 : acceptor_(service_, tcp::endpoint(tcp::v4(), port))
194 , thread_(std::bind(&boost::asio::io_service::run, &service_))
207 CASPAR_LOG_CURRENT_EXCEPTION();
212 auto connections = *connection_set_;
213 BOOST_FOREACH(auto& connection, connections)
220 void on_next(const monitor::event& e) override
224 BOOST_FOREACH(auto& connection, *connection_set_)
225 connection->on_next(e);
231 auto socket = spl::make_shared<tcp::socket>(service_);
232 acceptor_.async_accept(*socket, std::bind(&tcp_observer::handle_accept, this, socket, std::placeholders::_1));
235 void handle_accept(const spl::shared_ptr<tcp::socket>& socket, const boost::system::error_code& error)
237 if (!acceptor_.is_open())
241 connection_set_->insert(connection::create(socket, connection_set_));
247 server::server(unsigned short port)
248 : impl_(new tcp_observer(port)){}
249 void server::on_next(const monitor::event& e){impl_->on_next(e);}