git.sesse.net Git - cubemap/blob - udpinput.cpp
Use the new-in-3.11 O_TMPFILE flag when available.
[cubemap] / udpinput.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <poll.h>
4 #include <stddef.h>
5 #include <stdlib.h>
6 #include <sys/socket.h>
7 #include <time.h>
8 #include <unistd.h>
9 #include <string>
10
11 #include "acceptor.h"
12 #include "log.h"
13 #include "mutexlock.h"
14 #include "serverpool.h"
15 #include "state.pb.h"
16 #include "stream.h"
17 #include "udpinput.h"
18 #include "util.h"
19 #include "version.h"
20
21 using namespace std;
22
23 extern ServerPool *servers;
24
25 UDPInput::UDPInput(const string &url)
26         : url(url),
27           sock(-1)
28 {
29         // Should be verified by the caller.
30         string protocol;
31         bool ok = parse_url(url, &protocol, &host, &port, &path);
32         assert(ok);
33
34         construct_header();
35
36         pthread_mutex_init(&stats_mutex, NULL);
37         stats.url = url;
38         stats.bytes_received = 0;
39         stats.data_bytes_received = 0;
40         stats.connect_time = time(NULL);
41 }
42
43 UDPInput::UDPInput(const InputProto &serialized)
44         : url(serialized.url()),
45           sock(serialized.sock())
46 {
47         // Should be verified by the caller.
48         string protocol;
49         bool ok = parse_url(url, &protocol, &host, &port, &path);
50         assert(ok);
51
52         construct_header();
53
54         pthread_mutex_init(&stats_mutex, NULL);
55         stats.url = url;
56         stats.bytes_received = serialized.bytes_received();
57         stats.data_bytes_received = serialized.data_bytes_received();
58         if (serialized.has_connect_time()) {
59                 stats.connect_time = serialized.connect_time();
60         } else {
61                 stats.connect_time = time(NULL);
62         }
63 }
64
65 InputProto UDPInput::serialize() const
66 {
67         InputProto serialized;
68         serialized.set_url(url);
69         serialized.set_sock(sock);
70         serialized.set_bytes_received(stats.bytes_received);
71         serialized.set_data_bytes_received(stats.data_bytes_received);
72         serialized.set_connect_time(stats.connect_time);
73         return serialized;
74 }
75
76 void UDPInput::close_socket()
77 {
78         safe_close(sock);
79         sock = -1;
80 }
81         
82 void UDPInput::construct_header()
83 {
84         http_header =
85                 "HTTP/1.0 200 OK\r\n"
86                 "Content-type: application/octet-stream\r\n"
87                 "Cache-control: no-cache\r\n"
88                 "Server: " SERVER_IDENTIFICATION "\r\n"
89                 "Connection: close\r\n";
90 }
91         
92 void UDPInput::add_destination(int stream_index)
93 {
94         stream_indices.push_back(stream_index);
95         servers->set_header(stream_index, http_header, "");
96 }
97
98 void UDPInput::do_work()
99 {
100         while (!should_stop()) {
101                 if (sock == -1) {
102                         int port_num = atoi(port.c_str());
103                         sock = create_server_socket(port_num, UDP_SOCKET);
104                         if (sock == -1) {
105                                 log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
106                                              url.c_str());
107                                 usleep(200000);
108                                 continue;
109                         }
110                 }
111
112                 // Wait for a packet, or a wakeup.
113                 bool activity = wait_for_activity(sock, POLLIN, NULL);
114                 if (!activity) {
115                         // Most likely, should_stop was set.
116                         continue;
117                 }
118
119                 int ret;
120                 do {
121                         ret = recv(sock, packet_buf, sizeof(packet_buf), 0);
122                 } while (ret == -1 && errno == EINTR);
123
124                 if (ret <= 0) {
125                         log_perror("recv");
126                         close_socket();
127                         continue;
128                 }
129
130                 {
131                         MutexLock lock(&stats_mutex);
132                         stats.bytes_received += ret;
133                         stats.data_bytes_received += ret;
134                 }
135                 
136                 for (size_t i = 0; i < stream_indices.size(); ++i) {
137                         servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START);
138                 }
139         }
140 }
141
142 InputStats UDPInput::get_stats() const
143 {
144         MutexLock lock(&stats_mutex);
145         return stats;
146 }