Add preliminary support for input stream statistics.
[cubemap] / udpinput.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <poll.h>
4 #include <stddef.h>
5 #include <stdlib.h>
6 #include <sys/socket.h>
7 #include <unistd.h>
8 #include <string>
9
10 #include "acceptor.h"
11 #include "log.h"
12 #include "mutexlock.h"
13 #include "serverpool.h"
14 #include "state.pb.h"
15 #include "udpinput.h"
16 #include "util.h"
17 #include "version.h"
18
19 using namespace std;
20
21 extern ServerPool *servers;
22
23 UDPInput::UDPInput(const string &url)
24         : url(url),
25           sock(-1)
26 {
27         // Should be verified by the caller.
28         string protocol;
29         bool ok = parse_url(url, &protocol, &host, &port, &path);
30         assert(ok);
31
32         construct_header();
33
34         pthread_mutex_init(&stats_mutex, NULL);
35         stats.url = url;
36         stats.bytes_received = 0;
37         stats.data_bytes_received = 0;
38 }
39
40 UDPInput::UDPInput(const InputProto &serialized)
41         : url(serialized.url()),
42           sock(serialized.sock())
43 {
44         // Should be verified by the caller.
45         string protocol;
46         bool ok = parse_url(url, &protocol, &host, &port, &path);
47         assert(ok);
48
49         construct_header();
50
51         pthread_mutex_init(&stats_mutex, NULL);
52         stats.url = url;
53         stats.bytes_received = serialized.bytes_received();
54         stats.data_bytes_received = serialized.data_bytes_received();
55 }
56
57 InputProto UDPInput::serialize() const
58 {
59         InputProto serialized;
60         serialized.set_url(url);
61         serialized.set_sock(sock);
62         serialized.set_bytes_received(stats.bytes_received);
63         serialized.set_data_bytes_received(stats.data_bytes_received);
64         return serialized;
65 }
66
67 void UDPInput::close_socket()
68 {
69         safe_close(sock);
70         sock = -1;
71 }
72         
73 void UDPInput::construct_header()
74 {
75         http_header =
76                 "HTTP/1.0 200 OK\r\n"
77                 "Content-type: application/octet-stream\r\n"
78                 "Cache-control: no-cache\r\n"
79                 "Server: " SERVER_IDENTIFICATION "\r\n"
80                 "Connection: close\r\n";
81 }
82         
83 void UDPInput::add_destination(int stream_index)
84 {
85         stream_indices.push_back(stream_index);
86         servers->set_header(stream_index, http_header, "");
87 }
88
89 void UDPInput::do_work()
90 {
91         while (!should_stop()) {
92                 if (sock == -1) {
93                         int port_num = atoi(port.c_str());
94                         sock = create_server_socket(port_num, UDP_SOCKET);
95                         if (sock == -1) {
96                                 log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
97                                              url.c_str());
98                                 usleep(200000);
99                                 continue;
100                         }
101                 }
102
103                 // Wait for a packet, or a wakeup.
104                 bool activity = wait_for_activity(sock, POLLIN, NULL);
105                 if (!activity) {
106                         // Most likely, should_stop was set.
107                         continue;
108                 }
109
110                 int ret;
111                 do {
112                         ret = recv(sock, packet_buf, sizeof(packet_buf), 0);
113                 } while (ret == -1 && errno == EINTR);
114
115                 if (ret <= 0) {
116                         log_perror("recv");
117                         close_socket();
118                         continue;
119                 }
120
121                 {
122                         MutexLock lock(&stats_mutex);
123                         stats.bytes_received += ret;
124                         stats.data_bytes_received += ret;
125                 }
126                 
127                 for (size_t i = 0; i < stream_indices.size(); ++i) {
128                         servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START);
129                 }
130         }
131 }
132
133 InputStats UDPInput::get_stats() const
134 {
135         MutexLock lock(&stats_mutex);
136         return stats;
137 }