]> git.sesse.net Git - cubemap/blob - udpinput.cpp
Move iovecs around instead of having single data buffers. Hopefully a tad more effici...
[cubemap] / udpinput.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <poll.h>
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <sys/socket.h>
7 #include <unistd.h>
8 #include <string>
9
10 #include "acceptor.h"
11 #include "log.h"
12 #include "serverpool.h"
13 #include "state.pb.h"
14 #include "udpinput.h"
15 #include "util.h"
16 #include "version.h"
17
18 using namespace std;
19
20 extern ServerPool *servers;
21
22 UDPInput::UDPInput(const string &url)
23         : url(url),
24           sock(-1)
25 {
26         // Should be verified by the caller.
27         string protocol;
28         bool ok = parse_url(url, &protocol, &host, &port, &path);
29         assert(ok);
30
31         construct_header();
32 }
33
34 UDPInput::UDPInput(const InputProto &serialized)
35         : url(serialized.url()),
36           sock(serialized.sock())
37 {
38         // Should be verified by the caller.
39         string protocol;
40         bool ok = parse_url(url, &protocol, &host, &port, &path);
41         assert(ok);
42
43         construct_header();
44 }
45
46 InputProto UDPInput::serialize() const
47 {
48         InputProto serialized;
49         serialized.set_url(url);
50         serialized.set_sock(sock);
51         return serialized;
52 }
53
54 void UDPInput::close_socket()
55 {
56         safe_close(sock);
57         sock = -1;
58 }
59         
60 void UDPInput::construct_header()
61 {
62         http_header =
63                 "HTTP/1.0 200 OK\r\n"
64                 "Content-type: application/octet-stream\r\n"
65                 "Cache-control: no-cache\r\n"
66                 "Server: " SERVER_IDENTIFICATION "\r\n"
67                 "Connection: close\r\n";
68 }
69         
70 void UDPInput::add_destination(const string &stream_id)
71 {
72         stream_ids.push_back(stream_id);
73         servers->set_header(stream_id, http_header, "");
74 }
75
76 void UDPInput::do_work()
77 {
78         while (!should_stop()) {
79                 if (sock == -1) {
80                         int port_num = atoi(port.c_str());
81                         sock = create_server_socket(port_num, UDP_SOCKET);
82                         if (sock == -1) {
83                                 log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
84                                              url.c_str());
85                                 usleep(200000);
86                                 continue;
87                         }
88                 }
89
90                 // Wait for a packet, or a wakeup.
91                 bool activity = wait_for_activity(sock, POLLIN, NULL);
92                 if (!activity) {
93                         // Most likely, should_stop was set.
94                         continue;
95                 }
96
97                 char buf[4096];
98                 int ret;
99                 do {
100                         ret = recv(sock, buf, sizeof(buf), 0);
101                 } while (ret == -1 && errno == EINTR);
102
103                 if (ret <= 0) {
104                         log_perror("recv");
105                         close_socket();
106                         continue;
107                 }
108                 
109                 for (size_t i = 0; i < stream_ids.size(); ++i) {
110                         servers->add_data(stream_ids[i], buf, ret);
111                 }
112         }
113 }