]> git.sesse.net Git - casparcg/blob - protocol/osc/server.cpp
- Implemented real-time state notification using OSC-UDP.
[casparcg] / protocol / osc / server.cpp
1 #include "..\stdafx.h"
2
3 #include "server.h"
4
5 #include "oscpack/oscOutboundPacketStream.h"
6
7 #include <common/utility/string.h>
8
9 #include <functional>
10 #include <vector>
11
12 #include <boost/asio.hpp>
13 #include <boost/foreach.hpp>
14 #include <boost/thread.hpp>
15
16 using namespace boost::asio::ip;
17
18 namespace caspar { namespace protocol { namespace osc {
19         
20 template<typename T>
21 struct param_visitor : public boost::static_visitor<void>
22 {
23         T& o;
24
25         param_visitor(T& o)
26                 : o(o)
27         {
28         }               
29                 
30         void operator()(const bool value)                                       {o << value;}
31         void operator()(const int32_t value)                            {o << static_cast<int64_t>(value);}
32         void operator()(const uint32_t value)                           {o << static_cast<int64_t>(value);}
33         void operator()(const int64_t value)                            {o << static_cast<int64_t>(value);}
34         void operator()(const uint64_t value)                           {o << static_cast<int64_t>(value);}
35         void operator()(const float value)                                      {o << value;}
36         void operator()(const double value)                                     {o << static_cast<float>(value);}
37         void operator()(const std::string& value)                       {o << value.c_str();}
38         void operator()(const std::wstring& value)                      {o << narrow(value).c_str();}
39         void operator()(const std::vector<int8_t>& value)       {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
40 };
41
42 std::vector<char> write_osc_event(const core::monitor::message& e)
43 {
44         std::array<char, 4096> buffer;
45         ::osc::OutboundPacketStream o(buffer.data(), static_cast<unsigned long>(buffer.size()));
46
47         o       << ::osc::BeginMessage(e.path().c_str());
48                                 
49         param_visitor<decltype(o)> pd_visitor(o);
50         BOOST_FOREACH(auto data, e.data())
51                 boost::apply_visitor(pd_visitor, data);
52                                 
53         o       << ::osc::EndMessage;
54                 
55         return std::vector<char>(o.Data(), o.Data() + o.Size());
56 }
57
58 struct server::impl
59 {
60         boost::asio::io_service                                         service_;
61
62         udp::endpoint                                                           endpoint_;
63         udp::socket                                                                     socket_;        
64
65         boost::thread                                                           thread_;
66
67         Concurrency::call<core::monitor::message>       on_next_;
68         
69 public:
70         impl(udp::endpoint endpoint, 
71                  Concurrency::ISource<core::monitor::message>& source)
72                 : endpoint_(endpoint)
73                 , socket_(service_, endpoint_.protocol())
74                 , thread_(std::bind(&boost::asio::io_service::run, &service_))
75                 , on_next_([this](const core::monitor::message& msg){ on_next(msg); })
76         {
77                 source.link_target(&on_next_);
78         }
79
80         ~impl()
81         {               
82                 thread_.join();
83         }
84         
85         void on_next(const core::monitor::message& msg)
86         {
87                 auto data_ptr = make_safe<std::vector<char>>(write_osc_event(msg));
88
89                 socket_.async_send_to(boost::asio::buffer(*data_ptr), 
90                                                           endpoint_,
91                                                           boost::bind(&impl::handle_send_to, this,
92                                                           boost::asio::placeholders::error,
93                                                           boost::asio::placeholders::bytes_transferred));               
94         }       
95
96         void handle_send_to(const boost::system::error_code& /*error*/, size_t /*bytes_sent*/)
97         {
98         }
99 };
100
101 server::server(udp::endpoint endpoint, 
102                            Concurrency::ISource<core::monitor::message>& source) 
103         : impl_(new impl(endpoint, source))
104 {
105 }
106
107 server::server(server&& other)
108         : impl_(std::move(other.impl_))
109 {
110 }
111
112 server& server::operator=(server&& other)
113 {
114         impl_ = std::move(other.impl_);
115         return *this;
116 }
117
118 server::~server()
119 {
120 }
121
122 }}}