]> git.sesse.net Git - casparcg/blob - protocol/osc/client.cpp
promoted no_init_proxy to own file under common
[casparcg] / protocol / osc / client.cpp
1 /*
2 * Copyright 2013 Sveriges Television AB http://casparcg.com/
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 * Author: Helge Norberg, helge.norberg@svt.se
21 */
22
23 #include "../StdAfx.h"
24
25 #include "client.h"
26
27 #include "oscpack/OscOutboundPacketStream.h"
28 #include "oscpack/OscHostEndianness.h"
29
30 #include <common/utf.h>
31 #include <common/except.h>
32 #include <common/endian.h>
33 #include <common/cache_aligned_vector.h>
34 #include <common/os/general_protection_fault.h>
35 #include <common/no_init_proxy.h>
36
37 #include <core/monitor/monitor.h>
38
39 #include <functional>
40 #include <vector>
41 #include <unordered_map>
42
43 #include <boost/asio.hpp>
44 #include <boost/bind.hpp>
45 #include <boost/thread.hpp>
46
47 #include <tbb/spin_mutex.h>
48
49 using namespace boost::asio::ip;
50
51 namespace caspar { namespace protocol { namespace osc {
52
53 typedef cache_aligned_vector<no_init_proxy<char>> byte_vector;
54
55 template<typename T>
56 struct param_visitor : public boost::static_visitor<void>
57 {
58         T& o;
59
60         param_visitor(T& o)
61                 : o(o)
62         {
63         }               
64                 
65         void operator()(const bool value)                                       {o << value;}
66         void operator()(const int32_t value)                            {o << static_cast<int64_t>(value);}
67         void operator()(const uint32_t value)                           {o << static_cast<int64_t>(value);}
68         void operator()(const int64_t value)                            {o << static_cast<int64_t>(value);}
69         void operator()(const uint64_t value)                           {o << static_cast<int64_t>(value);}
70         void operator()(const float value)                                      {o << value;}
71         void operator()(const double value)                                     {o << static_cast<float>(value);}
72         void operator()(const std::string& value)                       {o << value.c_str();}
73         void operator()(const std::wstring& value)                      {o << u8(value).c_str();}
74         void operator()(const std::vector<int8_t>& value)       {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
75 };
76
77 void write_osc_event(byte_vector& destination, const core::monitor::message& message, int retry_allocation_attempt = 0)
78 {
79         static std::size_t max_size = 128;
80
81         destination.resize(max_size);
82
83         ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
84
85         try
86         {
87                 o << ::osc::BeginMessage(message.path().c_str());
88
89                 param_visitor<decltype(o)> param_visitor(o);
90                 for (const auto& data : message.data())
91                         boost::apply_visitor(param_visitor, data);
92
93                 o << ::osc::EndMessage;
94         }
95         catch (const ::osc::OutOfBufferMemoryException& e)
96         {
97                 if (retry_allocation_attempt > message.data().size())
98                         throw;
99
100                 max_size = e.required;
101                 CASPAR_LOG(trace) << L"[osc] Too small buffer for osc message. Increasing to " << max_size;
102                 return write_osc_event(destination, message, retry_allocation_attempt + 1);
103         }
104
105         destination.resize(o.Size());
106 }
107
108 byte_vector write_osc_bundle_start()
109 {
110         byte_vector destination;
111         destination.resize(16);
112
113         ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
114         o << ::osc::BeginBundle();
115
116         destination.resize(o.Size());
117
118         return destination;
119 }
120
121 void write_osc_bundle_element_start(byte_vector& destination, const byte_vector& message)
122 {               
123         destination.resize(4);
124
125         int32_t* bundle_element_size = reinterpret_cast<int32_t*>(destination.data());
126
127 #ifdef OSC_HOST_LITTLE_ENDIAN
128         *bundle_element_size = swap_byte_order(static_cast<int32_t>(message.size()));
129 #else
130         *bundle_element_size = static_cast<int32_t>(bundle.size());
131 #endif
132 }
133
134 struct client::impl : public spl::enable_shared_from_this<client::impl>, core::monitor::sink
135 {
136         std::shared_ptr<boost::asio::io_service>                service_;
137         udp::socket                                                                             socket_;
138         tbb::spin_mutex                                                                 endpoints_mutex_;
139         std::map<udp::endpoint, int>                                    reference_counts_by_endpoint_;
140
141         std::unordered_map<std::string, byte_vector>    updates_;
142         boost::mutex                                                                    updates_mutex_;                                                         
143         boost::condition_variable                                               updates_cond_;
144
145         tbb::atomic<bool>                                                               is_running_;
146
147         boost::thread                                                                   thread_;
148         
149 public:
150         impl(std::shared_ptr<boost::asio::io_service> service)
151                 : service_(std::move(service))
152                 , socket_(*service_, udp::v4())
153                 , thread_(boost::bind(&impl::run, this))
154         {
155         }
156
157         ~impl()
158         {
159                 is_running_ = false;
160
161                 updates_cond_.notify_one();
162
163                 thread_.join();
164         }
165
166         std::shared_ptr<void> get_subscription_token(
167                         const boost::asio::ip::udp::endpoint& endpoint)
168         {
169                 tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
170
171                 ++reference_counts_by_endpoint_[endpoint];
172
173                 std::weak_ptr<impl> weak_self = shared_from_this();
174
175                 return std::shared_ptr<void>(nullptr, [weak_self, endpoint] (void*)
176                 {
177                         auto strong = weak_self.lock();
178
179                         if (!strong)
180                                 return;
181
182                         auto& self = *strong;
183
184                         tbb::spin_mutex::scoped_lock lock(self.endpoints_mutex_);
185
186                         int reference_count_after =
187                                 --self.reference_counts_by_endpoint_[endpoint];
188
189                         if (reference_count_after == 0)
190                                 self.reference_counts_by_endpoint_.erase(endpoint);
191                 });
192         }
193 private:
194         void propagate(const core::monitor::message& msg)
195         {
196                 boost::lock_guard<boost::mutex> lock(updates_mutex_);
197
198                 try 
199                 {
200                         write_osc_event(updates_[msg.path()], msg);
201                 }
202                 catch(...)
203                 {
204                         CASPAR_LOG_CURRENT_EXCEPTION();
205                         updates_.erase(msg.path());
206                 }
207
208                 updates_cond_.notify_one();
209         }
210
211         template<typename T>
212         void do_send(
213                         const T& buffers, const std::vector<udp::endpoint>& destinations)
214         {
215                 boost::system::error_code ec;
216
217                 for (const auto& endpoint : destinations)
218                         socket_.send_to(buffers, endpoint, 0, ec);
219         }
220
221         void run()
222         {
223                 // http://stackoverflow.com/questions/14993000/the-most-reliable-and-efficient-udp-packet-size
224                 const int SAFE_DATAGRAM_SIZE = 508;
225
226                 ensure_gpf_handler_installed_for_thread("osc-sender-thread");
227
228                 try
229                 {
230                         is_running_ = true;
231
232                         std::unordered_map<std::string, byte_vector> updates;
233                         std::vector<udp::endpoint> destinations;
234                         const byte_vector bundle_header = write_osc_bundle_start();
235                         std::vector<byte_vector> element_headers;
236
237                         while (is_running_)
238                         {               
239                                 updates.clear();
240                                 destinations.clear();
241
242                                 {                       
243                                         boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
244
245                                         if (!is_running_)
246                                                 return;
247
248                                         if (updates_.empty())
249                                                 updates_cond_.wait(cond_lock);
250
251                                         std::swap(updates, updates_);
252                                 }
253
254                                 {
255                                         tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
256
257                                         for (const auto& endpoint : reference_counts_by_endpoint_)
258                                                 destinations.push_back(endpoint.first);
259                                 }
260
261                                 if (destinations.empty())
262                                         continue;
263
264                                 std::vector<boost::asio::const_buffers_1> buffers;
265                                 element_headers.resize(
266                                                 std::max(element_headers.size(), updates.size()));
267
268                                 int i = 0;
269                                 auto datagram_size = bundle_header.size();
270                                 buffers.push_back(boost::asio::buffer(bundle_header));
271
272                                 for (const auto& slot : updates)
273                                 {
274                                         write_osc_bundle_element_start(element_headers[i], slot.second);
275                                         const auto& headers = element_headers;
276
277                                         auto size_of_element = headers[i].size() + slot.second.size();
278         
279                                         if (datagram_size + size_of_element >= SAFE_DATAGRAM_SIZE)
280                                         {
281                                                 do_send(buffers, destinations);
282                                                 buffers.clear();
283                                                 buffers.push_back(boost::asio::buffer(bundle_header));
284                                                 datagram_size = bundle_header.size();
285                                         }
286
287                                         buffers.push_back(boost::asio::buffer(headers[i]));
288                                         buffers.push_back(boost::asio::buffer(slot.second));
289
290                                         datagram_size += size_of_element;
291                                         ++i;
292                                 }
293                         
294                                 if (!buffers.empty())
295                                         do_send(buffers, destinations);
296                         }
297                 }
298                 catch (...)
299                 {
300                         CASPAR_LOG_CURRENT_EXCEPTION();
301                 }
302         }
303 };
304
305 client::client(std::shared_ptr<boost::asio::io_service> service)
306         : impl_(new impl(std::move(service)))
307 {
308 }
309
310 client::client(client&& other)
311         : impl_(std::move(other.impl_))
312 {
313 }
314
315 client& client::operator=(client&& other)
316 {
317         impl_ = std::move(other.impl_);
318         return *this;
319 }
320
321 client::~client()
322 {
323 }
324
325 std::shared_ptr<void> client::get_subscription_token(
326                         const boost::asio::ip::udp::endpoint& endpoint)
327 {
328         return impl_->get_subscription_token(endpoint);
329 }
330
331 spl::shared_ptr<core::monitor::sink> client::sink()
332 {
333         return impl_;
334 }
335
336 }}}