]> git.sesse.net Git - cubemap/blob - udpinput.cpp
Use memcpy instead of incurring potential unaligned reads when reading the Metacube...
[cubemap] / udpinput.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <poll.h>
4 #include <stddef.h>
5 #include <stdlib.h>
6 #include <sys/socket.h>
7 #include <unistd.h>
8 #include <string>
9
10 #include "acceptor.h"
11 #include "log.h"
12 #include "mutexlock.h"
13 #include "serverpool.h"
14 #include "state.pb.h"
15 #include "udpinput.h"
16 #include "util.h"
17 #include "version.h"
18
19 using namespace std;
20
21 extern ServerPool *servers;
22
23 UDPInput::UDPInput(const string &url)
24         : url(url),
25           sock(-1)
26 {
27         // Should be verified by the caller.
28         string protocol;
29         bool ok = parse_url(url, &protocol, &host, &port, &path);
30         assert(ok);
31
32         construct_header();
33
34         pthread_mutex_init(&stats_mutex, NULL);
35         stats.url = url;
36         stats.bytes_received = 0;
37         stats.data_bytes_received = 0;
38         stats.connect_time = time(NULL);
39 }
40
41 UDPInput::UDPInput(const InputProto &serialized)
42         : url(serialized.url()),
43           sock(serialized.sock())
44 {
45         // Should be verified by the caller.
46         string protocol;
47         bool ok = parse_url(url, &protocol, &host, &port, &path);
48         assert(ok);
49
50         construct_header();
51
52         pthread_mutex_init(&stats_mutex, NULL);
53         stats.url = url;
54         stats.bytes_received = serialized.bytes_received();
55         stats.data_bytes_received = serialized.data_bytes_received();
56         if (serialized.has_connect_time()) {
57                 stats.connect_time = serialized.connect_time();
58         } else {
59                 stats.connect_time = time(NULL);
60         }
61 }
62
63 InputProto UDPInput::serialize() const
64 {
65         InputProto serialized;
66         serialized.set_url(url);
67         serialized.set_sock(sock);
68         serialized.set_bytes_received(stats.bytes_received);
69         serialized.set_data_bytes_received(stats.data_bytes_received);
70         serialized.set_connect_time(stats.connect_time);
71         return serialized;
72 }
73
74 void UDPInput::close_socket()
75 {
76         safe_close(sock);
77         sock = -1;
78 }
79         
80 void UDPInput::construct_header()
81 {
82         http_header =
83                 "HTTP/1.0 200 OK\r\n"
84                 "Content-type: application/octet-stream\r\n"
85                 "Cache-control: no-cache\r\n"
86                 "Server: " SERVER_IDENTIFICATION "\r\n"
87                 "Connection: close\r\n";
88 }
89         
90 void UDPInput::add_destination(int stream_index)
91 {
92         stream_indices.push_back(stream_index);
93         servers->set_header(stream_index, http_header, "");
94 }
95
96 void UDPInput::do_work()
97 {
98         while (!should_stop()) {
99                 if (sock == -1) {
100                         int port_num = atoi(port.c_str());
101                         sock = create_server_socket(port_num, UDP_SOCKET);
102                         if (sock == -1) {
103                                 log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
104                                              url.c_str());
105                                 usleep(200000);
106                                 continue;
107                         }
108                 }
109
110                 // Wait for a packet, or a wakeup.
111                 bool activity = wait_for_activity(sock, POLLIN, NULL);
112                 if (!activity) {
113                         // Most likely, should_stop was set.
114                         continue;
115                 }
116
117                 int ret;
118                 do {
119                         ret = recv(sock, packet_buf, sizeof(packet_buf), 0);
120                 } while (ret == -1 && errno == EINTR);
121
122                 if (ret <= 0) {
123                         log_perror("recv");
124                         close_socket();
125                         continue;
126                 }
127
128                 {
129                         MutexLock lock(&stats_mutex);
130                         stats.bytes_received += ret;
131                         stats.data_bytes_received += ret;
132                 }
133                 
134                 for (size_t i = 0; i < stream_indices.size(); ++i) {
135                         servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START);
136                 }
137         }
138 }
139
140 InputStats UDPInput::get_stats() const
141 {
142         MutexLock lock(&stats_mutex);
143         return stats;
144 }