Keep information about HLS downloads around for some time afterwards.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 22 Apr 2018 18:18:51 +0000 (20:18 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 22 Apr 2018 18:18:51 +0000 (20:18 +0200)
Gives _much_ more precise statistics for HLS clients, as they tend to
spend most of their time in idle (even with multiple connections).

client.cpp
client.h
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h
state.proto
stats.cpp

index 23892ac..d7bec09 100644 (file)
@@ -54,6 +54,7 @@ Client::Client(const ClientProto &serialized, const vector<shared_ptr<const stri
          remote_addr(serialized.remote_addr()),
          referer(serialized.referer()),
          user_agent(serialized.user_agent()),
          remote_addr(serialized.remote_addr()),
          referer(serialized.referer()),
          user_agent(serialized.user_agent()),
+         x_playback_session_id(serialized.x_playback_session_id()),
          state(State(serialized.state())),
          request(serialized.request()),
          url(serialized.url()),
          state(State(serialized.state())),
          request(serialized.request()),
          url(serialized.url()),
@@ -117,6 +118,7 @@ ClientProto Client::serialize(unordered_map<const string *, size_t> *short_respo
        serialized.set_remote_addr(remote_addr);
        serialized.set_referer(referer);
        serialized.set_user_agent(user_agent);
        serialized.set_remote_addr(remote_addr);
        serialized.set_referer(referer);
        serialized.set_user_agent(user_agent);
+       serialized.set_x_playback_session_id(x_playback_session_id);
        serialized.set_connect_time_sec(connect_time.tv_sec);
        serialized.set_connect_time_nsec(connect_time.tv_nsec);
        serialized.set_state(state);
        serialized.set_connect_time_sec(connect_time.tv_sec);
        serialized.set_connect_time_nsec(connect_time.tv_nsec);
        serialized.set_state(state);
@@ -213,5 +215,6 @@ ClientStats Client::get_stats() const
        stats.bytes_sent = bytes_sent;
        stats.bytes_lost = bytes_lost;
        stats.num_loss_events = num_loss_events;
        stats.bytes_sent = bytes_sent;
        stats.bytes_lost = bytes_lost;
        stats.num_loss_events = num_loss_events;
+       stats.hls_zombie_key = get_hls_zombie_key();
        return stats;
 }
        return stats;
 }
index 858197f..e3fb26b 100644 (file)
--- a/client.h
+++ b/client.h
@@ -28,6 +28,7 @@ struct ClientStats {
        size_t bytes_sent;
        size_t bytes_lost;
        size_t num_loss_events;
        size_t bytes_sent;
        size_t bytes_lost;
        size_t num_loss_events;
+       std::string hls_zombie_key;
 };
 
 struct Client {
 };
 
 struct Client {
@@ -39,6 +40,14 @@ struct Client {
 
        ClientStats get_stats() const;
 
 
        ClientStats get_stats() const;
 
+       std::string get_hls_zombie_key() const {
+               if (x_playback_session_id.empty()) {
+                       return remote_addr;
+               } else {
+                       return x_playback_session_id;
+               }
+       }
+
        // The file descriptor associated with this socket.
        int sock;
 
        // The file descriptor associated with this socket.
        int sock;
 
@@ -49,6 +58,7 @@ struct Client {
        std::string remote_addr;
        std::string referer;
        std::string user_agent;
        std::string remote_addr;
        std::string referer;
        std::string user_agent;
+       std::string x_playback_session_id;
 
        enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_SHORT_RESPONSE, WAITING_FOR_KEYFRAME, PREBUFFERING };
        State state = READING_REQUEST;
 
        enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_SHORT_RESPONSE, WAITING_FOR_KEYFRAME, PREBUFFERING };
        State state = READING_REQUEST;
index 058935d..737f0c3 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -532,6 +532,15 @@ start:
        
        short_response_pool.clear();  // No longer needed; the clients have their own refcounts now.
 
        
        short_response_pool.clear();  // No longer needed; the clients have their own refcounts now.
 
+       // Put back the HLS zombies. There's no really good allocation here
+       // except round-robin; it would be marginally more efficient to match it
+       // to the client (since that would have them deleted immediately when
+       // the client requests the next fragment, instead of being later weeded
+       // out during statistics collection), but it's not a big deal.
+       for (const HLSZombieProto &zombie_proto : loaded_state.hls_zombies()) {
+               servers->add_hls_zombie_from_serialized(zombie_proto);
+       }
+
        servers->run();
 
        // Now delete all inputs that are longer in use, and start the others.
        servers->run();
 
        // Now delete all inputs that are longer in use, and start the others.
index 5f2c088..c12bc8d 100644 (file)
@@ -83,6 +83,28 @@ vector<ClientStats> Server::get_client_stats() const
        return ret;
 }
 
        return ret;
 }
 
