]> git.sesse.net Git - casparcg/blob - protocol/osc/client.cpp
Changed copyright header in all files, and added it in some files where it was missing.
[casparcg] / protocol / osc / client.cpp
1 /*
2 * Copyright 2013 Sveriges Television AB http://casparcg.com/
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #include "../stdafx.h"
23
24 #include "client.h"
25
26 #include "oscpack/oscOutboundPacketStream.h"
27
28 #include <common/utility/string.h>
29
30 #include <functional>
31 #include <vector>
32
33 #include <boost/asio.hpp>
34 #include <boost/foreach.hpp>
35 #include <boost/bind.hpp>
36
37 #include <tbb/spin_mutex.h>
38
39 using namespace boost::asio::ip;
40
41 namespace caspar { namespace protocol { namespace osc {
42
43 template<typename T>
44 struct param_visitor : public boost::static_visitor<void>
45 {
46         T& o;
47
48         param_visitor(T& o)
49                 : o(o)
50         {
51         }               
52                 
53         void operator()(const bool value)                                       {o << value;}
54         void operator()(const int32_t value)                            {o << static_cast<int64_t>(value);}
55         void operator()(const uint32_t value)                           {o << static_cast<int64_t>(value);}
56         void operator()(const int64_t value)                            {o << static_cast<int64_t>(value);}
57         void operator()(const uint64_t value)                           {o << static_cast<int64_t>(value);}
58         void operator()(const float value)                                      {o << value;}
59         void operator()(const double value)                                     {o << static_cast<float>(value);}
60         void operator()(const std::string& value)                       {o << value.c_str();}
61         void operator()(const std::wstring& value)                      {o << narrow(value).c_str();}
62         void operator()(const std::vector<int8_t>& value)       {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
63 };
64
65 std::vector<char> write_osc_event(const core::monitor::message& e)
66 {
67         std::array<char, 4096> buffer;
68         ::osc::OutboundPacketStream o(buffer.data(), static_cast<unsigned long>(buffer.size()));
69
70         o       << ::osc::BeginMessage(e.path().c_str());
71                                 
72         param_visitor<decltype(o)> pd_visitor(o);
73         BOOST_FOREACH(auto data, e.data())
74                 boost::apply_visitor(pd_visitor, data);
75                                 
76         o       << ::osc::EndMessage;
77                 
78         return std::vector<char>(o.Data(), o.Data() + o.Size());
79 }
80
81 struct client::impl : public std::enable_shared_from_this<client::impl>
82 {
83         tbb::spin_mutex                                                         endpoints_mutex_;
84         std::map<udp::endpoint, int>                            reference_counts_by_endpoint_;
85         udp::socket                                                                     socket_;
86
87         Concurrency::call<core::monitor::message>       on_next_;
88         
89 public:
90         impl(
91                         boost::asio::io_service& service,
92                         Concurrency::ISource<core::monitor::message>& source)
93                 : socket_(service, udp::v4())
94                 , on_next_([this](const core::monitor::message& msg) { on_next(msg); })
95         {
96                 source.link_target(&on_next_);
97         }
98
99         std::shared_ptr<void> get_subscription_token(
100                         const boost::asio::ip::udp::endpoint& endpoint)
101         {
102                 tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
103
104                 ++reference_counts_by_endpoint_[endpoint];
105
106                 std::weak_ptr<impl> weak_self = shared_from_this();
107
108                 return std::shared_ptr<void>(nullptr, [weak_self, endpoint] (void*)
109                 {
110                         auto strong = weak_self.lock();
111
112                         if (!strong)
113                                 return;
114
115                         auto& self = *strong;
116
117                         tbb::spin_mutex::scoped_lock lock(self.endpoints_mutex_);
118
119                         int reference_count_after =
120                                 --self.reference_counts_by_endpoint_[endpoint];
121
122                         if (reference_count_after == 0)
123                                 self.reference_counts_by_endpoint_.erase(endpoint);
124                 });
125         }
126         
127         void on_next(const core::monitor::message& msg)
128         {
129                 auto data_ptr = make_safe<std::vector<char>>(write_osc_event(msg));
130
131                 tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
132
133                 BOOST_FOREACH(auto& elem, reference_counts_by_endpoint_)
134                 {
135                         auto& endpoint = elem.first;
136
137                         // TODO: We seem to be lucky here, because according to asio
138                         //       documentation only one async operation can be "in flight"
139                         //       at any given point in time for a socket. This somehow seems
140                         //       to work though in the case of UDP and Windows.
141                         socket_.async_send_to(
142                                         boost::asio::buffer(*data_ptr),
143                                         endpoint,
144                                         boost::bind(
145                                                         &impl::handle_send_to,
146                                                         this,
147                                                         data_ptr, // The data_ptr needs to live
148                                                         boost::asio::placeholders::error,
149                                                         boost::asio::placeholders::bytes_transferred));         
150                 }
151         }
152
153         void handle_send_to(
154                         const safe_ptr<std::vector<char>>& /* sent_buffer */,
155                         const boost::system::error_code& /*error*/,
156                         size_t /*bytes_sent*/)
157         {
158         }
159 };
160
161 client::client(
162                 boost::asio::io_service& service,
163                 Concurrency::ISource<core::monitor::message>& source) 
164         : impl_(new impl(service, source))
165 {
166 }
167
168 client::client(client&& other)
169         : impl_(std::move(other.impl_))
170 {
171 }
172
173 client& client::operator=(client&& other)
174 {
175         impl_ = std::move(other.impl_);
176         return *this;
177 }
178
179 client::~client()
180 {
181 }
182
183 std::shared_ptr<void> client::get_subscription_token(
184                         const boost::asio::ip::udp::endpoint& endpoint)
185 {
186         return impl_->get_subscription_token(endpoint);
187 }
188
189 }}}