Fix a crash when a HTTP input connected to an UDP output goes unavailable.
[cubemap] / serverpool.cpp
1 #include <assert.h>
2 #include <fcntl.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5
6 #include "client.h"
7 #include "log.h"
8 #include "server.h"
9 #include "serverpool.h"
10 #include "state.pb.h"
11 #include "udpstream.h"
12 #include "util.h"
13
14 struct sockaddr_in6;
15
16 using namespace std;
17
18 ServerPool::ServerPool(int size)
19         : servers(new Server[size]),
20           num_servers(size)
21 {
22 }
23
24 CubemapStateProto ServerPool::serialize()
25 {
26         CubemapStateProto state;
27
28         unordered_map<const string *, size_t> short_response_pool;
29
30         for (int i = 0; i < num_servers; ++i) {
31                 CubemapStateProto local_state = servers[i].serialize(&short_response_pool);
32
33                 // The stream state should be identical between the servers, so we only store it once,
34                 // save for the fds, which we keep around to distribute to the servers after re-exec.
35                 if (i == 0) {
36                         state.mutable_streams()->MergeFrom(local_state.streams());
37                 } else {
38                         assert(state.streams_size() == local_state.streams_size());
39                         for (int j = 0; j < local_state.streams_size(); ++j) {
40                                 assert(local_state.streams(j).data_fds_size() == 1);
41                                 state.mutable_streams(j)->add_data_fds(local_state.streams(j).data_fds(0));
42                         }
43                 }
44                 for (const ClientProto &client : local_state.clients()) {
45                         state.add_clients()->MergeFrom(client);
46                 }
47                 for (const HLSZombieProto &hls_zombie : local_state.hls_zombies()) {
48                         state.add_hls_zombies()->MergeFrom(hls_zombie);
49                 }
50         }
51
52         for (size_t i = 0; i < short_response_pool.size(); ++i) {
53                 state.mutable_short_response_pool()->Add();
54         }
55         for (const auto &string_and_index : short_response_pool) {
56                 state.mutable_short_response_pool(string_and_index.second)->set_header_or_short_response(*string_and_index.first);
57         }
58
59         return state;
60 }
61
62 void ServerPool::add_client(int sock, Acceptor *acceptor)
63 {
64         servers[clients_added++ % num_servers].add_client_deferred(sock, acceptor);
65 }
66
67 void ServerPool::add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses)
68 {
69         servers[clients_added++ % num_servers].add_client_from_serialized(client, short_responses);
70 }
71
72 // It's fine to abuse clients_added here, since it's only ever used for round-robin purposes.
73 void ServerPool::add_hls_zombie_from_serialized(const HLSZombieProto &hls_zombie)
74 {
75         servers[clients_added++ % num_servers].add_hls_zombie_from_serialized(hls_zombie);
76 }
77
78 int ServerPool::lookup_stream_by_url(const string &url) const
79 {
80         assert(servers != nullptr);
81         return servers[0].lookup_stream_by_url(url);
82 }
83
84 int ServerPool::add_stream(const string &url,
85                            const string &hls_url,
86                            size_t backlog_size,
87                            size_t prebuffering_bytes,
88                            Stream::Encoding encoding,
89                            Stream::Encoding src_encoding,
90                            unsigned hls_frag_duration,
91                            size_t hls_backlog_margin,
92                            const string &allow_origin)
93 {
94         // Adding more HTTP streams after UDP streams would cause the UDP stream
95         // indices to move around, which is obviously not good.
96         assert(udp_streams.empty());
97
98         for (int i = 0; i < num_servers; ++i) {
99                 int stream_index = servers[i].add_stream(url, hls_url, backlog_size, prebuffering_bytes, encoding, src_encoding, hls_frag_duration, hls_backlog_margin, allow_origin);
100                 assert(stream_index == num_http_streams);
101         }
102         return num_http_streams++;
103 }
104
105 int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
106 {
107         // Adding more HTTP streams after UDP streams would cause the UDP stream
108         // indices to move around, which is obviously not good.
109         assert(udp_streams.empty());
110
111         assert(!data_fds.empty());
112         string contents;
113         for (int i = 0; i < num_servers; ++i) {
114                 int data_fd;
115                 if (i < int(data_fds.size())) {
116                         // Reuse one of the existing file descriptors.
117                         data_fd = data_fds[i];
118                 } else {
119                         // Clone the first one.
120                         if (contents.empty()) {
121                                 if (!read_tempfile(data_fds[0], &contents)) {
122                                         exit(1);
123                                 }
124                         }
125                         data_fd = make_tempfile(contents);
126                 }
127
128                 int stream_index = servers[i].add_stream_from_serialized(stream, data_fd);
129                 assert(stream_index == num_http_streams);
130         }
131
132         // Close and delete any leftovers, if the number of servers was reduced.
133         for (size_t i = num_servers; i < data_fds.size(); ++i) {
134                 safe_close(data_fds[i]);  // Implicitly deletes the file.
135         }
136
137         return num_http_streams++;
138 }
139         
140 int ServerPool::add_udpstream(const sockaddr_in6 &dst, int pacing_rate, int ttl, int multicast_iface_index)
141 {
142         udp_streams.emplace_back(new UDPStream(dst, pacing_rate, ttl, multicast_iface_index));
143         return num_http_streams + udp_streams.size() - 1;
144 }
145
146 void ServerPool::set_header(int stream_index, const string &http_header, const string &stream_header)
147 {
148         assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size()));
149
150         if (stream_index >= num_http_streams) {
151                 // UDP stream. TODO: Log which stream this is.
152                 if (!stream_header.empty()) {
153                         log(WARNING, "Trying to send stream format with headers to a UDP destination. This is unlikely to work well.");
154                 }
155
156                 // Ignore the HTTP header.
157                 return;
158         }
159
160         // HTTP stream.
161         for (int i = 0; i < num_servers; ++i) {
162                 servers[i].set_header(stream_index, http_header, stream_header);
163         }
164 }
165
166 void ServerPool::add_data(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts)
167 {
168         assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size()));
169
170         if (stream_index >= num_http_streams) {
171                 // UDP stream.
172                 udp_streams[stream_index - num_http_streams]->send(data, bytes);
173                 return;
174         }
175
176         // HTTP stream.
177         for (int i = 0; i < num_servers; ++i) {
178                 servers[i].add_data_deferred(stream_index, data, bytes, metacube_flags, pts);
179         }
180 }
181
182 void ServerPool::set_unavailable(int stream_index)
183 {
184         assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size()));
185         if (stream_index < ssize_t(num_http_streams)) {
186                 for (int i = 0; i < num_servers; ++i) {
187                         servers[i].set_unavailable(stream_index);
188                 }
189         }
190 }
191
192 void ServerPool::add_gen204(const std::string &url, const std::string &allow_origin)
193 {
194         for (int i = 0; i < num_servers; ++i) {
195                 servers[i].add_gen204(url, allow_origin);
196         }
197 }
198
199 void ServerPool::create_tls_context_for_acceptor(const Acceptor *acceptor)
200 {
201         for (int i = 0; i < num_servers; ++i) {
202                 servers[i].create_tls_context_for_acceptor(acceptor);
203         }
204 }
205
206 void ServerPool::run()
207 {
208         for (int i = 0; i < num_servers; ++i) {
209                 servers[i].run();
210         }
211 }
212         
213 void ServerPool::stop()
214 {
215         for (int i = 0; i < num_servers; ++i) {
216                 servers[i].stop();
217         }
218 }
219         
220 vector<ClientStats> ServerPool::get_client_stats() const
221 {
222         vector<ClientStats> ret;
223         for (int i = 0; i < num_servers; ++i) {
224                 vector<ClientStats> stats = servers[i].get_client_stats();
225                 ret.insert(ret.end(), stats.begin(), stats.end());
226         }
227         return ret;
228 }
229
230 vector<HLSZombie> ServerPool::get_hls_zombies() const
231 {
232         vector<HLSZombie> ret;
233         for (int i = 0; i < num_servers; ++i) {
234                 vector<HLSZombie> stats = servers[i].get_hls_zombies();
235                 ret.insert(ret.end(), stats.begin(), stats.end());
236         }
237         return ret;
238 }
239
240 void ServerPool::set_pacing_rate(int stream_index, uint32_t pacing_rate)
241 {
242         for (int i = 0; i < num_servers; ++i) {
243                 servers[i].set_pacing_rate(stream_index, pacing_rate);
244         }       
245 }
246
247 void ServerPool::set_backlog_size(int stream_index, size_t new_size)
248 {
249         for (int i = 0; i < num_servers; ++i) {
250                 servers[i].set_backlog_size(stream_index, new_size);
251         }       
252 }
253
254 void ServerPool::set_prebuffering_bytes(int stream_index, size_t new_amount)
255 {
256         for (int i = 0; i < num_servers; ++i) {
257                 servers[i].set_prebuffering_bytes(stream_index, new_amount);
258         }
259 }
260
261 void ServerPool::set_encoding(int stream_index, Stream::Encoding encoding)
262 {
263         for (int i = 0; i < num_servers; ++i) {
264                 servers[i].set_encoding(stream_index, encoding);
265         }       
266 }
267
268 void ServerPool::set_src_encoding(int stream_index, Stream::Encoding encoding)
269 {
270         for (int i = 0; i < num_servers; ++i) {
271                 servers[i].set_src_encoding(stream_index, encoding);
272         }
273 }
274
275 void ServerPool::set_hls_frag_duration(int stream_index, unsigned hls_frag_duration)
276 {
277         for (int i = 0; i < num_servers; ++i) {
278                 servers[i].set_hls_frag_duration(stream_index, hls_frag_duration);
279         }
280 }
281
282 void ServerPool::set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin)
283 {
284         for (int i = 0; i < num_servers; ++i) {
285                 servers[i].set_hls_backlog_margin(stream_index, hls_backlog_margin);
286         }
287 }
288
289 void ServerPool::set_allow_origin(int stream_index, const std::string &allow_origin)
290 {
291         for (int i = 0; i < num_servers; ++i) {
292                 servers[i].set_allow_origin(stream_index, allow_origin);
293         }
294 }
295
296 void ServerPool::register_hls_url(int stream_index, const string &hls_url)
297 {
298         for (int i = 0; i < num_servers; ++i) {
299                 servers[i].register_hls_url(stream_index, hls_url);
300         }
301 }