X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;fp=server.cpp;h=c12bc8dee370e7e51f692b2d3f5a52aab3cce0e8;hp=5f2c088c7826313534b73e0bad81218975978d52;hb=ed218ece51e5be77fd8bd8f014b5f05708e4be5b;hpb=49851eb86428de7a76b00ea4067cdf32c28e7397 diff --git a/server.cpp b/server.cpp index 5f2c088..c12bc8d 100644 --- a/server.cpp +++ b/server.cpp @@ -83,6 +83,28 @@ vector Server::get_client_stats() const return ret; } +vector Server::get_hls_zombies() +{ + vector ret; + + timespec now; + if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) { + log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)"); + return ret; + } + + lock_guard lock(mu); + for (auto it = hls_zombies.begin(); it != hls_zombies.end(); ) { + if (is_earlier(it->second.expires, now)) { + hls_zombies.erase(it++); + } else { + ret.push_back(it->second); + ++it; + } + } + return ret; +} + void Server::do_work() { while (!should_stop()) { @@ -194,6 +216,18 @@ CubemapStateProto Server::serialize(unordered_map *short for (unique_ptr &stream : streams) { serialized.add_streams()->MergeFrom(stream->serialize()); } + for (const auto &key_and_zombie : hls_zombies) { + HLSZombieProto *proto = serialized.add_hls_zombies(); + proto->set_key(key_and_zombie.first); + + const HLSZombie &zombie = key_and_zombie.second; + proto->set_remote_addr(zombie.remote_addr); + proto->set_url(zombie.url); + proto->set_referer(zombie.referer); + proto->set_user_agent(zombie.user_agent); + proto->set_expires_sec(zombie.expires.tv_sec); + proto->set_expires_nsec(zombie.expires.tv_nsec); + } return serialized; } @@ -348,7 +382,20 @@ int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd) streams.emplace_back(new Stream(stream, data_fd)); return streams.size() - 1; } - + +void Server::add_hls_zombie_from_serialized(const HLSZombieProto &zombie_proto) +{ + lock_guard lock(mu); + HLSZombie zombie; + zombie.remote_addr = zombie_proto.remote_addr(); + zombie.url = zombie_proto.url(); + zombie.referer = zombie_proto.referer(); + zombie.user_agent = zombie_proto.user_agent(); + zombie.expires.tv_sec = zombie_proto.expires_sec(); + zombie.expires.tv_nsec = zombie_proto.expires_nsec(); + hls_zombies[zombie_proto.key()] = move(zombie); +} + void Server::set_backlog_size(int stream_index, size_t new_size) { lock_guard lock(mu); @@ -608,6 +655,7 @@ sending_header_or_short_response_again: } Stream *stream = client->stream; + hls_zombies.erase(client->get_hls_zombie_key()); if (client->stream_pos == Client::STREAM_POS_AT_START) { // Start sending from the beginning of the backlog. client->stream_pos = min( @@ -700,6 +748,20 @@ sending_data_again: assert(bytes_to_send <= stream->backlog_size); if (bytes_to_send == 0) { if (client->stream_pos == client->stream_pos_end) { // We have a definite end, and we're at it. + // Add (or overwrite) a HLS zombie. + timespec now; + if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) { + log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)"); + } else { + HLSZombie zombie; + zombie.remote_addr = client->remote_addr; + zombie.referer = client->referer; + zombie.user_agent = client->user_agent; + zombie.url = client->stream->url + "?frag="; + zombie.expires = now; + zombie.expires.tv_sec += client->stream->hls_frag_duration * 3; + hls_zombies[client->get_hls_zombie_key()] = move(zombie); + } if (more_requests(client)) { // We're done sending the fragment, but should keep on reading new requests. goto read_request_again; @@ -952,6 +1014,12 @@ int Server::parse_request(Client *client) if (user_agent_it != headers.end()) { client->user_agent = user_agent_it->second; } + const auto x_playback_session_id_it = headers.find("X-Playback-Session-Id"); + if (x_playback_session_id_it != headers.end()) { + client->x_playback_session_id = x_playback_session_id_it->second; + } else { + client->x_playback_session_id.clear(); + } vector request_tokens = split_tokens(lines[0]); if (request_tokens.size() < 3) {