Add a listen statement to listen on only specific IP addresses, in addition to the...
[cubemap] / udpinput.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <poll.h>
4 #include <stddef.h>
5 #include <stdlib.h>
6 #include <sys/socket.h>
7 #include <time.h>
8 #include <unistd.h>
9 #include <string>
10
11 #include "acceptor.h"
12 #include "log.h"
13 #include "mutexlock.h"
14 #include "serverpool.h"
15 #include "state.pb.h"
16 #include "stream.h"
17 #include "udpinput.h"
18 #include "util.h"
19 #include "version.h"
20
21 using namespace std;
22
23 extern ServerPool *servers;
24
25 UDPInput::UDPInput(const string &url)
26         : url(url),
27           sock(-1)
28 {
29         // Should be verified by the caller.
30         string protocol;
31         bool ok = parse_url(url, &protocol, &host, &port, &path);
32         assert(ok);
33
34         construct_header();
35
36         pthread_mutex_init(&stats_mutex, NULL);
37         stats.url = url;
38         stats.bytes_received = 0;
39         stats.data_bytes_received = 0;
40         stats.connect_time = time(NULL);
41 }
42
43 UDPInput::UDPInput(const InputProto &serialized)
44         : url(serialized.url()),
45           sock(serialized.sock())
46 {
47         // Should be verified by the caller.
48         string protocol;
49         bool ok = parse_url(url, &protocol, &host, &port, &path);
50         assert(ok);
51
52         construct_header();
53
54         pthread_mutex_init(&stats_mutex, NULL);
55         stats.url = url;
56         stats.bytes_received = serialized.bytes_received();
57         stats.data_bytes_received = serialized.data_bytes_received();
58         if (serialized.has_connect_time()) {
59                 stats.connect_time = serialized.connect_time();
60         } else {
61                 stats.connect_time = time(NULL);
62         }
63 }
64
65 InputProto UDPInput::serialize() const
66 {
67         InputProto serialized;
68         serialized.set_url(url);
69         serialized.set_sock(sock);
70         serialized.set_bytes_received(stats.bytes_received);
71         serialized.set_data_bytes_received(stats.data_bytes_received);
72         serialized.set_connect_time(stats.connect_time);
73         return serialized;
74 }
75
76 void UDPInput::close_socket()
77 {
78         safe_close(sock);
79         sock = -1;
80 }
81         
82 void UDPInput::construct_header()
83 {
84         http_header =
85                 "HTTP/1.0 200 OK\r\n"
86                 "Content-type: application/octet-stream\r\n"
87                 "Cache-control: no-cache\r\n"
88                 "Server: " SERVER_IDENTIFICATION "\r\n"
89                 "Connection: close\r\n";
90 }
91         
92 void UDPInput::add_destination(int stream_index)
93 {
94         stream_indices.push_back(stream_index);
95         servers->set_header(stream_index, http_header, "");
96 }
97
98 void UDPInput::do_work()
99 {
100         while (!should_stop()) {
101                 if (sock == -1) {
102                         int port_num = atoi(port.c_str());
103                         sockaddr_in6 addr = CreateAnyAddress(port_num);
104                         sock = create_server_socket(addr, UDP_SOCKET);
105                         if (sock == -1) {
106                                 log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
107                                              url.c_str());
108                                 usleep(200000);
109                                 continue;
110                         }
111                 }
112
113                 // Wait for a packet, or a wakeup.
114                 bool activity = wait_for_activity(sock, POLLIN, NULL);
115                 if (!activity) {
116                         // Most likely, should_stop was set.
117                         continue;
118                 }
119
120                 int ret;
121                 do {
122                         ret = recv(sock, packet_buf, sizeof(packet_buf), 0);
123                 } while (ret == -1 && errno == EINTR);
124
125                 if (ret <= 0) {
126                         log_perror("recv");
127                         close_socket();
128                         continue;
129                 }
130
131                 {
132                         MutexLock lock(&stats_mutex);
133                         stats.bytes_received += ret;
134                         stats.data_bytes_received += ret;
135                 }
136                 
137                 for (size_t i = 0; i < stream_indices.size(); ++i) {
138                         servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START);
139                 }
140         }
141 }
142
143 InputStats UDPInput::get_stats() const
144 {
145         MutexLock lock(&stats_mutex);
146         return stats;
147 }