]> git.sesse.net Git - casparcg/blob - protocol/osc/client.cpp
osc: Cosmetics.
[casparcg] / protocol / osc / client.cpp
1 #include "..\stdafx.h"
2
3 #include "client.h"
4
5 #include "oscpack/oscOutboundPacketStream.h"
6
7 #include <common/utility/string.h>
8
9 #include <boost/asio.hpp>
10 #include <boost/foreach.hpp>
11 #include <boost/thread.hpp>
12
13 #include <tbb/atomic.h>
14 #include <tbb/cache_aligned_allocator.h>
15
16 #include <array>
17 #include <functional>
18 #include <memory>
19 #include <unordered_map>
20 #include <vector>
21
22 using namespace boost::asio::ip;
23
24 namespace caspar { namespace protocol { namespace osc {
25         
26 template<typename T>
27 struct no_init_proxy
28 {
29     T value;
30
31     no_init_proxy() 
32         {
33                 static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size");
34         static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment");
35     }
36 };
37
38 typedef std::vector<no_init_proxy<char>, tbb::cache_aligned_allocator<no_init_proxy<char>>> byte_vector;
39
40 template<typename T>
41 struct param_visitor : public boost::static_visitor<void>
42 {
43         T& o;
44
45         param_visitor(T& o)
46                 : o(o)
47         {
48         }               
49                 
50         void operator()(const bool value)                                       {o << value;}
51         void operator()(const int32_t value)                            {o << static_cast<int64_t>(value);}
52         void operator()(const uint32_t value)                           {o << static_cast<int64_t>(value);}
53         void operator()(const int64_t value)                            {o << static_cast<int64_t>(value);}
54         void operator()(const uint64_t value)                           {o << static_cast<int64_t>(value);}
55         void operator()(const float value)                                      {o << value;}
56         void operator()(const double value)                                     {o << static_cast<float>(value);}
57         void operator()(const std::string& value)                       {o << value.c_str();}
58         void operator()(const std::wstring& value)                      {o << narrow(value).c_str();}
59         void operator()(const std::vector<int8_t>& value)       {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
60 };
61
62 struct size_visitor : public boost::static_visitor<std::size_t>
63 {               
64         std::size_t operator()(const bool value)                                        { return sizeof(bool); }
65         std::size_t operator()(const int32_t value)                                     { return sizeof(int64_t); }
66         std::size_t operator()(const uint32_t value)                            { return sizeof(int64_t); }
67         std::size_t operator()(const int64_t value)                                     { return sizeof(int64_t); }
68         std::size_t operator()(const uint64_t value)                            { return sizeof(int64_t); }
69         std::size_t operator()(const float value)                                       { return sizeof(float); }
70         std::size_t operator()(const double value)                                      { return sizeof(float); }
71         std::size_t operator()(const std::string& value)                        { return value.size(); }
72         std::size_t operator()(const std::wstring& value)                       { return value.size(); }
73         std::size_t operator()(const std::vector<int8_t>& value)        { return value.size(); }
74 };
75
76 void write_osc_event(byte_vector& destination, const core::monitor::message& e)
77 {
78         try
79         {
80                 std::size_t size = 256; // This should be enough to cover address, padding and meta-data.
81                 
82                 size_visitor size_visitor;
83                 BOOST_FOREACH(auto& data, e.data())
84                         size += boost::apply_visitor(size_visitor, data);
85
86                 destination.resize(size);
87
88                 ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
89
90                 o << ::osc::BeginMessage(e.path().c_str());
91                                 
92                 param_visitor<decltype(o)> param_visitor(o);
93                 BOOST_FOREACH(auto& data, e.data())
94                         boost::apply_visitor(param_visitor, data);
95                                 
96                 o << ::osc::EndMessage;
97                 
98                 destination.resize(o.Size());
99         }
100         catch(...)
101         {
102         }
103 }
104
105 struct client::impl
106 {
107         boost::asio::io_service                                                 service_;
108                                                                                                         
109         udp::endpoint                                                                   endpoint_;
110         udp::socket                                                                             socket_;        
111         
112         std::unordered_map<std::string, byte_vector>    updates_;
113         boost::mutex                                                                    updates_mutex_;
114                                                                 
115         boost::condition_variable                                               cond_;
116         boost::mutex                                                                    cond_mutex_;
117
118         tbb::atomic<bool>                                                               is_running_;
119
120         boost::thread                                                                   thread_;
121
122         Concurrency::call<core::monitor::message>               on_next_;
123         
124 public:
125         impl(udp::endpoint endpoint, 
126                  Concurrency::ISource<core::monitor::message>& source)
127                 : endpoint_(endpoint)
128                 , socket_(service_, endpoint_.protocol())
129                 , thread_(boost::bind(&impl::run, this))
130                 , on_next_(boost::bind(&impl::on_next, this, _1))
131         {
132                 source.link_target(&on_next_);  
133         }
134
135         ~impl()
136         {               
137                 is_running_ = false;
138
139                 cond_.notify_all();
140
141                 thread_.join();
142         }
143         
144         void on_next(const core::monitor::message& msg)
145         {                               
146                 {
147                         boost::lock_guard<boost::mutex> lock(updates_mutex_);
148                         write_osc_event(updates_[msg.path()], msg);
149                 }
150
151                 cond_.notify_all();
152         }
153
154         void run()
155         {
156                 try
157                 {
158                         is_running_ = true;
159
160                         std::unordered_map<std::string, byte_vector> updates;
161                         
162                         boost::unique_lock<boost::mutex> cond_lock(cond_mutex_);
163
164                         while(is_running_)
165                         {                                       
166                                 cond_.wait(cond_lock);
167                                                                                         
168                                 {
169                                         boost::lock_guard<boost::mutex> lock(updates_mutex_);
170                                         std::swap(updates, updates_);
171                                 }
172                                                 
173                                 std::vector<boost::asio::const_buffers_1> buffers;
174
175                                 BOOST_FOREACH(const auto& slot, updates)                
176                                         buffers.push_back(boost::asio::buffer(slot.second));
177                         
178                                 socket_.send_to(buffers, endpoint_);
179                         
180                                 updates.clear();
181                         }
182                 }
183                 catch(...)
184                 {
185                         CASPAR_LOG_CURRENT_EXCEPTION();
186                 }
187         }
188 };
189
190 client::client(udp::endpoint endpoint, 
191                            Concurrency::ISource<core::monitor::message>& source) 
192         : impl_(new impl(endpoint, source))
193 {
194 }
195
196 client::client(client&& other)
197         : impl_(std::move(other.impl_))
198 {
199 }
200
201 client& client::operator=(client&& other)
202 {
203         impl_ = std::move(other.impl_);
204         return *this;
205 }
206
207 client::~client()
208 {
209 }
210
211 }}}