git.sesse.net Git - cubemap/blob - udpinput.cpp
Since we just broke upgrade compatibility, kill some older stuff in the state protos.
[cubemap] / udpinput.cpp
1 #include <stdio.h>
2 #include <sys/types.h>
3 #include <sys/socket.h>
4 #include <sys/poll.h>
5 #include <arpa/inet.h>
6 #include <errno.h>
7 #include <string>
8
9 #include "acceptor.h"
10 #include "udpinput.h"
11 #include "serverpool.h"
12 #include "version.h"
13 #include "state.pb.h"
14
15 using namespace std;
16
17 extern ServerPool *servers;
18
19 UDPInput::UDPInput(const string &stream_id, const string &url)
20         : stream_id(stream_id),
21           url(url),
22           sock(-1)
23 {
24         // Should be verified by the caller.
25         string protocol;
26         bool ok = parse_url(url, &protocol, &host, &port, &path);
27         assert(ok);
28
29         construct_header();
30 }
31
32 UDPInput::UDPInput(const InputProto &serialized)
33         : stream_id(serialized.stream_id()),
34           url(serialized.url()),
35           sock(serialized.sock())
36 {
37         // Should be verified by the caller.
38         string protocol;
39         bool ok = parse_url(url, &protocol, &host, &port, &path);
40         assert(ok);
41
42         construct_header();
43 }
44
45 InputProto UDPInput::serialize() const
46 {
47         InputProto serialized;
48         serialized.set_stream_id(stream_id);
49         serialized.set_url(url);
50         serialized.set_sock(sock);
51         return serialized;
52 }
53
54 void UDPInput::close_socket()
55 {
56         int ret;
57         do {
58                 ret = close(sock);
59         } while (ret == -1 && errno == EINTR);
60
61         if (ret == -1) {
62                 perror("close()");
63         }
64
65         sock = -1;
66 }
67         
68 void UDPInput::construct_header()
69 {
70         string header =
71                 "HTTP/1.0 200 OK\r\n"
72                 "Content-type: application/octet-stream\r\n"
73                 "Cache-control: no-cache\r\n"
74                 "Server: " SERVER_IDENTIFICATION "\r\n"
75                 "\r\n";
76         servers->set_header(stream_id, header);
77 }
78
79 void UDPInput::do_work()
80 {
81         while (!should_stop) {
82                 if (sock == -1) {
83                         int port_num = atoi(port.c_str());
84                         sock = create_server_socket(port_num, UDP_SOCKET);
85                         if (sock == -1) {
86                                 fprintf(stderr, "WARNING: UDP socket creation failed. Waiting 0.2 seconds and trying again...\n");
87                                 usleep(200000);
88                                 continue;
89                         }
90                 }
91
92                 // Since we are non-blocking, we need to wait for the right state first.
93                 // Wait up to 50 ms, then check should_stop.
94                 pollfd pfd;
95                 pfd.fd = sock;
96                 pfd.events = POLLIN;
97
98                 int nfds = poll(&pfd, 1, 50);
99                 if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
100                         continue;
101                 }
102                 if (nfds == -1) {
103                         perror("poll");
104                         close_socket();
105                         continue;       
106                 }
107
108                 char buf[4096];
109                 int ret;
110                 do {
111                         ret = recv(sock, buf, sizeof(buf), 0);
112                 } while (ret == -1 && errno == EINTR);
113
114                 if (ret <= 0) {
115                         perror("recv");
116                         close_socket();
117                         continue;
118                 }
119                 
120                 servers->add_data(stream_id, buf, ret);
121         }
122 }