+vector<HLSZombie> Server::get_hls_zombies()
+{
+       vector<HLSZombie> ret;
+
+       timespec now;
+       if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+               log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+               return ret;
+       }
+
+       lock_guard<mutex> lock(mu);
+       for (auto it = hls_zombies.begin(); it != hls_zombies.end(); ) {
+               if (is_earlier(it->second.expires, now)) {
+                       hls_zombies.erase(it++);
+               } else {
+                       ret.push_back(it->second);
+                       ++it;
+               }
+       }
+       return ret;
+}
+
 void Server::do_work()
 {
        while (!should_stop()) {
 void Server::do_work()
 {
        while (!should_stop()) {
@@ -194,6 +216,18 @@ CubemapStateProto Server::serialize(unordered_map<const string *, size_t> *short
        for (unique_ptr<Stream> &stream : streams) {
                serialized.add_streams()->MergeFrom(stream->serialize());
        }
        for (unique_ptr<Stream> &stream : streams) {
                serialized.add_streams()->MergeFrom(stream->serialize());
        }
+       for (const auto &key_and_zombie : hls_zombies) {
+               HLSZombieProto *proto = serialized.add_hls_zombies();
+               proto->set_key(key_and_zombie.first);
+
+               const HLSZombie &zombie = key_and_zombie.second;
+               proto->set_remote_addr(zombie.remote_addr);
+               proto->set_url(zombie.url);
+               proto->set_referer(zombie.referer);
+               proto->set_user_agent(zombie.user_agent);
+               proto->set_expires_sec(zombie.expires.tv_sec);
+               proto->set_expires_nsec(zombie.expires.tv_nsec);
+       }
        return serialized;
 }
 
        return serialized;
 }
 
@@ -348,7 +382,20 @@ int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
        streams.emplace_back(new Stream(stream, data_fd));
        return streams.size() - 1;
 }
        streams.emplace_back(new Stream(stream, data_fd));
        return streams.size() - 1;
 }
-       
+
+void Server::add_hls_zombie_from_serialized(const HLSZombieProto &zombie_proto)
+{
+       lock_guard<mutex> lock(mu);
+       HLSZombie zombie;
+       zombie.remote_addr = zombie_proto.remote_addr();
+       zombie.url = zombie_proto.url();
+       zombie.referer = zombie_proto.referer();
+       zombie.user_agent = zombie_proto.user_agent();
+       zombie.expires.tv_sec = zombie_proto.expires_sec();
+       zombie.expires.tv_nsec = zombie_proto.expires_nsec();
+       hls_zombies[zombie_proto.key()] = move(zombie);
+}
+
 void Server::set_backlog_size(int stream_index, size_t new_size)
 {
        lock_guard<mutex> lock(mu);
 void Server::set_backlog_size(int stream_index, size_t new_size)
 {
        lock_guard<mutex> lock(mu);
@@ -608,6 +655,7 @@ sending_header_or_short_response_again:
                }
 
                Stream *stream = client->stream;
                }
 
                Stream *stream = client->stream;
+               hls_zombies.erase(client->get_hls_zombie_key());
                if (client->stream_pos == Client::STREAM_POS_AT_START) {
                        // Start sending from the beginning of the backlog.
                        client->stream_pos = min<size_t>(
                if (client->stream_pos == Client::STREAM_POS_AT_START) {
                        // Start sending from the beginning of the backlog.
                        client->stream_pos = min<size_t>(
@@ -700,6 +748,20 @@ sending_data_again:
                assert(bytes_to_send <= stream->backlog_size);
                if (bytes_to_send == 0) {
                        if (client->stream_pos == client->stream_pos_end) {  // We have a definite end, and we're at it.
                assert(bytes_to_send <= stream->backlog_size);
                if (bytes_to_send == 0) {
                        if (client->stream_pos == client->stream_pos_end) {  // We have a definite end, and we're at it.
+                               // Add (or overwrite) a HLS zombie.
+                               timespec now;
+                               if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+                                       log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+                               } else {
+                                       HLSZombie zombie;
+                                       zombie.remote_addr = client->remote_addr;
+                                       zombie.referer = client->referer;
+                                       zombie.user_agent = client->user_agent;
+                                       zombie.url = client->stream->url + "?frag=<idle>";
+                                       zombie.expires = now;
+                                       zombie.expires.tv_sec += client->stream->hls_frag_duration * 3;
+                                       hls_zombies[client->get_hls_zombie_key()] = move(zombie);
+                               }
                                if (more_requests(client)) {
                                        // We're done sending the fragment, but should keep on reading new requests.
                                        goto read_request_again;
                                if (more_requests(client)) {
                                        // We're done sending the fragment, but should keep on reading new requests.
                                        goto read_request_again;
@@ -952,6 +1014,12 @@ int Server::parse_request(Client *client)
        if (user_agent_it != headers.end()) {
                client->user_agent = user_agent_it->second;
        }
        if (user_agent_it != headers.end()) {
                client->user_agent = user_agent_it->second;
        }
+       const auto x_playback_session_id_it = headers.find("X-Playback-Session-Id");
+       if (x_playback_session_id_it != headers.end()) {
+               client->x_playback_session_id = x_playback_session_id_it->second;
+       } else {
+               client->x_playback_session_id.clear();
+       }
 
        vector<string> request_tokens = split_tokens(lines[0]);
        if (request_tokens.size() < 3) {
 
        vector<string> request_tokens = split_tokens(lines[0]);
        if (request_tokens.size() < 3) {
index cfb6b2e..3727997 100644 (file)
--- a/server.h
+++ b/server.h
@@ -30,6 +30,16 @@ struct Stream;
 
 class CubemapStateProto;
 class StreamProto;
 
 class CubemapStateProto;
 class StreamProto;
+class HLSZombieProto;
+
+// See Server::hls_zombies, below.
+struct HLSZombie {
+       std::string remote_addr;
+       std::string url;
+       std::string referer;
+       std::string user_agent;
+       timespec expires;
+};
 
 class Server : public Thread {
 public:
 
 class Server : public Thread {
 public:
@@ -39,6 +49,9 @@ public:
        // Get the list of all currently connected clients.     
        std::vector<ClientStats> get_client_stats() const;
 
        // Get the list of all currently connected clients.     
        std::vector<ClientStats> get_client_stats() const;
 
+       // See hls_zombies, below.
+       std::vector<HLSZombie> get_hls_zombies();
+
        // Set header (both HTTP header and any stream headers) for the given stream.
        void set_header(int stream_index,
                        const std::string &http_header,
        // Set header (both HTTP header and any stream headers) for the given stream.
        void set_header(int stream_index,
                        const std::string &http_header,
@@ -69,6 +82,7 @@ public:
                       size_t hls_backlog_margin,
                       const std::string &allow_origin);
        int add_stream_from_serialized(const StreamProto &stream, int data_fd);
                       size_t hls_backlog_margin,
                       const std::string &allow_origin);
        int add_stream_from_serialized(const StreamProto &stream, int data_fd);
+       void add_hls_zombie_from_serialized(const HLSZombieProto &hls_zombie);
        int lookup_stream_by_url(const std::string &url) const;
        void set_backlog_size(int stream_index, size_t new_size);
        void set_prebuffering_bytes(int stream_index, size_t new_amount);
        int lookup_stream_by_url(const std::string &url) const;
        void set_backlog_size(int stream_index, size_t new_size);
        void set_prebuffering_bytes(int stream_index, size_t new_amount);
@@ -127,6 +141,22 @@ private:
        // the timespec matches).
        std::queue<std::pair<timespec, int>> clients_ordered_by_connect_time;
 
        // the timespec matches).
        std::queue<std::pair<timespec, int>> clients_ordered_by_connect_time;
 
+       // HLS is harder to keep viewer statistics for than regular streams,
+       // since there's no 1:1 mapping between ongoing HTTP connections and
+       // actual viewers. After a HLS fragment has been successfully sent,
+       // we keep a note of that in this structure, so that we can add some
+       // fake entries in the .stats file for clients that we believe are still
+       // watching, but are not downloading anything right now. We clean this
+       // out whenever we write statistics centrally.
+       //
+       // The structure is keyed by X-Playback-Session-Id if it exists
+       // (typically iOS clients) or IP address otherwise; we can't use the socket,
+       // since clients can (and do) keep open multiple HTTP connections for
+       // the same video playack session, and may also close the socket
+       // between downloading fragments. This means multiple clients between
+       // the same NAT may be undercounted, but that's how it is.
+       std::unordered_map<std::string, HLSZombie> hls_zombies;
+
        // Used for epoll implementation (obviously).
        int epoll_fd;
        epoll_event events[EPOLL_MAX_EVENTS];
        // Used for epoll implementation (obviously).
        int epoll_fd;
        epoll_event events[EPOLL_MAX_EVENTS];
index 9f4f3a4..c3fc69d 100644 (file)
@@ -43,6 +43,9 @@ CubemapStateProto ServerPool::serialize()
                for (const ClientProto &client : local_state.clients()) {
                        state.add_clients()->MergeFrom(client);
                }
                for (const ClientProto &client : local_state.clients()) {
                        state.add_clients()->MergeFrom(client);
                }
+               for (const HLSZombieProto &hls_zombie : local_state.hls_zombies()) {
+                       state.add_hls_zombies()->MergeFrom(hls_zombie);
+               }
         }
 
        for (size_t i = 0; i < short_response_pool.size(); ++i) {
         }
 
        for (size_t i = 0; i < short_response_pool.size(); ++i) {
@@ -65,6 +68,12 @@ void ServerPool::add_client_from_serialized(const ClientProto &client, const std
        servers[clients_added++ % num_servers].add_client_from_serialized(client, short_responses);
 }
 
        servers[clients_added++ % num_servers].add_client_from_serialized(client, short_responses);
 }
 
+// It's fine to abuse clients_added here, since it's only ever used for round-robin purposes.
+void ServerPool::add_hls_zombie_from_serialized(const HLSZombieProto &hls_zombie)
+{
+       servers[clients_added++ % num_servers].add_hls_zombie_from_serialized(hls_zombie);
+}
+
 int ServerPool::lookup_stream_by_url(const string &url) const
 {
        assert(servers != nullptr);
 int ServerPool::lookup_stream_by_url(const string &url) const
 {
        assert(servers != nullptr);
@@ -206,7 +215,17 @@ vector<ClientStats> ServerPool::get_client_stats() const
        }
        return ret;
 }
        }
        return ret;
 }
-       
+
+vector<HLSZombie> ServerPool::get_hls_zombies() const
+{
+       vector<HLSZombie> ret;
+       for (int i = 0; i < num_servers; ++i) {
+               vector<HLSZombie> stats = servers[i].get_hls_zombies();
+               ret.insert(ret.end(), stats.begin(), stats.end());
+       }
+       return ret;
+}
+
 void ServerPool::set_pacing_rate(int stream_index, uint32_t pacing_rate)
 {
        for (int i = 0; i < num_servers; ++i) {
 void ServerPool::set_pacing_rate(int stream_index, uint32_t pacing_rate)
 {
        for (int i = 0; i < num_servers; ++i) {
index a70d542..9cac26e 100644 (file)
@@ -29,6 +29,9 @@ public:
        void add_client(int sock, Acceptor *acceptor);
        void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
 
        void add_client(int sock, Acceptor *acceptor);
        void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
 
+       // Picks a srever (round-robin) and adds the given HLS zombie to it.
+       void add_hls_zombie_from_serialized(const HLSZombieProto &client);
+
        // Adds the given stream to all the servers. Returns the stream index.
        int add_stream(const std::string &url,
                       const std::string &hls_url,
        // Adds the given stream to all the servers. Returns the stream index.
        int add_stream(const std::string &url,
                       const std::string &hls_url,
@@ -95,6 +98,7 @@ public:
        void stop();
 
        std::vector<ClientStats> get_client_stats() const;
        void stop();
 
        std::vector<ClientStats> get_client_stats() const;
+       std::vector<HLSZombie> get_hls_zombies() const;
 
 private:
        std::unique_ptr<Server[]> servers;
 
 private:
        std::unique_ptr<Server[]> servers;
index edb6f77..90d86e4 100644 (file)
@@ -22,6 +22,7 @@ message ClientProto {
        optional bytes tls_context = 17;  // If not present, then not using TLS for this client.
        optional int64 tls_output_bytes_already_consumed = 18;
        optional bool in_ktls_mode = 19;
        optional bytes tls_context = 17;  // If not present, then not using TLS for this client.
        optional int64 tls_output_bytes_already_consumed = 18;
        optional bool in_ktls_mode = 19;
+       optional bytes x_playback_session_id = 22;
 };
 
 // Corresponds to struct Stream::FragmentStart.
 };
 
 // Corresponds to struct Stream::FragmentStart.
@@ -79,6 +80,16 @@ message ShortResponsePool {
        optional bytes header_or_short_response = 1;
 };
 
        optional bytes header_or_short_response = 1;
 };
 
+message HLSZombieProto {
+        optional bytes key = 1;
+        optional bytes remote_addr = 2;
+        optional bytes url = 3;
+        optional bytes referer = 4;
+        optional bytes user_agent = 5;
+        optional int64 expires_sec = 6;
+        optional int64 expires_nsec = 7;
+};
+
 message CubemapStateProto {
        optional int64 serialize_start_sec = 6;
        optional int64 serialize_start_usec = 7;
 message CubemapStateProto {
        optional int64 serialize_start_sec = 6;
        optional int64 serialize_start_usec = 7;
@@ -87,4 +98,5 @@ message CubemapStateProto {
        repeated InputProto inputs = 5;
        repeated AcceptorProto acceptors = 8;
        repeated ShortResponsePool short_response_pool = 9;
        repeated InputProto inputs = 5;
        repeated AcceptorProto acceptors = 8;
        repeated ShortResponsePool short_response_pool = 9;
+       repeated HLSZombieProto hls_zombies = 10;
 };
 };
index 5193243..955f2e9 100644 (file)
--- a/stats.cpp
+++ b/stats.cpp
@@ -31,6 +31,8 @@ void StatsThread::do_work()
                FILE *fp;
                timespec now;
                vector<ClientStats> client_stats;
                FILE *fp;
                timespec now;
                vector<ClientStats> client_stats;
+               vector<HLSZombie> hls_zombies;
+               unordered_map<string, HLSZombie> remaining_hls_zombies;
 
                if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
                        log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
 
                if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
                        log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
@@ -57,19 +59,60 @@ void StatsThread::do_work()
                        goto sleep;
                }
 
                        goto sleep;
                }
 
-               client_stats = servers->get_client_stats();
-               for (size_t i = 0; i < client_stats.size(); ++i) {
+               // Get all the HLS zombies and combine them into one map (we resolve conflicts
+               // by having an arbitrary element win; in practice, that means the lowest
+               // server ID).
+               for (HLSZombie &zombie : servers->get_hls_zombies()) {
+                       const string remote_addr = zombie.remote_addr;
+                       remaining_hls_zombies[move(remote_addr)] = move(zombie);
+               }
+
+               // Remove all zombies whose ID match an already ongoing request.
+               // (Normally, this is cleared out already when it starts,
+               // but the request could happen on a different server from the zombie,
+               // or the zombie could be deserialized.)
+               for (const ClientStats &stats : servers->get_client_stats()) {
+                       if (stats.url != "-") {
+                               remaining_hls_zombies.erase(stats.hls_zombie_key);
+                       }
+               }
+
+               for (const ClientStats &stats : servers->get_client_stats()) {
+                       string url = stats.url;
+                       if (url == "-") {
+                               // No download going on currently; could it be waiting for more HLS fragments?
+                               auto it = remaining_hls_zombies.find(stats.remote_addr);
+                               if (it != remaining_hls_zombies.end()) {
+                                       url = it->second.url;
+                                       remaining_hls_zombies.erase(it);
+                               }
+                       }
+
+                       fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
+                               stats.remote_addr.c_str(),
+                               stats.sock,
+                               0,  // Used to be fwmark.
+                               url.c_str(),
+                               int(now.tv_sec - stats.connect_time.tv_sec),  // Rather coarse.
+                               (long long unsigned)(stats.bytes_sent),
+                               (long long unsigned)(stats.bytes_lost),
+                               (long long unsigned)(stats.num_loss_events),
+                               stats.referer.c_str(),
+                               stats.user_agent.c_str());
+               }
+               for (const auto &url_and_zombie : remaining_hls_zombies) {
+                       const HLSZombie &zombie = url_and_zombie.second;
                        fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
                        fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
-                               client_stats[i].remote_addr.c_str(),
-                               client_stats[i].sock,
+                               zombie.remote_addr.c_str(),
+                               0,  // Fake socket. (The Munin script doesn't like negative numbers.)
                                0,  // Used to be fwmark.
                                0,  // Used to be fwmark.
-                               client_stats[i].url.c_str(),
-                               int(now.tv_sec - client_stats[i].connect_time.tv_sec),  // Rather coarse.
-                               (long long unsigned)(client_stats[i].bytes_sent),
-                               (long long unsigned)(client_stats[i].bytes_lost),
-                               (long long unsigned)(client_stats[i].num_loss_events),
-                               client_stats[i].referer.c_str(),
-                               client_stats[i].user_agent.c_str());
+                               zombie.url.c_str(),
+                               0,
+                               0ULL,
+                               0ULL,
+                               0ULL,
+                               zombie.referer.c_str(),
+                               zombie.user_agent.c_str());
                }
                if (fclose(fp) == EOF) {
                        log_perror("fclose");
                }
                if (fclose(fp) == EOF) {
                        log_perror("fclose");