]> git.sesse.net Git - cubemap/blob - udpinput.cpp
Merge branch 'master' of /srv/git.sesse.net/www/cubemap
[cubemap] / udpinput.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <poll.h>
4 #include <stddef.h>
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <sys/socket.h>
8 #include <unistd.h>
9 #include <string>
10
11 #include "acceptor.h"
12 #include "log.h"
13 #include "serverpool.h"
14 #include "state.pb.h"
15 #include "udpinput.h"
16 #include "util.h"
17 #include "version.h"
18
19 using namespace std;
20
21 extern ServerPool *servers;
22
23 UDPInput::UDPInput(const string &url)
24         : url(url),
25           sock(-1)
26 {
27         // Should be verified by the caller.
28         string protocol;
29         bool ok = parse_url(url, &protocol, &host, &port, &path);
30         assert(ok);
31
32         construct_header();
33 }
34
35 UDPInput::UDPInput(const InputProto &serialized)
36         : url(serialized.url()),
37           sock(serialized.sock())
38 {
39         // Should be verified by the caller.
40         string protocol;
41         bool ok = parse_url(url, &protocol, &host, &port, &path);
42         assert(ok);
43
44         construct_header();
45 }
46
47 InputProto UDPInput::serialize() const
48 {
49         InputProto serialized;
50         serialized.set_url(url);
51         serialized.set_sock(sock);
52         return serialized;
53 }
54
55 void UDPInput::close_socket()
56 {
57         safe_close(sock);
58         sock = -1;
59 }
60         
61 void UDPInput::construct_header()
62 {
63         http_header =
64                 "HTTP/1.0 200 OK\r\n"
65                 "Content-type: application/octet-stream\r\n"
66                 "Cache-control: no-cache\r\n"
67                 "Server: " SERVER_IDENTIFICATION "\r\n"
68                 "Connection: close\r\n";
69 }
70         
71 void UDPInput::add_destination(int stream_index)
72 {
73         stream_indices.push_back(stream_index);
74         servers->set_header(stream_index, http_header, "");
75 }
76
77 void UDPInput::do_work()
78 {
79         while (!should_stop()) {
80                 if (sock == -1) {
81                         int port_num = atoi(port.c_str());
82                         sock = create_server_socket(port_num, UDP_SOCKET);
83                         if (sock == -1) {
84                                 log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
85                                              url.c_str());
86                                 usleep(200000);
87                                 continue;
88                         }
89                 }
90
91                 // Wait for a packet, or a wakeup.
92                 bool activity = wait_for_activity(sock, POLLIN, NULL);
93                 if (!activity) {
94                         // Most likely, should_stop was set.
95                         continue;
96                 }
97
98                 char buf[4096];
99                 int ret;
100                 do {
101                         ret = recv(sock, buf, sizeof(buf), 0);
102                 } while (ret == -1 && errno == EINTR);
103
104                 if (ret <= 0) {
105                         log_perror("recv");
106                         close_socket();
107                         continue;
108                 }
109                 
110                 for (size_t i = 0; i < stream_indices.size(); ++i) {
111                         servers->add_data(stream_indices[i], buf, ret);
112                 }
113         }
114 }