]> git.sesse.net Git - cubemap/blob - udpinput.cpp
Move Server:add_data() into Stream, where it more logically belongs.
[cubemap] / udpinput.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <poll.h>
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <sys/socket.h>
7 #include <unistd.h>
8 #include <string>
9
10 #include "acceptor.h"
11 #include "serverpool.h"
12 #include "state.pb.h"
13 #include "udpinput.h"
14 #include "version.h"
15
16 using namespace std;
17
18 extern ServerPool *servers;
19
20 UDPInput::UDPInput(const string &stream_id, const string &url)
21         : stream_id(stream_id),
22           url(url),
23           sock(-1)
24 {
25         // Should be verified by the caller.
26         string protocol;
27         bool ok = parse_url(url, &protocol, &host, &port, &path);
28         assert(ok);
29
30         construct_header();
31 }
32
33 UDPInput::UDPInput(const InputProto &serialized)
34         : stream_id(serialized.stream_id()),
35           url(serialized.url()),
36           sock(serialized.sock())
37 {
38         // Should be verified by the caller.
39         string protocol;
40         bool ok = parse_url(url, &protocol, &host, &port, &path);
41         assert(ok);
42
43         construct_header();
44 }
45
46 InputProto UDPInput::serialize() const
47 {
48         InputProto serialized;
49         serialized.set_stream_id(stream_id);
50         serialized.set_url(url);
51         serialized.set_sock(sock);
52         return serialized;
53 }
54
55 void UDPInput::close_socket()
56 {
57         int ret;
58         do {
59                 ret = close(sock);
60         } while (ret == -1 && errno == EINTR);
61
62         if (ret == -1) {
63                 perror("close()");
64         }
65
66         sock = -1;
67 }
68         
69 void UDPInput::construct_header()
70 {
71         string header =
72                 "HTTP/1.0 200 OK\r\n"
73                 "Content-type: application/octet-stream\r\n"
74                 "Cache-control: no-cache\r\n"
75                 "Server: " SERVER_IDENTIFICATION "\r\n"
76                 "\r\n";
77         servers->set_header(stream_id, header);
78 }
79
80 void UDPInput::do_work()
81 {
82         while (!should_stop) {
83                 if (sock == -1) {
84                         int port_num = atoi(port.c_str());
85                         sock = create_server_socket(port_num, UDP_SOCKET);
86                         if (sock == -1) {
87                                 fprintf(stderr, "WARNING: UDP socket creation failed. Waiting 0.2 seconds and trying again...\n");
88                                 usleep(200000);
89                                 continue;
90                         }
91                 }
92
93                 // Since we are non-blocking, we need to wait for the right state first.
94                 // Wait up to 50 ms, then check should_stop.
95                 pollfd pfd;
96                 pfd.fd = sock;
97                 pfd.events = POLLIN;
98
99                 int nfds = poll(&pfd, 1, 50);
100                 if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
101                         continue;
102                 }
103                 if (nfds == -1) {
104                         perror("poll");
105                         close_socket();
106                         continue;       
107                 }
108
109                 char buf[4096];
110                 int ret;
111                 do {
112                         ret = recv(sock, buf, sizeof(buf), 0);
113                 } while (ret == -1 && errno == EINTR);
114
115                 if (ret <= 0) {
116                         perror("recv");
117                         close_socket();
118                         continue;
119                 }
120                 
121                 servers->add_data(stream_id, buf, ret);
122         }
123 }