]> git.sesse.net Git - casparcg/blob - protocol/osc/client.cpp
61e577f85723933f5b1a6ac97fb8426920d8ad23
[casparcg] / protocol / osc / client.cpp
1 /*
2 * Copyright 2013 Sveriges Television AB http://casparcg.com/
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 * Author: Helge Norberg, helge.norberg@svt.se
21 */
22
23 #include "../stdafx.h"
24
25 #include "client.h"
26
27 #include "oscpack/OscOutboundPacketStream.h"
28 #include "oscpack/OscHostEndianness.h"
29
30 #include <common/utf.h>
31 #include <common/except.h>
32 #include <common/endian.h>
33 #include <common/cache_aligned_vector.h>
34 #include <core/monitor/monitor.h>
35
36 #include <functional>
37 #include <vector>
38 #include <unordered_map>
39
40 #include <boost/asio.hpp>
41 #include <boost/bind.hpp>
42 #include <boost/thread.hpp>
43
44 #include <tbb/spin_mutex.h>
45
46 using namespace boost::asio::ip;
47
48 namespace caspar { namespace protocol { namespace osc {
49
/**
 * Thin wrapper around a single T whose default constructor deliberately
 * leaves `value` uninitialized.
 *
 * The constructor is user-provided and has no member initializer, so
 * default- or value-constructing a no_init_proxy<T> does not write to
 * `value`. The static assertions guarantee that the wrapper has exactly the
 * size and alignment of T, so a buffer of wrappers can be viewed as a
 * buffer of T.
 */
template<typename T>
struct no_init_proxy
{
    T value;

    no_init_proxy()
    {
        // Checked here rather than at class scope because the type is
        // only complete inside member function bodies.
        static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment");
        static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size");
    }
};
61
// Byte buffer used for serialized OSC data. The element type is
// no_init_proxy<char>, whose default constructor does not initialize its
// value. NOTE(review): cache_aligned_vector is declared in
// <common/cache_aligned_vector.h>; presumably a std::vector with a
// cache-line-aligned allocator -- confirm.
typedef cache_aligned_vector<no_init_proxy<char>> byte_vector;
63
64 template<typename T>
65 struct param_visitor : public boost::static_visitor<void>
66 {
67         T& o;
68
69         param_visitor(T& o)
70                 : o(o)
71         {
72         }               
73                 
74         void operator()(const bool value)                                       {o << value;}
75         void operator()(const int32_t value)                            {o << static_cast<int64_t>(value);}
76         void operator()(const uint32_t value)                           {o << static_cast<int64_t>(value);}
77         void operator()(const int64_t value)                            {o << static_cast<int64_t>(value);}
78         void operator()(const uint64_t value)                           {o << static_cast<int64_t>(value);}
79         void operator()(const float value)                                      {o << value;}
80         void operator()(const double value)                                     {o << static_cast<float>(value);}
81         void operator()(const std::string& value)                       {o << value.c_str();}
82         void operator()(const std::wstring& value)                      {o << u8(value).c_str();}
83         void operator()(const std::vector<int8_t>& value)       {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
84 };
85
86 void write_osc_event(byte_vector& destination, const core::monitor::message& e)
87 {               
88         destination.resize(4096);
89
90         ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
91         o << ::osc::BeginMessage(e.path().c_str());
92                                 
93         param_visitor<decltype(o)> param_visitor(o);
94         for (const auto& data : e.data())
95                 boost::apply_visitor(param_visitor, data);
96                                 
97         o << ::osc::EndMessage;
98                 
99         destination.resize(o.Size());
100 }
101
102 byte_vector write_osc_bundle_start()
103 {
104         byte_vector destination;
105         destination.resize(16);
106
107         ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
108         o << ::osc::BeginBundle();
109
110         destination.resize(o.Size());
111
112         return destination;
113 }
114
115 void write_osc_bundle_element_start(byte_vector& destination, const byte_vector& message)
116 {               
117         destination.resize(4);
118
119         int32_t* bundle_element_size = reinterpret_cast<int32_t*>(destination.data());
120
121 #ifdef OSC_HOST_LITTLE_ENDIAN
122         *bundle_element_size = swap_byte_order(static_cast<int32_t>(message.size()));
123 #else
124         *bundle_element_size = static_cast<int32_t>(bundle.size());
125 #endif
126 }
127
128 struct client::impl : public spl::enable_shared_from_this<client::impl>, core::monitor::sink
129 {
130         udp::socket socket_;
131         tbb::spin_mutex                                                                 endpoints_mutex_;
132         std::map<udp::endpoint, int>                                    reference_counts_by_endpoint_;
133
134         std::unordered_map<std::string, byte_vector>    updates_;
135         boost::mutex                                                                    updates_mutex_;                                                         
136         boost::condition_variable                                               updates_cond_;
137
138         tbb::atomic<bool>                                                               is_running_;
139
140         boost::thread                                                                   thread_;
141         
142 public:
143         impl(boost::asio::io_service& service)
144                 : socket_(service, udp::v4())
145                 , thread_(boost::bind(&impl::run, this))
146         {
147         }
148
149         ~impl()
150         {
151                 is_running_ = false;
152
153                 updates_cond_.notify_one();
154
155                 thread_.join();
156         }
157
158         std::shared_ptr<void> get_subscription_token(
159                         const boost::asio::ip::udp::endpoint& endpoint)
160         {
161                 tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
162
163                 ++reference_counts_by_endpoint_[endpoint];
164
165                 std::weak_ptr<impl> weak_self = shared_from_this();
166
167                 return std::shared_ptr<void>(nullptr, [weak_self, endpoint] (void*)
168                 {
169                         auto strong = weak_self.lock();
170
171                         if (!strong)
172                                 return;
173
174                         auto& self = *strong;
175
176                         tbb::spin_mutex::scoped_lock lock(self.endpoints_mutex_);
177
178                         int reference_count_after =
179                                 --self.reference_counts_by_endpoint_[endpoint];
180
181                         if (reference_count_after == 0)
182                                 self.reference_counts_by_endpoint_.erase(endpoint);
183                 });
184         }
185 private:
186         void propagate(const core::monitor::message& msg)
187         {
188                 boost::lock_guard<boost::mutex> lock(updates_mutex_);
189
190                 try 
191                 {
192                         write_osc_event(updates_[msg.path()], msg);
193                 }
194                 catch(...)
195                 {
196                         CASPAR_LOG_CURRENT_EXCEPTION();
197                         updates_.erase(msg.path());
198                 }
199
200                 updates_cond_.notify_one();
201         }
202
203         template<typename T>
204         void do_send(
205                         const T& buffers, const std::vector<udp::endpoint>& destinations)
206         {
207                 boost::system::error_code ec;
208
209                 for (const auto& endpoint : destinations)
210                         socket_.send_to(buffers, endpoint, 0, ec);
211         }
212
213         void run()
214         {
215                 // http://stackoverflow.com/questions/14993000/the-most-reliable-and-efficient-udp-packet-size
216                 const int SAFE_DATAGRAM_SIZE = 508;
217
218                 try
219                 {
220                         is_running_ = true;
221
222                         std::unordered_map<std::string, byte_vector> updates;
223                         std::vector<udp::endpoint> destinations;
224                         const byte_vector bundle_header = write_osc_bundle_start();
225                         std::vector<byte_vector> element_headers;
226
227                         while (is_running_)
228                         {               
229                                 updates.clear();
230                                 destinations.clear();
231
232                                 {                       
233                                         boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
234
235                                         if (updates_.empty())
236                                                 updates_cond_.wait(cond_lock);
237
238                                         std::swap(updates, updates_);
239                                 }
240
241                                 {
242                                         tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
243
244                                         for (const auto& endpoint : reference_counts_by_endpoint_)
245                                                 destinations.push_back(endpoint.first);
246                                 }
247
248                                 if (destinations.empty())
249                                         continue;
250
251                                 std::vector<boost::asio::const_buffers_1> buffers;
252                                 element_headers.resize(
253                                                 std::max(element_headers.size(), updates.size()));
254
255                                 int i = 0;
256                                 auto datagram_size = bundle_header.size();
257                                 buffers.push_back(boost::asio::buffer(bundle_header));
258
259                                 for (const auto& slot : updates)
260                                 {
261                                         write_osc_bundle_element_start(element_headers[i], slot.second);
262                                         const auto& headers = element_headers;
263
264                                         auto size_of_element = headers[i].size() + slot.second.size();
265         
266                                         if (datagram_size + size_of_element >= SAFE_DATAGRAM_SIZE)
267                                         {
268                                                 do_send(buffers, destinations);
269                                                 buffers.clear();
270                                                 buffers.push_back(boost::asio::buffer(bundle_header));
271                                                 datagram_size = bundle_header.size();
272                                         }
273
274                                         buffers.push_back(boost::asio::buffer(headers[i]));
275                                         buffers.push_back(boost::asio::buffer(slot.second));
276
277                                         datagram_size += size_of_element;
278                                         ++i;
279                                 }
280                         
281                                 if (!buffers.empty())
282                                         do_send(buffers, destinations);
283                         }
284                 }
285                 catch (...)
286                 {
287                         CASPAR_LOG_CURRENT_EXCEPTION();
288                 }
289         }
290 };
291
292 client::client(boost::asio::io_service& service) 
293         : impl_(new impl(service))
294 {
295 }
296
297 client::client(client&& other)
298         : impl_(std::move(other.impl_))
299 {
300 }
301
302 client& client::operator=(client&& other)
303 {
304         impl_ = std::move(other.impl_);
305         return *this;
306 }
307
308 client::~client()
309 {
310 }
311
312 std::shared_ptr<void> client::get_subscription_token(
313                         const boost::asio::ip::udp::endpoint& endpoint)
314 {
315         return impl_->get_subscription_token(endpoint);
316 }
317
318 spl::shared_ptr<core::monitor::sink> client::sink()
319 {
320         return impl_;
321 }
322
323 }}}