]> git.sesse.net Git - cubemap/blob - udpinput.cpp
Fix a memory leak in Server (streams were leaked).
[cubemap] / udpinput.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <poll.h>
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <sys/socket.h>
7 #include <unistd.h>
8 #include <string>
9
10 #include "acceptor.h"
11 #include "log.h"
12 #include "serverpool.h"
13 #include "state.pb.h"
14 #include "udpinput.h"
15 #include "version.h"
16
17 using namespace std;
18
19 extern ServerPool *servers;
20
21 UDPInput::UDPInput(const string &stream_id, const string &url)
22         : stream_id(stream_id),
23           url(url),
24           sock(-1)
25 {
26         // Should be verified by the caller.
27         string protocol;
28         bool ok = parse_url(url, &protocol, &host, &port, &path);
29         assert(ok);
30
31         construct_header();
32 }
33
34 UDPInput::UDPInput(const InputProto &serialized)
35         : stream_id(serialized.stream_id()),
36           url(serialized.url()),
37           sock(serialized.sock())
38 {
39         // Should be verified by the caller.
40         string protocol;
41         bool ok = parse_url(url, &protocol, &host, &port, &path);
42         assert(ok);
43
44         construct_header();
45 }
46
47 InputProto UDPInput::serialize() const
48 {
49         InputProto serialized;
50         serialized.set_stream_id(stream_id);
51         serialized.set_url(url);
52         serialized.set_sock(sock);
53         return serialized;
54 }
55
56 void UDPInput::close_socket()
57 {
58         int ret;
59         do {
60                 ret = close(sock);
61         } while (ret == -1 && errno == EINTR);
62
63         if (ret == -1) {
64                 log_perror("close()");
65         }
66
67         sock = -1;
68 }
69         
70 void UDPInput::construct_header()
71 {
72         string header =
73                 "HTTP/1.0 200 OK\r\n"
74                 "Content-type: application/octet-stream\r\n"
75                 "Cache-control: no-cache\r\n"
76                 "Server: " SERVER_IDENTIFICATION "\r\n"
77                 "\r\n";
78         servers->set_header(stream_id, header);
79 }
80
81 void UDPInput::do_work()
82 {
83         while (!should_stop) {
84                 if (sock == -1) {
85                         int port_num = atoi(port.c_str());
86                         sock = create_server_socket(port_num, UDP_SOCKET);
87                         if (sock == -1) {
88                                 log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
89                                              stream_id.c_str());
90                                 usleep(200000);
91                                 continue;
92                         }
93                 }
94
95                 // Since we are non-blocking, we need to wait for the right state first.
96                 // Wait up to 50 ms, then check should_stop.
97                 pollfd pfd;
98                 pfd.fd = sock;
99                 pfd.events = POLLIN;
100
101                 int nfds = poll(&pfd, 1, 50);
102                 if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
103                         continue;
104                 }
105                 if (nfds == -1) {
106                         log_perror("poll");
107                         close_socket();
108                         continue;       
109                 }
110
111                 char buf[4096];
112                 int ret;
113                 do {
114                         ret = recv(sock, buf, sizeof(buf), 0);
115                 } while (ret == -1 && errno == EINTR);
116
117                 if (ret <= 0) {
118                         log_perror("recv");
119                         close_socket();
120                         continue;
121                 }
122                 
123                 servers->add_data(stream_id, buf, ret);
124         }
125 }