]> git.sesse.net Git - casparcg/blob - protocol/osc/client.cpp
Merge branch 'master' of https://github.com/CasparCG/Server
[casparcg] / protocol / osc / client.cpp
1 /*
2 * Copyright 2013 Sveriges Television AB http://casparcg.com/
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 * Author: Helge Norberg, helge.norberg@svt.se
21 */
22
23 #include "../stdafx.h"
24
25 #include "client.h"
26
27 #include "oscpack/OscOutboundPacketStream.h"
28 #include "oscpack/OscHostEndianness.h"
29
30 #include <common/utility/string.h>
31 #include <common/exception/win32_exception.h>
32 #include <common/memory/endian.h>
33
34 #include <core/monitor/monitor.h>
35
36 #include <functional>
37 #include <vector>
38 #include <unordered_map>
39
40 #include <boost/asio.hpp>
41 #include <boost/foreach.hpp>
42 #include <boost/bind.hpp>
43 #include <boost/thread.hpp>
44
45 #include <tbb/spin_mutex.h>
46 #include <tbb/cache_aligned_allocator.h>
47
48 using namespace boost::asio::ip;
49
50 namespace caspar { namespace protocol { namespace osc {
51
52 template<typename T>
53 struct no_init_proxy
54 {
55     T value;
56
57     no_init_proxy() 
58         {
59                 static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size");
60         static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment");
61     }
62 };
63
64 typedef std::vector<no_init_proxy<char>, tbb::cache_aligned_allocator<no_init_proxy<char>>> byte_vector;
65
66 template<typename T>
67 struct param_visitor : public boost::static_visitor<void>
68 {
69         T& o;
70
71         param_visitor(T& o)
72                 : o(o)
73         {
74         }               
75                 
76         void operator()(const bool value)                                       {o << value;}
77         void operator()(const int32_t value)                            {o << static_cast<int64_t>(value);}
78         void operator()(const uint32_t value)                           {o << static_cast<int64_t>(value);}
79         void operator()(const int64_t value)                            {o << static_cast<int64_t>(value);}
80         void operator()(const uint64_t value)                           {o << static_cast<int64_t>(value);}
81         void operator()(const float value)                                      {o << value;}
82         void operator()(const double value)                                     {o << static_cast<float>(value);}
83         void operator()(const std::string& value)                       {o << value.c_str();}
84         void operator()(const std::wstring& value)                      {o << narrow(value).c_str();}
85         void operator()(const std::vector<int8_t>& value)       {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
86 };
87
88 void write_osc_event(byte_vector& destination, const core::monitor::message& e)
89 {               
90         destination.resize(4096);
91
92         ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
93         o << ::osc::BeginMessage(e.path().c_str());
94                                 
95         param_visitor<decltype(o)> param_visitor(o);
96         BOOST_FOREACH(const auto& data, e.data())
97                 boost::apply_visitor(param_visitor, data);
98                                 
99         o << ::osc::EndMessage;
100                 
101         destination.resize(o.Size());
102 }
103
104 byte_vector write_osc_bundle_start()
105 {
106         byte_vector destination;
107         destination.resize(16);
108
109         ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
110         o << ::osc::BeginBundle();
111
112         destination.resize(o.Size());
113
114         return destination;
115 }
116
117 void write_osc_bundle_element_start(byte_vector& destination, const byte_vector& message)
118 {               
119         destination.resize(4);
120
121         int32_t* bundle_element_size = reinterpret_cast<int32_t*>(destination.data());
122
123 #ifdef OSC_HOST_LITTLE_ENDIAN
124         *bundle_element_size = swap_byte_order(static_cast<int32_t>(message.size()));
125 #else
126         *bundle_element_size = static_cast<int32_t>(bundle.size());
127 #endif
128 }
129
130 struct client::impl : public std::enable_shared_from_this<client::impl>, core::monitor::sink
131 {
132         std::shared_ptr<boost::asio::io_service>                service_;
133         udp::socket socket_;
134         tbb::spin_mutex                                                                 endpoints_mutex_;
135         std::map<udp::endpoint, int>                                    reference_counts_by_endpoint_;
136
137         std::unordered_map<std::string, byte_vector>    updates_;
138         boost::mutex                                                                    updates_mutex_;                                                         
139         boost::condition_variable                                               updates_cond_;
140
141         tbb::atomic<bool>                                                               is_running_;
142
143         boost::thread                                                                   thread_;
144         
145 public:
146         impl(std::shared_ptr<boost::asio::io_service> service)
147                 : service_(std::move(service))
148                 , socket_(*service_, udp::v4())
149                 , thread_(boost::bind(&impl::run, this))
150         {
151         }
152
153         ~impl()
154         {
155                 is_running_ = false;
156
157                 updates_cond_.notify_one();
158
159                 thread_.join();
160         }
161
162         std::shared_ptr<void> get_subscription_token(
163                         const boost::asio::ip::udp::endpoint& endpoint)
164         {
165                 tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
166
167                 ++reference_counts_by_endpoint_[endpoint];
168
169                 std::weak_ptr<impl> weak_self = shared_from_this();
170
171                 return std::shared_ptr<void>(nullptr, [weak_self, endpoint] (void*)
172                 {
173                         auto strong = weak_self.lock();
174
175                         if (!strong)
176                                 return;
177
178                         auto& self = *strong;
179
180                         tbb::spin_mutex::scoped_lock lock(self.endpoints_mutex_);
181
182                         int reference_count_after =
183                                 --self.reference_counts_by_endpoint_[endpoint];
184
185                         if (reference_count_after == 0)
186                                 self.reference_counts_by_endpoint_.erase(endpoint);
187                 });
188         }
189 private:
190         void propagate(const core::monitor::message& msg)
191         {
192                 boost::lock_guard<boost::mutex> lock(updates_mutex_);
193
194                 try 
195                 {
196                         write_osc_event(updates_[msg.path()], msg);
197                 }
198                 catch(...)
199                 {
200                         CASPAR_LOG_CURRENT_EXCEPTION();
201                         updates_.erase(msg.path());
202                 }
203
204                 updates_cond_.notify_one();
205         }
206
207         template<typename T>
208         void do_send(
209                         const T& buffers, const std::vector<udp::endpoint>& destinations)
210         {
211                 boost::system::error_code ec;
212
213                 BOOST_FOREACH(const auto& endpoint, destinations)
214                         socket_.send_to(buffers, endpoint, 0, ec);
215         }
216
217         void run()
218         {
219                 // http://stackoverflow.com/questions/14993000/the-most-reliable-and-efficient-udp-packet-size
220                 const int SAFE_DATAGRAM_SIZE = 508;
221
222                 try
223                 {
224                         is_running_ = true;
225
226                         std::unordered_map<std::string, byte_vector> updates;
227                         std::vector<udp::endpoint> destinations;
228                         const byte_vector bundle_header = write_osc_bundle_start();
229                         std::vector<byte_vector> element_headers;
230
231                         while (is_running_)
232                         {               
233                                 updates.clear();
234                                 destinations.clear();
235
236                                 {                       
237                                         boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
238
239                                         if (updates_.empty())
240                                                 updates_cond_.wait(cond_lock);
241
242                                         std::swap(updates, updates_);
243                                 }
244
245                                 {
246                                         tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
247
248                                         BOOST_FOREACH(const auto& endpoint, reference_counts_by_endpoint_)
249                                                 destinations.push_back(endpoint.first);
250                                 }
251
252                                 if (destinations.empty())
253                                         continue;
254
255                                 std::vector<boost::asio::const_buffers_1> buffers;
256                                 element_headers.resize(
257                                                 std::max(element_headers.size(), updates.size()));
258
259                                 int i = 0;
260                                 int datagram_size = bundle_header.size();
261                                 buffers.push_back(boost::asio::buffer(bundle_header));
262
263                                 BOOST_FOREACH(const auto& slot, updates)
264                                 {
265                                         write_osc_bundle_element_start(element_headers[i], slot.second);
266                                         const auto& headers = element_headers;
267
268                                         auto size_of_element = headers[i].size() + slot.second.size();
269         
270                                         if (datagram_size + size_of_element >= SAFE_DATAGRAM_SIZE)
271                                         {
272                                                 do_send(buffers, destinations);
273                                                 buffers.clear();
274                                                 buffers.push_back(boost::asio::buffer(bundle_header));
275                                                 datagram_size = bundle_header.size();
276                                         }
277
278                                         buffers.push_back(boost::asio::buffer(headers[i]));
279                                         buffers.push_back(boost::asio::buffer(slot.second));
280
281                                         datagram_size += size_of_element;
282                                         ++i;
283                                 }
284                         
285                                 if (!buffers.empty())
286                                         do_send(buffers, destinations);
287                         }
288                 }
289                 catch (...)
290                 {
291                         CASPAR_LOG_CURRENT_EXCEPTION();
292                 }
293         }
294 };
295
296 client::client(std::shared_ptr<boost::asio::io_service> service) 
297         : impl_(new impl(std::move(service)))
298 {
299 }
300
301 client::client(client&& other)
302         : impl_(std::move(other.impl_))
303 {
304 }
305
306 client& client::operator=(client&& other)
307 {
308         impl_ = std::move(other.impl_);
309         return *this;
310 }
311
312 client::~client()
313 {
314 }
315
316 std::shared_ptr<void> client::get_subscription_token(
317                         const boost::asio::ip::udp::endpoint& endpoint)
318 {
319         return impl_->get_subscription_token(endpoint);
320 }
321
322 safe_ptr<core::monitor::sink> client::sink()
323 {
324         return impl_;
325 }
326
327 }}}