]> git.sesse.net Git - cubemap/blob - server.cpp
Create $(libdir) on make install.
[cubemap] / server.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <inttypes.h>
4 #include <limits.h>
5 #include <netinet/in.h>
6 #include <netinet/tcp.h>
7 #include <pthread.h>
8 #include <stdint.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <sys/epoll.h>
13 #include <sys/sendfile.h>
14 #include <sys/socket.h>
15 #include <sys/types.h>
16 #include <unistd.h>
17 #include <algorithm>
18 #include <map>
19 #include <string>
20 #include <utility>
21 #include <vector>
22
23 #include "ktls.h"
24 #include "tlse.h"
25
26 #include "acceptor.h"
27 #include "accesslog.h"
28 #include "log.h"
29 #include "metacube2.h"
30 #include "parse.h"
31 #include "server.h"
32 #include "state.pb.h"
33 #include "stream.h"
34 #include "util.h"
35
36 #ifndef SO_MAX_PACING_RATE
37 #define SO_MAX_PACING_RATE 47
38 #endif
39
40 using namespace std;
41
42 extern AccessLogThread *access_log;
43
44 namespace {
45
46 inline bool is_equal(timespec a, timespec b)
47 {
48         return a.tv_sec == b.tv_sec &&
49                a.tv_nsec == b.tv_nsec;
50 }
51
52 inline bool is_earlier(timespec a, timespec b)
53 {
54         if (a.tv_sec != b.tv_sec)
55                 return a.tv_sec < b.tv_sec;
56         return a.tv_nsec < b.tv_nsec;
57 }
58
59 }  // namespace
60
61 Server::Server()
62 {
63         epoll_fd = epoll_create1(EPOLL_CLOEXEC);
64         if (epoll_fd == -1) {
65                 log_perror("epoll_fd");
66                 exit(1);
67         }
68 }
69
70 Server::~Server()
71 {
72         safe_close(epoll_fd);
73
74         // We're going to die soon anyway, but clean this up to keep leak checking happy.
75         for (const auto &acceptor_and_context : tls_server_contexts) {
76                 tls_destroy_context(acceptor_and_context.second);
77         }
78 }
79
80 vector<ClientStats> Server::get_client_stats() const
81 {
82         vector<ClientStats> ret;
83
84         lock_guard<mutex> lock(mu);
85         for (const auto &fd_and_client : clients) {
86                 ret.push_back(fd_and_client.second.get_stats());
87         }
88         return ret;
89 }
90
91 vector<HLSZombie> Server::get_hls_zombies()
92 {
93         vector<HLSZombie> ret;
94
95         timespec now;
96         if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
97                 log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
98                 return ret;
99         }
100
101         lock_guard<mutex> lock(mu);
102         for (auto it = hls_zombies.begin(); it != hls_zombies.end(); ) {
103                 if (is_earlier(it->second.expires, now)) {
104                         hls_zombies.erase(it++);
105                 } else {
106                         ret.push_back(it->second);
107                         ++it;
108                 }
109         }
110         return ret;
111 }
112
113 void Server::do_work()
114 {
115         while (!should_stop()) {
116                 // Wait until there's activity on at least one of the fds,
117                 // or 20 ms (about one frame at 50 fps) has elapsed.
118                 //
119                 // We could in theory wait forever and rely on wakeup()
120                 // from add_client_deferred() and add_data_deferred(),
121                 // but wakeup is a pretty expensive operation, and the
122                 // two threads might end up fighting over a lock, so it's
123                 // seemingly (much) more efficient to just have a timeout here.
124                 int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS, &sigset_without_usr1_block);
125                 if (nfds == -1 && errno != EINTR) {
126                         log_perror("epoll_wait");
127                         exit(1);
128                 }
129
130                 lock_guard<mutex> lock(mu);  // We release the mutex between iterations.
131         
132                 process_queued_data();
133
134                 // Process each client where we have socket activity.
135                 for (int i = 0; i < nfds; ++i) {
136                         Client *client = reinterpret_cast<Client *>(events[i].data.ptr);
137
138                         if (events[i].events & (EPOLLERR | EPOLLRDHUP | EPOLLHUP)) {
139                                 close_client(client);
140                                 continue;
141                         }
142
143                         process_client(client);
144                 }
145
146                 // Process each client where its stream has new data,
147                 // even if there was no socket activity.
148                 for (unique_ptr<Stream> &stream : streams) {
149                         vector<Client *> to_process;
150                         swap(stream->to_process, to_process);
151                         for (Client *client : to_process) {
152                                 process_client(client);
153                         }
154                 }
155
156                 // Finally, go through each client to see if it's timed out
157                 // in the READING_REQUEST state. (Seemingly there are clients
158                 // that can hold sockets up for days at a time without sending
159                 // anything at all.)
160                 timespec timeout_time;
161                 if (clock_gettime(CLOCK_MONOTONIC_COARSE, &timeout_time) == -1) {
162                         log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
163                         continue;
164                 }
165                 timeout_time.tv_sec -= REQUEST_READ_TIMEOUT_SEC;
166                 while (!clients_ordered_by_connect_time.empty()) {
167                         const pair<timespec, int> &connect_time_and_fd = clients_ordered_by_connect_time.front();
168
169                         // See if we have reached the end of clients to process.
170                         if (is_earlier(timeout_time, connect_time_and_fd.first)) {
171                                 break;
172                         }
173
174                         // If this client doesn't exist anymore, just ignore it
175                         // (it was deleted earlier).
176                         auto client_it = clients.find(connect_time_and_fd.second);
177                         if (client_it == clients.end()) {
178                                 clients_ordered_by_connect_time.pop();
179                                 continue;
180                         }
181                         Client *client = &client_it->second;
182                         if (!is_equal(client->connect_time, connect_time_and_fd.first)) {
183                                 // Another client has taken this fd in the meantime.
184                                 clients_ordered_by_connect_time.pop();
185                                 continue;
186                         }
187
188                         if (client->state != Client::READING_REQUEST) {
189                                 // Only READING_REQUEST can time out.
190                                 clients_ordered_by_connect_time.pop();
191                                 continue;
192                         }
193
194                         // OK, it timed out.
195                         close_client(client);
196                         clients_ordered_by_connect_time.pop();
197                 }
198         }
199 }
200
201 CubemapStateProto Server::serialize(unordered_map<const string *, size_t> *short_response_pool)
202 {
203         // We don't serialize anything queued, so empty the queues.
204         process_queued_data();
205
206         // Set all clients in a consistent state before serializing
207         // (ie., they have no remaining lost data). Otherwise, increasing
208         // the backlog could take clients into a newly valid area of the backlog,
209         // sending a stream of zeros instead of skipping the data as it should.
210         //
211         // TODO: Do this when clients are added back from serialized state instead;
212         // it would probably be less wasteful.
213         for (auto &fd_and_client : clients) {
214                 skip_lost_data(&fd_and_client.second);
215         }
216
217         CubemapStateProto serialized;
218         for (const auto &fd_and_client : clients) {
219                 serialized.add_clients()->MergeFrom(fd_and_client.second.serialize(short_response_pool));
220         }
221         for (unique_ptr<Stream> &stream : streams) {
222                 serialized.add_streams()->MergeFrom(stream->serialize());
223         }
224         for (const auto &key_and_zombie : hls_zombies) {
225                 HLSZombieProto *proto = serialized.add_hls_zombies();
226                 proto->set_key(key_and_zombie.first);
227
228                 const HLSZombie &zombie = key_and_zombie.second;
229                 proto->set_remote_addr(zombie.remote_addr);
230                 proto->set_url(zombie.url);
231                 proto->set_referer(zombie.referer);
232                 proto->set_user_agent(zombie.user_agent);
233                 proto->set_expires_sec(zombie.expires.tv_sec);
234                 proto->set_expires_nsec(zombie.expires.tv_nsec);
235         }
236         return serialized;
237 }
238
239 void Server::add_client_deferred(int sock, Acceptor *acceptor)
240 {
241         lock_guard<mutex> lock(queued_clients_mutex);
242         queued_add_clients.push_back(std::make_pair(sock, acceptor));
243 }
244
245 void Server::add_client(int sock, Acceptor *acceptor)
246 {
247         const bool is_tls = acceptor->is_tls();
248         auto inserted = clients.insert(make_pair(sock, Client(sock)));
249         assert(inserted.second == true);  // Should not already exist.
250         Client *client_ptr = &inserted.first->second;
251
252         start_client_timeout_timer(client_ptr);
253
254         // Start listening on data from this socket.
255         epoll_event ev;
256         if (is_tls) {
257                 // Even in the initial state (READING_REQUEST), TLS needs to
258                 // send data for the handshake, and thus might end up needing
259                 // to know about EPOLLOUT.
260                 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
261         } else {
262                 // EPOLLOUT will be added once we go out of READING_REQUEST.
263                 ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
264         }
265         ev.data.ptr = client_ptr;
266         if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
267                 log_perror("epoll_ctl(EPOLL_CTL_ADD)");
268                 exit(1);
269         }
270
271         if (is_tls) {
272                 assert(tls_server_contexts.count(acceptor));
273                 client_ptr->tls_context = tls_accept(tls_server_contexts[acceptor]);
274                 if (client_ptr->tls_context == nullptr) {
275                         log(ERROR, "tls_accept() failed");
276                         close_client(client_ptr);
277                         return;
278                 }
279                 tls_make_exportable(client_ptr->tls_context, 1);
280         }
281
282         process_client(client_ptr);
283 }
284
285 void Server::add_client_from_serialized(const ClientProto &client, const vector<shared_ptr<const string>> &short_responses)
286 {
287         lock_guard<mutex> lock(mu);
288         Stream *stream;
289         int stream_index = lookup_stream_by_url(client.url());
290         if (stream_index == -1) {
291                 assert(client.state() != Client::SENDING_DATA);
292                 stream = nullptr;
293         } else {
294                 stream = streams[stream_index].get();
295         }
296         auto inserted = clients.insert(make_pair(client.sock(), Client(client, short_responses, stream)));
297         assert(inserted.second == true);  // Should not already exist.
298         Client *client_ptr = &inserted.first->second;
299
300         // Connection timestamps must be nondecreasing.
301         assert(clients_ordered_by_connect_time.empty() ||
302                !is_earlier(client_ptr->connect_time, clients_ordered_by_connect_time.back().first));
303         clients_ordered_by_connect_time.push(make_pair(client_ptr->connect_time, client.sock()));
304
305         // Start listening on data from this socket.
306         epoll_event ev;
307         if (client.state() == Client::READING_REQUEST) {
308                 // See the corresponding comment in Server::add_client().
309                 if (client.has_tls_context()) {
310                         ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
311                 } else {
312                         ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
313                 }
314         } else {
315                 // If we don't have more data for this client, we'll be putting it into
316                 // the sleeping array again soon.
317                 ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
318         }
319         ev.data.ptr = client_ptr;
320         if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
321                 log_perror("epoll_ctl(EPOLL_CTL_ADD)");
322                 exit(1);
323         }
324
325         if (client_ptr->state == Client::WAITING_FOR_KEYFRAME ||
326             client_ptr->state == Client::PREBUFFERING ||
327             (client_ptr->state == Client::SENDING_DATA &&
328              client_ptr->stream_pos == client_ptr->stream->bytes_received)) {
329                 client_ptr->stream->put_client_to_sleep(client_ptr);
330         } else {
331                 process_client(client_ptr);
332         }
333 }
334
335 void Server::start_client_timeout_timer(Client *client)
336 {
337         // Connection timestamps must be nondecreasing. I can't find any guarantee
338         // that even the monotonic clock can't go backwards by a small amount
339         // (think switching between CPUs with non-synchronized TSCs), so if
340         // this actually should happen, we hack around it by fudging
341         // connect_time.
342         if (clock_gettime(CLOCK_MONOTONIC_COARSE, &client->connect_time) == -1) {
343                 log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
344         } else {
345                 if (!clients_ordered_by_connect_time.empty() &&
346                     is_earlier(client->connect_time, clients_ordered_by_connect_time.back().first)) {
347                         client->connect_time = clients_ordered_by_connect_time.back().first;
348                 }
349                 clients_ordered_by_connect_time.push(make_pair(client->connect_time, client->sock));
350         }
351 }
352
353 int Server::lookup_stream_by_url(const string &url) const
354 {
355         const auto stream_url_it = stream_url_map.find(url);
356         if (stream_url_it == stream_url_map.end()) {
357                 return -1;
358         }
359         return stream_url_it->second;
360 }
361
362 int Server::add_stream(const string &url,
363                        const string &hls_url,
364                        size_t backlog_size,
365                        size_t prebuffering_bytes,
366                        Stream::Encoding encoding,
367                        Stream::Encoding src_encoding,
368                        unsigned hls_frag_duration,
369                        size_t hls_backlog_margin,
370                        const string &allow_origin)
371 {
372         lock_guard<mutex> lock(mu);
373         stream_url_map.insert(make_pair(url, streams.size()));
374         if (!hls_url.empty()) {
375                 stream_hls_url_map.insert(make_pair(hls_url, streams.size()));
376         }
377         streams.emplace_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding, hls_frag_duration, hls_backlog_margin, allow_origin));
378         return streams.size() - 1;
379 }
380
381 int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
382 {
383         lock_guard<mutex> lock(mu);
384         stream_url_map.insert(make_pair(stream.url(), streams.size()));
385         // stream_hls_url_map will be updated in register_hls_url(), since it is not part
386         // of the serialized state (it will always be picked out from the configuration).
387         streams.emplace_back(new Stream(stream, data_fd));
388         return streams.size() - 1;
389 }
390
391 void Server::add_hls_zombie_from_serialized(const HLSZombieProto &zombie_proto)
392 {
393         lock_guard<mutex> lock(mu);
394         HLSZombie zombie;
395         zombie.remote_addr = zombie_proto.remote_addr();
396         zombie.url = zombie_proto.url();
397         zombie.referer = zombie_proto.referer();
398         zombie.user_agent = zombie_proto.user_agent();
399         zombie.expires.tv_sec = zombie_proto.expires_sec();
400         zombie.expires.tv_nsec = zombie_proto.expires_nsec();
401         hls_zombies[zombie_proto.key()] = move(zombie);
402 }
403
404 void Server::set_backlog_size(int stream_index, size_t new_size)
405 {
406         lock_guard<mutex> lock(mu);
407         assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
408         streams[stream_index]->set_backlog_size(new_size);
409 }
410
411 void Server::set_prebuffering_bytes(int stream_index, size_t new_amount)
412 {
413         lock_guard<mutex> lock(mu);
414         assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
415         streams[stream_index]->prebuffering_bytes = new_amount;
416 }
417         
418 void Server::set_encoding(int stream_index, Stream::Encoding encoding)
419 {
420         lock_guard<mutex> lock(mu);
421         assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
422         streams[stream_index]->encoding = encoding;
423 }
424
425 void Server::set_src_encoding(int stream_index, Stream::Encoding encoding)
426 {
427         lock_guard<mutex> lock(mu);
428         assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
429         streams[stream_index]->src_encoding = encoding;
430 }
431
432 void Server::set_hls_frag_duration(int stream_index, unsigned hls_frag_duration)
433 {
434         lock_guard<mutex> lock(mu);
435         assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
436         streams[stream_index]->hls_frag_duration = hls_frag_duration;
437 }
438
439 void Server::set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin)
440 {
441         lock_guard<mutex> lock(mu);
442         assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
443         assert(hls_backlog_margin < streams[stream_index]->backlog_size);
444         streams[stream_index]->hls_backlog_margin = hls_backlog_margin;
445 }
446
447 void Server::set_allow_origin(int stream_index, const string &allow_origin)
448 {
449         lock_guard<mutex> lock(mu);
450         assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
451         streams[stream_index]->allow_origin = allow_origin;
452 }
453
454 void Server::register_hls_url(int stream_index, const string &hls_url)
455 {
456         lock_guard<mutex> lock(mu);
457         assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
458         assert(!hls_url.empty());
459         stream_hls_url_map.insert(make_pair(hls_url, stream_index));
460 }
461         
462 void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
463 {
464         lock_guard<mutex> lock(mu);
465         assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
466         streams[stream_index]->set_header(http_header, stream_header);
467 }
468
469 void Server::set_unavailable(int stream_index)
470 {
471         lock_guard<mutex> lock(mu);
472         assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
473         streams[stream_index]->set_unavailable();
474 }
475         
476 void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
477 {
478         lock_guard<mutex> lock(mu);
479         assert(clients.empty());
480         assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
481         streams[stream_index]->pacing_rate = pacing_rate;
482 }
483
484 void Server::add_gen204(const std::string &url, const std::string &allow_origin)
485 {
486         lock_guard<mutex> lock(mu);
487         assert(clients.empty());
488         ping_url_map[url] = allow_origin;
489 }
490
491 void Server::create_tls_context_for_acceptor(const Acceptor *acceptor)
492 {
493         assert(acceptor->is_tls());
494
495         bool is_server = true;
496         TLSContext *server_context = tls_create_context(is_server, TLS_V12);
497
498         const string &cert = acceptor->get_certificate_chain();
499         int num_cert = tls_load_certificates(server_context, reinterpret_cast<const unsigned char *>(cert.data()), cert.size());
500         assert(num_cert > 0);  // Should have been checked by config earlier.
501
502         const string &key = acceptor->get_private_key();
503         int num_key = tls_load_private_key(server_context, reinterpret_cast<const unsigned char *>(key.data()), key.size());
504         assert(num_key > 0);  // Should have been checked by config earlier.
505
506         tls_server_contexts.insert(make_pair(acceptor, server_context));
507 }
508
509 void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts)
510 {
511         assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
512         streams[stream_index]->add_data_deferred(data, bytes, metacube_flags, pts);
513 }
514
515 // See the .h file for postconditions after this function.      
516 void Server::process_client(Client *client)
517 {
518         switch (client->state) {
519         case Client::READING_REQUEST: {
520                 if (client->tls_context != nullptr && !client->in_ktls_mode) {
521                         if (send_pending_tls_data(client)) {
522                                 // send_pending_tls_data() hit postconditions #1 or #4.
523                                 return;
524                         }
525                 }
526
527 read_request_again:
528                 // Try to read more of the request.
529                 char buf[1024];
530                 int ret;
531                 if (client->tls_context == nullptr || client->in_ktls_mode) {
532                         ret = read_plain_data(client, buf, sizeof(buf));
533                         if (ret == -1) {
534                                 // read_plain_data() hit postconditions #1 or #2.
535                                 return;
536                         }
537                 } else {
538                         ret = read_tls_data(client, buf, sizeof(buf));
539                         if (ret == -1) {
540                                 // read_tls_data() hit postconditions #1, #2 or #4.
541                                 return;
542                         }
543                 }
544
545                 RequestParseStatus status = wait_for_double_newline(&client->request, buf, ret);
546         
547                 switch (status) {
548                 case RP_OUT_OF_SPACE:
549                         log(WARNING, "[%s] Client sent overlong request!", client->remote_addr.c_str());
550                         close_client(client);
551                         return;
552                 case RP_NOT_FINISHED_YET:
553                         // OK, we don't have the entire header yet. Fine; we'll get it later.
554                         // See if there's more data for us.
555                         goto read_request_again;
556                 case RP_EXTRA_DATA:
557                         log(WARNING, "[%s] Junk data after request!", client->remote_addr.c_str());
558                         close_client(client);
559                         return;
560                 case RP_FINISHED:
561                         break;
562                 }
563
564                 assert(status == RP_FINISHED);
565
566                 int error_code = parse_request(client);
567                 if (error_code == 200) {
568                         if (client->serving_hls_playlist) {
569                                 construct_hls_playlist(client);
570                         } else {
571                                 construct_stream_header(client);
572                         }
573                 } else if (error_code == 204) {
574                         construct_204(client);
575                 } else {
576                         construct_error(client, error_code);
577                 }
578
579                 // We've changed states, so fall through.
580                 assert(client->state == Client::SENDING_SHORT_RESPONSE ||
581                        client->state == Client::SENDING_HEADER);
582         }
583         case Client::SENDING_SHORT_RESPONSE:
584         case Client::SENDING_HEADER: {
585 sending_header_or_short_response_again:
586                 int ret;
587                 do {
588                         ret = write(client->sock,
589                                     client->header_or_short_response->data() + client->header_or_short_response_bytes_sent,
590                                     client->header_or_short_response->size() - client->header_or_short_response_bytes_sent);
591                 } while (ret == -1 && errno == EINTR);
592
593                 if (ret == -1 && errno == EAGAIN) {
594                         // We're out of socket space, so now we're at the “low edge” of epoll's
595                         // edge triggering. epoll will tell us when there is more room, so for now,
596                         // just return.
597                         // This is postcondition #4.
598                         return;
599                 }
600
601                 if (ret == -1) {
602                         // Error! Postcondition #1.
603                         log_perror("write");
604                         close_client(client);
605                         return;
606                 }
607                 
608                 client->header_or_short_response_bytes_sent += ret;
609                 assert(client->header_or_short_response_bytes_sent <= client->header_or_short_response->size());
610
611                 if (client->header_or_short_response_bytes_sent < client->header_or_short_response->size()) {
612                         // We haven't sent all yet. Fine; go another round.
613                         goto sending_header_or_short_response_again;
614                 }
615
616                 // We're done sending the header or error! Clear it to release some memory.
617                 client->header_or_short_response = nullptr;
618                 client->header_or_short_response_holder.clear();
619                 client->header_or_short_response_ref.reset();
620
621                 if (client->state == Client::SENDING_SHORT_RESPONSE) {
622                         if (more_requests(client)) {
623                                 // We're done sending the error, but should keep on reading new requests.
624                                 goto read_request_again;
625                         } else {
626                                 // We're done sending the error, so now close.
627                                 // This is postcondition #1.
628                                 close_client(client);
629                         }
630                         return;
631                 }
632
633                 Stream *stream = client->stream;
634                 hls_zombies.erase(client->get_hls_zombie_key());
635                 if (client->stream_pos == Client::STREAM_POS_AT_START) {
636                         // Start sending from the beginning of the backlog.
637                         client->stream_pos = min<size_t>(
638                             stream->bytes_received - stream->backlog_size,
639                             0);
640                         client->state = Client::SENDING_DATA;
641                         goto sending_data;
642                 } else if (client->stream_pos_end != Client::STREAM_POS_NO_END) {
643                         // We're sending a fragment, and should have all of it,
644                         // so start sending right away.
645                         assert(ssize_t(client->stream_pos) >= 0);
646                         client->state = Client::SENDING_DATA;
647                         goto sending_data;
648                 } else if (stream->prebuffering_bytes == 0) {
649                         // Start sending from the first keyframe we get. In other
650                         // words, we won't send any of the backlog, but we'll start
651                         // sending immediately as we get the next keyframe block.
652                         // Note that this is functionally identical to the next if branch,
653                         // except that we save a binary search.
654                         assert(client->stream_pos == Client::STREAM_POS_AT_END);
655                         assert(client->stream_pos_end == Client::STREAM_POS_NO_END);
656                         client->stream_pos = stream->bytes_received;
657                         client->state = Client::WAITING_FOR_KEYFRAME;
658                 } else {
659                         // We're not going to send anything to the client before we have
660                         // N bytes. However, this wait might be boring; we can just as well
661                         // use it to send older data if we have it. We use lower_bound()
662                         // so that we are conservative and never add extra latency over just
663                         // waiting (assuming CBR or nearly so); otherwise, we could want e.g.
664                         // 100 kB prebuffer but end up sending a 10 MB GOP.
665                         assert(client->stream_pos == Client::STREAM_POS_AT_END);
666                         assert(client->stream_pos_end == Client::STREAM_POS_NO_END);
667                         deque<uint64_t>::const_iterator starting_point_it =
668                                 lower_bound(stream->suitable_starting_points.begin(),
669                                             stream->suitable_starting_points.end(),
670                                             stream->bytes_received - stream->prebuffering_bytes);
671                         if (starting_point_it == stream->suitable_starting_points.end()) {
672                                 // None found. Just put us at the end, and then wait for the
673                                 // first keyframe to appear.
674                                 client->stream_pos = stream->bytes_received;
675                                 client->state = Client::WAITING_FOR_KEYFRAME;
676                         } else {
677                                 client->stream_pos = *starting_point_it;
678                                 client->state = Client::PREBUFFERING;
679                                 goto prebuffering;
680                         }
681                 }
682                 // Fall through.
683         }
684         case Client::WAITING_FOR_KEYFRAME: {
685                 Stream *stream = client->stream;
686                 if (stream->suitable_starting_points.empty() ||
687                     client->stream_pos > stream->suitable_starting_points.back()) {
688                         // We haven't received a keyframe since this stream started waiting,
689                         // so keep on waiting for one.
690                         // This is postcondition #3.
691                         stream->put_client_to_sleep(client);
692                         return;
693                 }
694                 client->stream_pos = stream->suitable_starting_points.back();
695                 client->state = Client::PREBUFFERING;
696                 // Fall through.
697         }
698         case Client::PREBUFFERING: {
699 prebuffering:
700                 Stream *stream = client->stream;
701                 size_t bytes_to_send = stream->bytes_received - client->stream_pos;
702                 assert(bytes_to_send <= stream->backlog_size);
703                 if (bytes_to_send < stream->prebuffering_bytes) {
704                         // We don't have enough bytes buffered to start this client yet.
705                         // This is postcondition #3.
706                         stream->put_client_to_sleep(client);
707                         return;
708                 }
709                 client->state = Client::SENDING_DATA;
710                 // Fall through.
711         }
712         case Client::SENDING_DATA: {
713 sending_data:
714                 skip_lost_data(client);
715                 Stream *stream = client->stream;
716
717 sending_data_again:
718                 size_t bytes_to_send;
719                 if (client->stream_pos_end == Client::STREAM_POS_NO_END) {
720                          bytes_to_send = stream->bytes_received - client->stream_pos;
721                 } else {
722                          bytes_to_send = client->stream_pos_end - client->stream_pos;
723                 }
724                 assert(bytes_to_send <= stream->backlog_size);
725                 if (bytes_to_send == 0) {
726                         if (client->stream_pos == client->stream_pos_end) {  // We have a definite end, and we're at it.
727                                 // Add (or overwrite) a HLS zombie.
728                                 timespec now;
729                                 if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
730                                         log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
731                                 } else {
732                                         HLSZombie zombie;
733                                         zombie.remote_addr = client->remote_addr;
734                                         zombie.referer = client->referer;
735                                         zombie.user_agent = client->user_agent;
736                                         zombie.url = client->stream->url + "?frag=<idle>";
737                                         zombie.expires = now;
738                                         zombie.expires.tv_sec += client->stream->hls_frag_duration * 3;
739                                         hls_zombies[client->get_hls_zombie_key()] = move(zombie);
740                                 }
741                                 if (more_requests(client)) {
742                                         // We're done sending the fragment, but should keep on reading new requests.
743                                         goto read_request_again;
744                                 } else {
745                                         // We're done sending the fragment, so now close.
746                                         // This is postcondition #1.
747                                         close_client(client);
748                                 }
749                         }
750                         return;
751                 }
752
753                 // See if we need to split across the circular buffer.
754                 bool more_data = false;
755                 if ((client->stream_pos % stream->backlog_size) + bytes_to_send > stream->backlog_size) {
756                         bytes_to_send = stream->backlog_size - (client->stream_pos % stream->backlog_size);
757                         more_data = true;
758                 }
759
760                 ssize_t ret;
761                 do {
762                         off_t offset = client->stream_pos % stream->backlog_size;
763                         ret = sendfile(client->sock, stream->data_fd, &offset, bytes_to_send);
764                 } while (ret == -1 && errno == EINTR);
765
766                 if (ret == -1 && errno == EAGAIN) {
767                         // We're out of socket space, so return; epoll will wake us up
768                         // when there is more room.
769                         // This is postcondition #4.
770                         return;
771                 }
772                 if (ret == -1) {
773                         // Error, close; postcondition #1.
774                         log_perror("sendfile");
775                         close_client(client);
776                         return;
777                 }
778                 client->stream_pos += ret;
779                 client->bytes_sent += ret;
780
781                 assert(client->stream_pos_end == Client::STREAM_POS_NO_END || client->stream_pos <= client->stream_pos_end);
782                 if (client->stream_pos == client->stream_pos_end) {
783                         goto sending_data_again;  // Will see that bytes_to_send == 0 and end.
784                 } else if (client->stream_pos == stream->bytes_received) {
785                         // We don't have any more data for this client, so put it to sleep.
786                         // This is postcondition #3.
787                         stream->put_client_to_sleep(client);
788                 } else if (more_data && size_t(ret) == bytes_to_send) {
789                         goto sending_data_again;
790                 }
791                 // We'll also get here for postcondition #4 (similar to the EAGAIN path above).
792                 break;
793         }
794         default:
795                 assert(false);
796         }
797 }
798
799 namespace {
800
801 void flush_pending_data(int sock)
802 {
803         // Flush pending data, which would otherwise wait for the 200ms TCP_CORK timer
804         // to elapsed; does not cancel out TCP_CORK (since that still takes priority),
805         // but does a one-off flush.
806         int one = 1;
807         if (setsockopt(sock, SOL_TCP, TCP_NODELAY, &one, sizeof(one)) == -1) {
808                 log_perror("setsockopt(TCP_NODELAY)");
809                 // Can still continue.
810         }
811 }
812
813 }  // namespace
814
815 bool Server::send_pending_tls_data(Client *client)
816 {
817         // See if there's data from the TLS library to write.
818         if (client->tls_data_to_send == nullptr) {
819                 client->tls_data_to_send = tls_get_write_buffer(client->tls_context, &client->tls_data_left_to_send);
820                 if (client->tls_data_to_send == nullptr) {
821                         // Really no data to send.
822                         return false;
823                 }
824         }
825
826 send_data_again:
827         int ret;
828         do {
829                 ret = write(client->sock, client->tls_data_to_send, client->tls_data_left_to_send);
830         } while (ret == -1 && errno == EINTR);
831         assert(ret < 0 || size_t(ret) <= client->tls_data_left_to_send);
832
833         if (ret == -1 && errno == EAGAIN) {
834                 // We're out of socket space, so now we're at the “low edge” of epoll's
835                 // edge triggering. epoll will tell us when there is more room, so for now,
836                 // just return.
837                 // This is postcondition #4.
838                 return true;
839         }
840         if (ret == -1) {
841                 // Error! Postcondition #1.
842                 log_perror("write");
843                 close_client(client);
844                 return true;
845         }
846         if (ret > 0 && size_t(ret) == client->tls_data_left_to_send) {
847                 // All data has been sent, so we don't need to go to sleep
848                 // (although we are likely to do so immediately afterwards,
849                 // due to lack of client data).
850                 tls_buffer_clear(client->tls_context);
851                 client->tls_data_to_send = nullptr;
852
853                 // Flush the data we just wrote, since the client probably
854                 // is waiting for it.
855                 flush_pending_data(client->sock);
856                 return false;
857         }
858
859         // More data to send, so try again.
860         client->tls_data_to_send += ret;
861         client->tls_data_left_to_send -= ret;
862         goto send_data_again;
863 }
864
865 int Server::read_plain_data(Client *client, char *buf, size_t max_size)
866 {
867         int ret;
868         do {
869                 ret = read(client->sock, buf, max_size);
870         } while (ret == -1 && errno == EINTR);
871
872         if (ret == -1 && errno == EAGAIN) {
873                 // No more data right now. Nothing to do.
874                 // This is postcondition #2.
875                 return -1;
876         }
877         if (ret == -1) {
878                 log_perror("read");
879                 close_client(client);
880                 return -1;
881         }
882         if (ret == 0) {
883                 // OK, the socket is closed.
884                 close_client(client);
885                 return -1;
886         }
887
888         return ret;
889 }
890
891 int Server::read_tls_data(Client *client, char *buf, size_t max_size)
892 {
893 read_again:
894         assert(!client->in_ktls_mode);
895
896         int ret;
897         do {
898                 ret = read(client->sock, buf, max_size);
899         } while (ret == -1 && errno == EINTR);
900
901         if (ret == -1 && errno == EAGAIN) {
902                 // No more data right now. Nothing to do.
903                 // This is postcondition #2.
904                 return -1;
905         }
906         if (ret == -1) {
907                 log_perror("read");
908                 close_client(client);
909                 return -1;
910         }
911         if (ret == 0) {
912                 // OK, the socket is closed.
913                 close_client(client);
914                 return -1;
915         }
916
917         // Give it to the TLS library.
918         int err = tls_consume_stream(client->tls_context, reinterpret_cast<const unsigned char *>(buf), ret, nullptr);
919         if (err < 0) {
920                 log_tls_error("tls_consume_stream", err);
921                 close_client(client);
922                 return -1;
923         }
924         if (err == 0) {
925                 // Not consumed any data. See if we can read more.
926                 goto read_again;
927         }
928
929         // Read any decrypted data available for us. (We can reuse buf, since it's free now.)
930         ret = tls_read(client->tls_context, reinterpret_cast<unsigned char *>(buf), max_size);
931         if (ret == 0) {
932                 // No decrypted data for us yet, but there might be some more handshaking
933                 // to send. Do that if needed, then look for more data.
934                 if (send_pending_tls_data(client)) {
935                         // send_pending_tls_data() hit postconditions #1 or #4.
936                         return -1;
937                 }
938                 goto read_again;
939         }
940         if (ret < 0) {
941                 log_tls_error("tls_read", ret);
942                 close_client(client);
943                 return -1;
944         }
945
946         if (tls_established(client->tls_context)) {
947                 // We're ready to enter kTLS mode, unless we still have some
948                 // handshake data to send (which then must be sent as non-kTLS).
949                 if (send_pending_tls_data(client)) {
950                         // send_pending_tls_data() hit postconditions #1 or #4.
951                         return -1;
952                 }
953                 int err = tls_make_ktls(client->tls_context, client->sock);  // Don't overwrite ret.
954                 if (err < 0) {
955                         log_tls_error("tls_make_ktls", ret);
956                         close_client(client);
957                         return -1;
958                 }
959                 client->in_ktls_mode = true;
960         }
961
962         assert(ret > 0);
963         return ret;
964 }
965
966 // See if there's some data we've lost. Ideally, we should drop to a block boundary,
967 // but resync will be the mux's problem.
968 void Server::skip_lost_data(Client *client)
969 {
970         Stream *stream = client->stream;
971         if (stream == nullptr) {
972                 return;
973         }
974         size_t bytes_to_send = stream->bytes_received - client->stream_pos;
975         if (bytes_to_send > stream->backlog_size) {
976                 size_t bytes_lost = bytes_to_send - stream->backlog_size;
977                 client->bytes_lost += bytes_lost;
978                 ++client->num_loss_events;
979                 if (!client->close_after_response) {
980                         assert(client->stream_pos_end != Client::STREAM_POS_NO_END);
981
982                         // We've already sent a Content-Length, so we can't just skip data.
983                         // Close the connection immediately and hope the other side
984                         // is able to figure out that there was an error and it needs to skip.
985                         client->close_after_response = true;
986                         client->stream_pos = client->stream_pos_end;
987                 } else {
988                         client->stream_pos = stream->bytes_received - stream->backlog_size;
989                 }
990         }
991 }
992
993 int Server::parse_request(Client *client)
994 {
995         vector<string> lines = split_lines(client->request);
996         client->request.clear();
997         if (lines.empty()) {
998                 return 400;  // Bad request (empty).
999         }
1000
1001         // Parse the headers, for logging purposes.
1002         HTTPHeaderMultimap headers = extract_headers(lines, client->remote_addr);
1003         const auto referer_it = headers.find("Referer");
1004         if (referer_it != headers.end()) {
1005                 client->referer = referer_it->second;
1006         }
1007         const auto user_agent_it = headers.find("User-Agent");
1008         if (user_agent_it != headers.end()) {
1009                 client->user_agent = user_agent_it->second;
1010         }
1011         const auto x_playback_session_id_it = headers.find("X-Playback-Session-Id");
1012         if (x_playback_session_id_it != headers.end()) {
1013                 client->x_playback_session_id = x_playback_session_id_it->second;
1014         } else {
1015                 client->x_playback_session_id.clear();
1016         }
1017
1018         vector<string> request_tokens = split_tokens(lines[0]);
1019         if (request_tokens.size() < 3) {
1020                 return 400;  // Bad request (empty).
1021         }
1022         if (request_tokens[0] != "GET") {
1023                 return 400;  // Should maybe be 405 instead?
1024         }
1025
1026         string url = request_tokens[1];
1027         client->url = url;
1028         if (url.size() > 8 && url.find("?backlog") == url.size() - 8) {
1029                 client->stream_pos = Client::STREAM_POS_AT_START;
1030                 url = url.substr(0, url.size() - 8);
1031         } else {
1032                 size_t pos = url.find("?frag=");
1033                 if (pos != string::npos) {
1034                         // Parse an endpoint of the type /stream.mp4?frag=1234-5678.
1035                         const char *ptr = url.c_str() + pos + 6;
1036
1037                         // "?frag=header" is special.
1038                         if (strcmp(ptr, "header") == 0) {
1039                                 client->stream_pos = Client::STREAM_POS_HEADER_ONLY;
1040                                 client->stream_pos_end = -1;
1041                         } else {
1042                                 char *endptr;
1043                                 long long frag_start = strtol(ptr, &endptr, 10);
1044                                 if (ptr == endptr || frag_start < 0 || frag_start == LLONG_MAX) {
1045                                         return 400;  // Bad request.
1046                                 }
1047                                 if (*endptr != '-') {
1048                                         return 400;  // Bad request.
1049                                 }
1050                                 ptr = endptr + 1;
1051
1052                                 long long frag_end = strtol(ptr, &endptr, 10);
1053                                 if (ptr == endptr || frag_end < frag_start || frag_end == LLONG_MAX) {
1054                                         return 400;  // Bad request.
1055                                 }
1056
1057                                 if (*endptr != '\0') {
1058                                         return 400;  // Bad request.
1059                                 }
1060
1061                                 client->stream_pos = frag_start;
1062                                 client->stream_pos_end = frag_end;
1063                         }
1064                         url = url.substr(0, pos);
1065                 } else {
1066                         client->stream_pos = -1;
1067                         client->stream_pos_end = -1;
1068                 }
1069         }
1070
1071         // Figure out if we're supposed to close the socket after we've delivered the response.
1072         string protocol = request_tokens[2];
1073         if (protocol.find("HTTP/") != 0) {
1074                 return 400;  // Bad request.
1075         }
1076         client->close_after_response = false;
1077         client->http_11 = true;
1078         if (protocol == "HTTP/1.0") {
1079                 // No persistent connections.
1080                 client->close_after_response = true;
1081                 client->http_11 = false;
1082         } else {
1083                 const auto connection_it = headers.find("Connection");
1084                 if (connection_it != headers.end() && connection_it->second == "close") {
1085                         client->close_after_response = true;
1086                 }
1087         }
1088
1089         const auto stream_url_map_it = stream_url_map.find(url);
1090         if (stream_url_map_it != stream_url_map.end()) {
1091                 // Serve a regular stream..
1092                 client->stream = streams[stream_url_map_it->second].get();
1093                 client->serving_hls_playlist = false;
1094         } else {
1095                 const auto stream_hls_url_map_it = stream_hls_url_map.find(url);
1096                 if (stream_hls_url_map_it != stream_hls_url_map.end()) {
1097                         // Serve HLS playlist.
1098                         client->stream = streams[stream_hls_url_map_it->second].get();
1099                         client->serving_hls_playlist = true;
1100                 } else {
1101                         const auto ping_url_map_it = ping_url_map.find(url);
1102                         if (ping_url_map_it == ping_url_map.end()) {
1103                                 return 404;  // Not found.
1104                         } else {
1105                                 // Serve a ping (204 no error).
1106                                 return 204;
1107                         }
1108                 }
1109         }
1110
1111         Stream *stream = client->stream;
1112
1113         if (client->serving_hls_playlist) {
1114                 if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
1115                         // This doesn't make any sense, and is hard to implement, too.
1116                         return 404;
1117                 } else {
1118                         return 200;
1119                 }
1120         }
1121
1122         if (client->stream_pos_end == Client::STREAM_POS_NO_END) {
1123                 if (stream->unavailable) {
1124                         return 503;  // Service unavailable.
1125                 }
1126
1127                 // This stream won't end, so we don't have a content-length,
1128                 // and can just as well tell the client it's Connection: close
1129                 // (otherwise, we'd have to implement chunking TE for no good reason).
1130                 client->close_after_response = true;
1131         } else {
1132                 if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
1133                         // This doesn't make any sense, and is hard to implement, too.
1134                         return 416;  // Range not satisfiable.
1135                 }
1136
1137                 // Check that we have the requested fragment in our backlog.
1138                 size_t buffer_end = stream->bytes_received;
1139                 size_t buffer_start = (buffer_end <= stream->backlog_size) ? 0 : buffer_end - stream->backlog_size;
1140
1141                 if (client->stream_pos_end > buffer_end ||
1142                     client->stream_pos < buffer_start) {
1143                         return 416;  // Range not satisfiable.
1144                 }
1145         }
1146
1147         client->stream = stream;
1148         if (setsockopt(client->sock, SOL_SOCKET, SO_MAX_PACING_RATE, &client->stream->pacing_rate, sizeof(client->stream->pacing_rate)) == -1) {
1149                 if (client->stream->pacing_rate != ~0U) {
1150                         log_perror("setsockopt(SO_MAX_PACING_RATE)");
1151                 }
1152         }
1153         client->request.clear();
1154
1155         return 200;  // OK!
1156 }
1157
1158 void Server::construct_stream_header(Client *client)
1159 {
1160         Stream *stream = client->stream;
1161         string response = stream->http_header;
1162         if (client->stream_pos == Client::STREAM_POS_HEADER_ONLY) {
1163                 char buf[64];
1164                 snprintf(buf, sizeof(buf), "Content-Length: %zu\r\n", stream->stream_header.size());
1165                 response.append(buf);
1166         } else if (client->stream_pos_end != Client::STREAM_POS_NO_END) {
1167                 char buf[64];
1168                 snprintf(buf, sizeof(buf), "Content-Length: %" PRIu64 "\r\n", client->stream_pos_end - client->stream_pos);
1169                 response.append(buf);
1170         }
1171         if (client->http_11) {
1172                 assert(response.find("HTTP/1.0") == 0);
1173                 response[7] = '1';  // Change to HTTP/1.1.
1174                 if (client->close_after_response) {
1175                         response.append("Connection: close\r\n");
1176                 }
1177         } else {
1178                 assert(client->close_after_response);
1179         }
1180         if (!stream->allow_origin.empty()) {
1181                 response.append("Access-Control-Allow-Origin: ");
1182                 response.append(stream->allow_origin);
1183                 response.append("\r\n");
1184         }
1185         if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
1186                 response.append("\r\n");
1187         } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
1188                 response.append("Content-Encoding: metacube\r\n\r\n");
1189                 if (!stream->stream_header.empty()) {
1190                         metacube2_block_header hdr;
1191                         memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
1192                         hdr.size = htonl(stream->stream_header.size());
1193                         hdr.flags = htons(METACUBE_FLAGS_HEADER);
1194                         hdr.csum = htons(metacube2_compute_crc(&hdr));
1195                         response.append(string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
1196                 }
1197         } else {
1198                 assert(false);
1199         }
1200         if (client->stream_pos == Client::STREAM_POS_HEADER_ONLY) {
1201                 client->state = Client::SENDING_SHORT_RESPONSE;
1202                 response.append(stream->stream_header);
1203         } else {
1204                 client->state = Client::SENDING_HEADER;
1205                 if (client->stream_pos_end == Client::STREAM_POS_NO_END) {  // Fragments don't contain stream headers.
1206                         response.append(stream->stream_header);
1207                 }
1208         }
1209
1210         client->header_or_short_response_holder = move(response);
1211         client->header_or_short_response = &client->header_or_short_response_holder;
1212
1213         // Switch states.
1214         change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
1215 }
1216         
1217 void Server::construct_error(Client *client, int error_code)
1218 {
1219         char error[256];
1220         if (client->http_11 && client->close_after_response) {
1221                 snprintf(error, sizeof(error),
1222                         "HTTP/1.1 %d Error\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\nSomething went wrong. Sorry.\r\n",
1223                         error_code);
1224         } else {
1225                 snprintf(error, sizeof(error),
1226                         "HTTP/1.%d %d Error\r\nContent-Type: text/plain\r\nContent-Length: 30\r\n\r\nSomething went wrong. Sorry.\r\n",
1227                         client->http_11, error_code);
1228         }
1229         client->header_or_short_response_holder = error;
1230         client->header_or_short_response = &client->header_or_short_response_holder;
1231
1232         // Switch states.
1233         client->state = Client::SENDING_SHORT_RESPONSE;
1234         change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
1235 }
1236
1237 void Server::construct_hls_playlist(Client *client)
1238 {
1239         Stream *stream = client->stream;
1240         shared_ptr<const string> *cache;
1241         if (client->http_11) {
1242                 if (client->close_after_response) {
1243                         cache = &stream->hls_playlist_http11_close;
1244                 } else {
1245                         cache = &stream->hls_playlist_http11_persistent;
1246                 }
1247         } else {
1248                 assert(client->close_after_response);
1249                 cache = &stream->hls_playlist_http10;
1250         }
1251
1252         if (*cache == nullptr) {
1253                 *cache = stream->generate_hls_playlist(client->http_11, client->close_after_response);
1254         }
1255         client->header_or_short_response_ref = *cache;
1256         client->header_or_short_response = cache->get();
1257
1258         // Switch states.
1259         client->state = Client::SENDING_SHORT_RESPONSE;
1260         change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
1261 }
1262
1263 void Server::construct_204(Client *client)
1264 {
1265         const auto ping_url_map_it = ping_url_map.find(client->url);
1266         assert(ping_url_map_it != ping_url_map.end());
1267
1268         string response;
1269         if (client->http_11) {
1270                 response = "HTTP/1.1 204 No Content\r\n";
1271                 if (client->close_after_response) {
1272                         response.append("Connection: close\r\n");
1273                 }
1274         } else {
1275                 response = "HTTP/1.0 204 No Content\r\n";
1276                 assert(client->close_after_response);
1277         }
1278         if (!ping_url_map_it->second.empty()) {
1279                 response.append("Access-Control-Allow-Origin: ");
1280                 response.append(ping_url_map_it->second);
1281                 response.append("\r\n");
1282         }
1283         response.append("\r\n");
1284
1285         client->header_or_short_response_holder = move(response);
1286         client->header_or_short_response = &client->header_or_short_response_holder;
1287
1288         // Switch states.
1289         client->state = Client::SENDING_SHORT_RESPONSE;
1290         change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
1291 }
1292
1293 namespace {
1294
1295 template<class T>
1296 void delete_from(vector<T> *v, T elem)
1297 {
1298         typename vector<T>::iterator new_end = remove(v->begin(), v->end(), elem);
1299         v->erase(new_end, v->end());
1300 }
1301
1302 void send_ktls_close(int sock)
1303 {
1304         uint8_t record_type = 21;  // Alert.
1305         uint8_t body[] = {
1306                 1,   // Warning level (but still fatal!).
1307                 0,   // close_notify.
1308         };
1309
1310         int cmsg_len = sizeof(record_type);
1311         char buf[CMSG_SPACE(cmsg_len)];
1312
1313         msghdr msg = {0};
1314         msg.msg_control = buf;
1315         msg.msg_controllen = sizeof(buf);
1316         cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
1317         cmsg->cmsg_level = SOL_TLS;
1318         cmsg->cmsg_type = TLS_SET_RECORD_TYPE;
1319         cmsg->cmsg_len = CMSG_LEN(cmsg_len);
1320         *CMSG_DATA(cmsg) = record_type;
1321         msg.msg_controllen = cmsg->cmsg_len;
1322
1323         iovec msg_iov;
1324         msg_iov.iov_base = body;
1325         msg_iov.iov_len = sizeof(body);
1326         msg.msg_iov = &msg_iov;
1327         msg.msg_iovlen = 1;
1328
1329         int err;
1330         do {
1331                 err = sendmsg(sock, &msg, 0);
1332         } while (err == -1 && errno == EINTR);  // Ignore all other errors.
1333 }
1334
1335 }  // namespace
1336         
1337 void Server::close_client(Client *client)
1338 {
1339         if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client->sock, nullptr) == -1) {
1340                 log_perror("epoll_ctl(EPOLL_CTL_DEL)");
1341                 exit(1);
1342         }
1343
1344         // This client could be sleeping, so we'll need to fix that. (Argh, O(n).)
1345         if (client->stream != nullptr) {
1346                 delete_from(&client->stream->sleeping_clients, client);
1347                 delete_from(&client->stream->to_process, client);
1348         }
1349
1350         if (client->tls_context) {
1351                 if (client->in_ktls_mode) {
1352                         // Keep GnuTLS happy.
1353                         send_ktls_close(client->sock);
1354                 }
1355                 tls_destroy_context(client->tls_context);
1356         }
1357
1358         // Log to access_log.
1359         access_log->write(client->get_stats());
1360
1361         // Bye-bye!
1362         safe_close(client->sock);
1363
1364         clients.erase(client->sock);
1365 }
1366
1367 void Server::change_epoll_events(Client *client, uint32_t events)
1368 {
1369         epoll_event ev;
1370         ev.events = events;
1371         ev.data.ptr = client;
1372
1373         if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
1374                 log_perror("epoll_ctl(EPOLL_CTL_MOD)");
1375                 exit(1);
1376         }
1377 }
1378
1379 bool Server::more_requests(Client *client)
1380 {
1381         if (client->close_after_response) {
1382                 return false;
1383         }
1384
1385         // Log to access_log.
1386         access_log->write(client->get_stats());
1387
1388         flush_pending_data(client->sock);
1389
1390         // Switch states and reset the parsers. We don't reset statistics.
1391         client->state = Client::READING_REQUEST;
1392         client->url.clear();
1393         client->stream = NULL;
1394         client->header_or_short_response = nullptr;
1395         client->header_or_short_response_holder.clear();
1396         client->header_or_short_response_ref.reset();
1397         client->header_or_short_response_bytes_sent = 0;
1398         client->bytes_sent = 0;
1399         start_client_timeout_timer(client);
1400
1401         change_epoll_events(client, EPOLLIN | EPOLLET | EPOLLRDHUP);  // No TLS handshake, so no EPOLLOUT needed.
1402
1403         return true;
1404 }
1405
1406 void Server::process_queued_data()
1407 {
1408         {
1409                 lock_guard<mutex> lock(queued_clients_mutex);
1410
1411                 for (const pair<int, Acceptor *> &id_and_acceptor : queued_add_clients) {
1412                         add_client(id_and_acceptor.first, id_and_acceptor.second);
1413                 }
1414                 queued_add_clients.clear();
1415         }
1416
1417         for (unique_ptr<Stream> &stream : streams) {
1418                 stream->process_queued_data();
1419         }
1420 }