]> git.sesse.net Git - cubemap/blob - udpinput.cpp
Fix some issues with the last stats.cpp fix.
[cubemap] / udpinput.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <poll.h>
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <sys/socket.h>
7 #include <unistd.h>
8 #include <string>
9
10 #include "acceptor.h"
11 #include "log.h"
12 #include "serverpool.h"
13 #include "state.pb.h"
14 #include "udpinput.h"
15 #include "version.h"
16
17 using namespace std;
18
19 extern ServerPool *servers;
20
21 UDPInput::UDPInput(const string &url)
22         : url(url),
23           sock(-1)
24 {
25         // Should be verified by the caller.
26         string protocol;
27         bool ok = parse_url(url, &protocol, &host, &port, &path);
28         assert(ok);
29
30         construct_header();
31 }
32
33 UDPInput::UDPInput(const InputProto &serialized)
34         : url(serialized.url()),
35           sock(serialized.sock())
36 {
37         // Should be verified by the caller.
38         string protocol;
39         bool ok = parse_url(url, &protocol, &host, &port, &path);
40         assert(ok);
41
42         construct_header();
43 }
44
45 InputProto UDPInput::serialize() const
46 {
47         InputProto serialized;
48         serialized.set_url(url);
49         serialized.set_sock(sock);
50         return serialized;
51 }
52
53 void UDPInput::close_socket()
54 {
55         int ret;
56         do {
57                 ret = close(sock);
58         } while (ret == -1 && errno == EINTR);
59
60         if (ret == -1) {
61                 log_perror("close()");
62         }
63
64         sock = -1;
65 }
66         
67 void UDPInput::construct_header()
68 {
69         http_header =
70                 "HTTP/1.0 200 OK\r\n"
71                 "Content-type: application/octet-stream\r\n"
72                 "Cache-control: no-cache\r\n"
73                 "Server: " SERVER_IDENTIFICATION "\r\n"
74                 "Connection: close\r\n";
75 }
76         
77 void UDPInput::add_destination(const string &stream_id)
78 {
79         stream_ids.push_back(stream_id);
80         servers->set_header(stream_id, http_header, "");
81 }
82
83 void UDPInput::do_work()
84 {
85         while (!should_stop) {
86                 if (sock == -1) {
87                         int port_num = atoi(port.c_str());
88                         sock = create_server_socket(port_num, UDP_SOCKET);
89                         if (sock == -1) {
90                                 log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
91                                              url.c_str());
92                                 usleep(200000);
93                                 continue;
94                         }
95                 }
96
97                 // Since we are non-blocking, we need to wait for the right state first.
98                 // Wait up to 50 ms, then check should_stop.
99                 pollfd pfd;
100                 pfd.fd = sock;
101                 pfd.events = POLLIN;
102
103                 int nfds = poll(&pfd, 1, 50);
104                 if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
105                         continue;
106                 }
107                 if (nfds == -1) {
108                         log_perror("poll");
109                         close_socket();
110                         continue;       
111                 }
112
113                 char buf[4096];
114                 int ret;
115                 do {
116                         ret = recv(sock, buf, sizeof(buf), 0);
117                 } while (ret == -1 && errno == EINTR);
118
119                 if (ret <= 0) {
120                         log_perror("recv");
121                         close_socket();
122                         continue;
123                 }
124                 
125                 for (size_t i = 0; i < stream_ids.size(); ++i) {
126                         servers->add_data(stream_ids[i], buf, ret);
127                 }
128         }
129 }