]> git.sesse.net Git - cubemap/blobdiff - server.cpp
Keep information about HLS downloads around for some time afterwards.
[cubemap] / server.cpp
index ccac946726bf9710946ebb3a5612386ab0690e62..c12bc8dee370e7e51f692b2d3f5a52aab3cce0e8 100644 (file)
@@ -83,6 +83,28 @@ vector<ClientStats> Server::get_client_stats() const
        return ret;
 }
 
+vector<HLSZombie> Server::get_hls_zombies()
+{
+       vector<HLSZombie> ret;
+
+       timespec now;
+       if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+               log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+               return ret;
+       }
+
+       lock_guard<mutex> lock(mu);
+       for (auto it = hls_zombies.begin(); it != hls_zombies.end(); ) {
+               if (is_earlier(it->second.expires, now)) {
+                       hls_zombies.erase(it++);
+               } else {
+                       ret.push_back(it->second);
+                       ++it;
+               }
+       }
+       return ret;
+}
+
 void Server::do_work()
 {
        while (!should_stop()) {
@@ -194,6 +216,18 @@ CubemapStateProto Server::serialize(unordered_map<const string *, size_t> *short
        for (unique_ptr<Stream> &stream : streams) {
                serialized.add_streams()->MergeFrom(stream->serialize());
        }
+       for (const auto &key_and_zombie : hls_zombies) {
+               HLSZombieProto *proto = serialized.add_hls_zombies();
+               proto->set_key(key_and_zombie.first);
+
+               const HLSZombie &zombie = key_and_zombie.second;
+               proto->set_remote_addr(zombie.remote_addr);
+               proto->set_url(zombie.url);
+               proto->set_referer(zombie.referer);
+               proto->set_user_agent(zombie.user_agent);
+               proto->set_expires_sec(zombie.expires.tv_sec);
+               proto->set_expires_nsec(zombie.expires.tv_nsec);
+       }
        return serialized;
 }
 
@@ -210,16 +244,7 @@ void Server::add_client(int sock, Acceptor *acceptor)
        assert(inserted.second == true);  // Should not already exist.
        Client *client_ptr = &inserted.first->second;
 
-       // Connection timestamps must be nondecreasing. I can't find any guarantee
-       // that even the monotonic clock can't go backwards by a small amount
-       // (think switching between CPUs with non-synchronized TSCs), so if
-       // this actually should happen, we hack around it by fudging
-       // connect_time.
-       if (!clients_ordered_by_connect_time.empty() &&
-           is_earlier(client_ptr->connect_time, clients_ordered_by_connect_time.back().first)) {
-               client_ptr->connect_time = clients_ordered_by_connect_time.back().first;
-       }
-       clients_ordered_by_connect_time.push(make_pair(client_ptr->connect_time, sock));
+       start_client_timeout_timer(client_ptr);
 
        // Start listening on data from this socket.
        epoll_event ev;
@@ -302,6 +327,24 @@ void Server::add_client_from_serialized(const ClientProto &client, const vector<
        }
 }
 
+void Server::start_client_timeout_timer(Client *client)
+{
+       // Connection timestamps must be nondecreasing. I can't find any guarantee
+       // that even the monotonic clock can't go backwards by a small amount
+       // (think switching between CPUs with non-synchronized TSCs), so if
+       // this actually should happen, we hack around it by fudging
+       // connect_time.
+       if (clock_gettime(CLOCK_MONOTONIC_COARSE, &client->connect_time) == -1) {
+               log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+       } else {
+               if (!clients_ordered_by_connect_time.empty() &&
+                   is_earlier(client->connect_time, clients_ordered_by_connect_time.back().first)) {
+                       client->connect_time = clients_ordered_by_connect_time.back().first;
+               }
+               clients_ordered_by_connect_time.push(make_pair(client->connect_time, client->sock));
+       }
+}
+
 int Server::lookup_stream_by_url(const string &url) const
 {
        const auto stream_url_it = stream_url_map.find(url);
@@ -339,7 +382,20 @@ int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
        streams.emplace_back(new Stream(stream, data_fd));
        return streams.size() - 1;
 }
-       
+
+void Server::add_hls_zombie_from_serialized(const HLSZombieProto &zombie_proto)
+{
+       lock_guard<mutex> lock(mu);
+       HLSZombie zombie;
+       zombie.remote_addr = zombie_proto.remote_addr();
+       zombie.url = zombie_proto.url();
+       zombie.referer = zombie_proto.referer();
+       zombie.user_agent = zombie_proto.user_agent();
+       zombie.expires.tv_sec = zombie_proto.expires_sec();
+       zombie.expires.tv_nsec = zombie_proto.expires_nsec();
+       hls_zombies[zombie_proto.key()] = move(zombie);
+}
+
 void Server::set_backlog_size(int stream_index, size_t new_size)
 {
        lock_guard<mutex> lock(mu);
@@ -599,6 +655,7 @@ sending_header_or_short_response_again:
                }
 
                Stream *stream = client->stream;
+               hls_zombies.erase(client->get_hls_zombie_key());
                if (client->stream_pos == Client::STREAM_POS_AT_START) {
                        // Start sending from the beginning of the backlog.
                        client->stream_pos = min<size_t>(
@@ -691,6 +748,20 @@ sending_data_again:
                assert(bytes_to_send <= stream->backlog_size);
                if (bytes_to_send == 0) {
                        if (client->stream_pos == client->stream_pos_end) {  // We have a definite end, and we're at it.
+                               // Add (or overwrite) a HLS zombie.
+                               timespec now;
+                               if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+                                       log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+                               } else {
+                                       HLSZombie zombie;
+                                       zombie.remote_addr = client->remote_addr;
+                                       zombie.referer = client->referer;
+                                       zombie.user_agent = client->user_agent;
+                                       zombie.url = client->stream->url + "?frag=<idle>";
+                                       zombie.expires = now;
+                                       zombie.expires.tv_sec += client->stream->hls_frag_duration * 3;
+                                       hls_zombies[client->get_hls_zombie_key()] = move(zombie);
+                               }
                                if (more_requests(client)) {
                                        // We're done sending the fragment, but should keep on reading new requests.
                                        goto read_request_again;
@@ -943,6 +1014,12 @@ int Server::parse_request(Client *client)
        if (user_agent_it != headers.end()) {
                client->user_agent = user_agent_it->second;
        }
+       const auto x_playback_session_id_it = headers.find("X-Playback-Session-Id");
+       if (x_playback_session_id_it != headers.end()) {
+               client->x_playback_session_id = x_playback_session_id_it->second;
+       } else {
+               client->x_playback_session_id.clear();
+       }
 
        vector<string> request_tokens = split_tokens(lines[0]);
        if (request_tokens.size() < 3) {
@@ -1323,6 +1400,8 @@ bool Server::more_requests(Client *client)
        client->header_or_short_response_holder.clear();
        client->header_or_short_response_ref.reset();
        client->header_or_short_response_bytes_sent = 0;
+       client->bytes_sent = 0;
+       start_client_timeout_timer(client);
 
        change_epoll_events(client, EPOLLIN | EPOLLET | EPOLLRDHUP);  // No TLS handshake, so no EPOLLOUT needed.