X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=35cb9d6bce869b354f73b96f1824e004cf1835ec;hp=78273b00eb7f3b9b49938c376eba4aeb92fe4300;hb=HEAD;hpb=f2e7dbf218365e3f47b942ea999796b2724ccc24 diff --git a/server.cpp b/server.cpp index 78273b0..8025fd1 100644 --- a/server.cpp +++ b/server.cpp @@ -60,7 +60,7 @@ inline bool is_earlier(timespec a, timespec b) Server::Server() { - epoll_fd = epoll_create(1024); // Size argument is ignored. + epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (epoll_fd == -1) { log_perror("epoll_fd"); exit(1); @@ -70,6 +70,11 @@ Server::Server() Server::~Server() { safe_close(epoll_fd); + + // We're going to die soon anyway, but clean this up to keep leak checking happy. + for (const auto &acceptor_and_context : tls_server_contexts) { + tls_destroy_context(acceptor_and_context.second); + } } vector Server::get_client_stats() const @@ -83,6 +88,28 @@ vector Server::get_client_stats() const return ret; } +vector Server::get_hls_zombies() +{ + vector ret; + + timespec now; + if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) { + log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)"); + return ret; + } + + lock_guard lock(mu); + for (auto it = hls_zombies.begin(); it != hls_zombies.end(); ) { + if (is_earlier(it->second.expires, now)) { + hls_zombies.erase(it++); + } else { + ret.push_back(it->second); + ++it; + } + } + return ret; +} + void Server::do_work() { while (!should_stop()) { @@ -194,6 +221,18 @@ CubemapStateProto Server::serialize(unordered_map *short for (unique_ptr &stream : streams) { serialized.add_streams()->MergeFrom(stream->serialize()); } + for (const auto &key_and_zombie : hls_zombies) { + HLSZombieProto *proto = serialized.add_hls_zombies(); + proto->set_key(key_and_zombie.first); + + const HLSZombie &zombie = key_and_zombie.second; + proto->set_remote_addr(zombie.remote_addr); + proto->set_url(zombie.url); + proto->set_referer(zombie.referer); + proto->set_user_agent(zombie.user_agent); + proto->set_expires_sec(zombie.expires.tv_sec); + proto->set_expires_nsec(zombie.expires.tv_nsec); + } return serialized; } @@ -348,7 +387,20 @@ int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd) streams.emplace_back(new Stream(stream, data_fd)); return streams.size() - 1; } - + +void Server::add_hls_zombie_from_serialized(const HLSZombieProto &zombie_proto) +{ + lock_guard lock(mu); + HLSZombie zombie; + zombie.remote_addr = zombie_proto.remote_addr(); + zombie.url = zombie_proto.url(); + zombie.referer = zombie_proto.referer(); + zombie.user_agent = zombie_proto.user_agent(); + zombie.expires.tv_sec = zombie_proto.expires_sec(); + zombie.expires.tv_nsec = zombie_proto.expires_nsec(); + hls_zombies[zombie_proto.key()] = move(zombie); +} + void Server::set_backlog_size(int stream_index, size_t new_size) { lock_guard lock(mu); @@ -388,7 +440,6 @@ void Server::set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin) { lock_guard lock(mu); assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); - assert(hls_backlog_margin >= 0); assert(hls_backlog_margin < streams[stream_index]->backlog_size); streams[stream_index]->hls_backlog_margin = hls_backlog_margin; } @@ -412,26 +463,14 @@ void Server::set_header(int stream_index, const string &http_header, const strin { lock_guard lock(mu); assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); - Stream *stream = streams[stream_index].get(); - stream->http_header = http_header; - - if (stream_header != stream->stream_header) { - // We cannot start at any of the older starting points anymore, - // since they'd get the wrong header for the stream (not to mention - // that a changed header probably means the stream restarted, - // which means any client starting on the old one would probably - // stop playing properly at the change point). Next block - // should be a suitable starting point (if not, something is - // pretty strange), so it will fill up again soon enough. - stream->suitable_starting_points.clear(); - - if (!stream->fragments.empty()) { - stream->fragments.clear(); - ++stream->discontinuity_counter; - stream->clear_hls_playlist_cache(); - } - } - stream->stream_header = stream_header; + streams[stream_index]->set_header(http_header, stream_header); +} + +void Server::set_unavailable(int stream_index) +{ + lock_guard lock(mu); + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->set_unavailable(); } void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate) @@ -478,7 +517,7 @@ void Server::process_client(Client *client) { switch (client->state) { case Client::READING_REQUEST: { - if (client->tls_context != nullptr) { + if (client->tls_context != nullptr && !client->in_ktls_mode) { if (send_pending_tls_data(client)) { // send_pending_tls_data() hit postconditions #1 or #4. return; @@ -489,10 +528,10 @@ read_request_again: // Try to read more of the request. char buf[1024]; int ret; - if (client->tls_context == nullptr) { - ret = read_nontls_data(client, buf, sizeof(buf)); + if (client->tls_context == nullptr || client->in_ktls_mode) { + ret = read_plain_data(client, buf, sizeof(buf)); if (ret == -1) { - // read_nontls_data() hit postconditions #1 or #2. + // read_plain_data() hit postconditions #1 or #2. return; } } else { @@ -524,22 +563,6 @@ read_request_again: assert(status == RP_FINISHED); - if (client->tls_context && !client->in_ktls_mode && tls_established(client->tls_context)) { - // We're ready to enter kTLS mode, unless we still have some - // handshake data to send (which then must be sent as non-kTLS). - if (send_pending_tls_data(client)) { - // send_pending_tls_data() hit postconditions #1 or #4. - return; - } - ret = tls_make_ktls(client->tls_context, client->sock); - if (ret < 0) { - log_tls_error("tls_make_ktls", ret); - close_client(client); - return; - } - client->in_ktls_mode = true; - } - int error_code = parse_request(client); if (error_code == 200) { if (client->serving_hls_playlist) { @@ -608,6 +631,7 @@ sending_header_or_short_response_again: } Stream *stream = client->stream; + hls_zombies.erase(client->get_hls_zombie_key()); if (client->stream_pos == Client::STREAM_POS_AT_START) { // Start sending from the beginning of the backlog. client->stream_pos = min( @@ -618,7 +642,7 @@ sending_header_or_short_response_again: } else if (client->stream_pos_end != Client::STREAM_POS_NO_END) { // We're sending a fragment, and should have all of it, // so start sending right away. - assert(client->stream_pos >= 0); + assert(ssize_t(client->stream_pos) >= 0); client->state = Client::SENDING_DATA; goto sending_data; } else if (stream->prebuffering_bytes == 0) { @@ -640,7 +664,7 @@ sending_header_or_short_response_again: // 100 kB prebuffer but end up sending a 10 MB GOP. assert(client->stream_pos == Client::STREAM_POS_AT_END); assert(client->stream_pos_end == Client::STREAM_POS_NO_END); - deque::const_iterator starting_point_it = + deque::const_iterator starting_point_it = lower_bound(stream->suitable_starting_points.begin(), stream->suitable_starting_points.end(), stream->bytes_received - stream->prebuffering_bytes); @@ -700,6 +724,20 @@ sending_data_again: assert(bytes_to_send <= stream->backlog_size); if (bytes_to_send == 0) { if (client->stream_pos == client->stream_pos_end) { // We have a definite end, and we're at it. + // Add (or overwrite) a HLS zombie. + timespec now; + if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) { + log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)"); + } else { + HLSZombie zombie; + zombie.remote_addr = client->remote_addr; + zombie.referer = client->referer; + zombie.user_agent = client->user_agent; + zombie.url = client->stream->url + "?frag="; + zombie.expires = now; + zombie.expires.tv_sec += client->stream->hls_frag_duration * 3; + hls_zombies[client->get_hls_zombie_key()] = move(zombie); + } if (more_requests(client)) { // We're done sending the fragment, but should keep on reading new requests. goto read_request_again; @@ -824,7 +862,7 @@ send_data_again: goto send_data_again; } -int Server::read_nontls_data(Client *client, char *buf, size_t max_size) +int Server::read_plain_data(Client *client, char *buf, size_t max_size) { int ret; do { @@ -853,6 +891,8 @@ int Server::read_nontls_data(Client *client, char *buf, size_t max_size) int Server::read_tls_data(Client *client, char *buf, size_t max_size) { read_again: + assert(!client->in_ktls_mode); + int ret; do { ret = read(client->sock, buf, max_size); @@ -903,6 +943,22 @@ read_again: return -1; } + if (tls_established(client->tls_context)) { + // We're ready to enter kTLS mode, unless we still have some + // handshake data to send (which then must be sent as non-kTLS). + if (send_pending_tls_data(client)) { + // send_pending_tls_data() hit postconditions #1 or #4. + return -1; + } + int err = tls_make_ktls(client->tls_context, client->sock); // Don't overwrite ret. + if (err < 0) { + log_tls_error("tls_make_ktls", ret); + close_client(client); + return -1; + } + client->in_ktls_mode = true; + } + assert(ret > 0); return ret; } @@ -952,6 +1008,12 @@ int Server::parse_request(Client *client) if (user_agent_it != headers.end()) { client->user_agent = user_agent_it->second; } + const auto x_playback_session_id_it = headers.find("X-Playback-Session-Id"); + if (x_playback_session_id_it != headers.end()) { + client->x_playback_session_id = x_playback_session_id_it->second; + } else { + client->x_playback_session_id.clear(); + } vector request_tokens = split_tokens(lines[0]); if (request_tokens.size() < 3) { @@ -1047,9 +1109,6 @@ int Server::parse_request(Client *client) } Stream *stream = client->stream; - if (stream->http_header.empty()) { - return 503; // Service unavailable. - } if (client->serving_hls_playlist) { if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) { @@ -1061,6 +1120,10 @@ int Server::parse_request(Client *client) } if (client->stream_pos_end == Client::STREAM_POS_NO_END) { + if (stream->unavailable) { + return 503; // Service unavailable. + } + // This stream won't end, so we don't have a content-length, // and can just as well tell the client it's Connection: close // (otherwise, we'd have to implement chunking TE for no good reason). @@ -1332,6 +1395,7 @@ bool Server::more_requests(Client *client) client->header_or_short_response_holder.clear(); client->header_or_short_response_ref.reset(); client->header_or_short_response_bytes_sent = 0; + client->bytes_sent = 0; start_client_timeout_timer(client); change_epoll_events(client, EPOLLIN | EPOLLET | EPOLLRDHUP); // No TLS handshake, so no EPOLLOUT needed.