Coding style consistency fix.
[cubemap] / udpinput.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <poll.h>
4 #include <stddef.h>
5 #include <stdlib.h>
6 #include <sys/socket.h>
7 #include <time.h>
8 #include <unistd.h>
9 #include <string>
10
11 #include "acceptor.h"
12 #include "log.h"
13 #include "mutexlock.h"
14 #include "serverpool.h"
15 #include "state.pb.h"
16 #include "stream.h"
17 #include "udpinput.h"
18 #include "util.h"
19 #include "version.h"
20
21 using namespace std;
22
23 extern ServerPool *servers;
24
25 namespace {
26
27 // Similar to parse_hostport(), but only parses the IP address,
28 // and does not use mapped-v4 addresses, since multicast seems
29 // to not like that too much.
30 bool parse_ip_address(const string &ip, sockaddr_storage *addr)
31 {
32         memset(addr, 0, sizeof(*addr));
33
34         assert(!ip.empty());
35         if (ip[0] == '[') {
36                 sockaddr_in6 *addr6 = (sockaddr_in6 *)addr;
37                 addr6->sin6_family = AF_INET6;
38                 if (ip[ip.size() - 1] != ']') {
39                         log(ERROR, "address '%s' is malformed; must be either [ipv6addr] or ipv4addr",
40                                 ip.c_str());
41                         return false;
42                 }
43                 if (inet_pton(AF_INET6, ip.c_str(), &addr6->sin6_addr) != 1) {
44                         log(ERROR, "'%s' is not a valid IPv6 address");
45                         return false;
46                 }
47         } else {
48                 sockaddr_in *addr4 = (sockaddr_in *)addr;
49                 addr4->sin_family = AF_INET;
50                 if (inet_pton(AF_INET, ip.c_str(), &addr4->sin_addr) != 1) {
51                         log(ERROR, "'%s' is not a valid IPv4 address");
52                         return false;
53                 }
54         }
55
56         return true;
57 }
58
59 bool maybe_join_multicast_group(int sock, const string &group, const string &source)
60 {
61         if (group.empty()) {
62                 // Not multicast.
63                 return true;
64         }
65
66         // Join the given multicast group (ASM or SSM).
67         // TODO: Also support sources apart from multicast groups,
68         // e.g. udp://[::1]:1234 for only receiving from localhost.
69         if (!source.empty()) {
70                 // Single-Source Multicast (SSM).
71                 group_source_req gsr;
72                 memset(&gsr, 0, sizeof(gsr));
73                 if (!parse_ip_address(group, &gsr.gsr_group)) {
74                         return false;
75                 }
76                 if (!parse_ip_address(source, &gsr.gsr_source)) {
77                         return false;
78                 }
79                 int level = (gsr.gsr_group.ss_family == AF_INET) ? SOL_IP : SOL_IPV6;
80                 if (setsockopt(sock, level, MCAST_JOIN_SOURCE_GROUP, &gsr, sizeof(gsr)) == -1) {
81                         log_perror("setsockopt(MCAST_JOIN_SOURCE_GROUP)");
82                         return false;
83                 }
84         } else {
85                 // Any-Source Multicast (ASM).
86                 group_req gr;
87                 memset(&gr, 0, sizeof(gr));
88                 if (!parse_ip_address(group, &gr.gr_group)) {
89                         return false;
90                 }
91                 int level = (gr.gr_group.ss_family == AF_INET) ? SOL_IP : SOL_IPV6;
92                 if (setsockopt(sock, level, MCAST_JOIN_GROUP, &gr, sizeof(gr)) == -1) {
93                         log_perror("setsockopt(MCAST_JOIN_GROUP)");
94                         return false;
95                 }
96         }
97
98         return true;
99 }
100
101 }  // namespace
102
103 UDPInput::UDPInput(const string &url)
104         : url(url),
105           sock(-1)
106 {
107         // Should be verified by the caller.
108         string protocol;
109         bool ok = parse_url(url, &protocol, &user, &host, &port, &path);
110         assert(ok);
111
112         construct_header();
113
114         pthread_mutex_init(&stats_mutex, NULL);
115         stats.url = url;
116         stats.bytes_received = 0;
117         stats.data_bytes_received = 0;
118         stats.connect_time = time(NULL);
119 }
120
121 UDPInput::UDPInput(const InputProto &serialized)
122         : url(serialized.url()),
123           sock(serialized.sock())
124 {
125         // Should be verified by the caller.
126         string protocol;
127         bool ok = parse_url(url, &protocol, &user, &host, &port, &path);
128         assert(ok);
129
130         construct_header();
131
132         pthread_mutex_init(&stats_mutex, NULL);
133         stats.url = url;
134         stats.bytes_received = serialized.bytes_received();
135         stats.data_bytes_received = serialized.data_bytes_received();
136         if (serialized.has_connect_time()) {
137                 stats.connect_time = serialized.connect_time();
138         } else {
139                 stats.connect_time = time(NULL);
140         }
141 }
142
143 InputProto UDPInput::serialize() const
144 {
145         InputProto serialized;
146         serialized.set_url(url);
147         serialized.set_sock(sock);
148         serialized.set_bytes_received(stats.bytes_received);
149         serialized.set_data_bytes_received(stats.data_bytes_received);
150         serialized.set_connect_time(stats.connect_time);
151         return serialized;
152 }
153
154 void UDPInput::close_socket()
155 {
156         safe_close(sock);
157         sock = -1;
158 }
159         
160 void UDPInput::construct_header()
161 {
162         http_header =
163                 "HTTP/1.0 200 OK\r\n"
164                 "Content-type: application/octet-stream\r\n"
165                 "Cache-control: no-cache\r\n"
166                 "Server: " SERVER_IDENTIFICATION "\r\n"
167                 "Connection: close\r\n";
168 }
169         
170 void UDPInput::add_destination(int stream_index)
171 {
172         stream_indices.push_back(stream_index);
173         servers->set_header(stream_index, http_header, "");
174 }
175
176 void UDPInput::do_work()
177 {
178         while (!should_stop()) {
179                 if (sock == -1) {
180                         int port_num = atoi(port.c_str());
181                         sockaddr_in6 addr = create_any_address(port_num);
182                         sock = create_server_socket(addr, UDP_SOCKET);
183                         if (sock == -1) {
184                                 log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
185                                              url.c_str());
186                                 usleep(200000);
187                                 continue;
188                         }
189
190                         // The syntax udp://source@group (abusing the username field
191                         // to store the sender in SSM) seems to be a VLC invention.
192                         // We mimic it.
193                         if (!maybe_join_multicast_group(sock, host, user)) {
194                                 log(WARNING, "[%s] Multicast join failed. Waiting 0.2 seconds and trying again...",
195                                              url.c_str());
196                                 safe_close(sock);
197                                 sock = -1;
198                                 usleep(200000);
199                                 continue;
200                         }
201                 }
202
203                 // Wait for a packet, or a wakeup.
204                 bool activity = wait_for_activity(sock, POLLIN, NULL);
205                 if (!activity) {
206                         // Most likely, should_stop was set.
207                         continue;
208                 }
209
210                 int ret;
211                 do {
212                         ret = recv(sock, packet_buf, sizeof(packet_buf), 0);
213                 } while (ret == -1 && errno == EINTR);
214
215                 if (ret <= 0) {
216                         log_perror("recv");
217                         close_socket();
218                         continue;
219                 }
220
221                 {
222                         MutexLock lock(&stats_mutex);
223                         stats.bytes_received += ret;
224                         stats.data_bytes_received += ret;
225                 }
226                 
227                 for (size_t i = 0; i < stream_indices.size(); ++i) {
228                         servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START);
229                 }
230         }
231 }
232
233 InputStats UDPInput::get_stats() const
234 {
235         MutexLock lock(&stats_mutex);
236         return stats;
237 }