Keep information about HLS downloads around for some time afterwards.
author Steinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 22 Apr 2018 18:18:51 +0000 (20:18 +0200)
committer Steinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 22 Apr 2018 18:18:51 +0000 (20:18 +0200)
Gives _much_ more precise statistics for HLS clients, as they tend to
spend most of their time idle (even with multiple connections).
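
The mechanism in brief: when a client finishes downloading a fragment, it is
remembered for a while as an "HLS zombie", so the statistics can keep counting
it as a viewer while it sits idle between fragments. Below is a minimal,
self-contained sketch of the two rules the patch implements (key selection and
expiry); struct FakeClient, the 6-second fragment duration, and all values are
illustrative assumptions, not part of the patch:

  #include <stdio.h>
  #include <string>
  #include <time.h>

  // Hypothetical stand-in for the real struct Client.
  struct FakeClient {
          std::string remote_addr = "192.0.2.15";
          std::string x_playback_session_id;  // sent by e.g. iOS clients
  };

  // The key rule from client.h: X-Playback-Session-Id if present, else IP.
  static std::string hls_zombie_key(const FakeClient &c)
  {
          return c.x_playback_session_id.empty() ? c.remote_addr
                                                 : c.x_playback_session_id;
  }

  int main()
  {
          FakeClient c;
          timespec now;
          clock_gettime(CLOCK_MONOTONIC_COARSE, &now);

          // The expiry rule from server.cpp: three fragment durations from
          // now; hls_frag_duration = 6 seconds is an assumed config value.
          timespec expires = now;
          expires.tv_sec += 3 * 6;

          // A real zombie is erased early if the same key starts a new
          // request, and weeded out during statistics collection once expired.
          printf("zombie '%s' expires at t=%ld\n",
                 hls_zombie_key(c).c_str(), (long)expires.tv_sec);
          return 0;
  }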

client.cpp
client.h
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h
state.proto
stats.cpp

index 23892ac19038406e584241a0ee462583c9447c0b..d7bec0986983cb438322941f636a59bd45210003 100644 (file)
--- a/client.cpp
+++ b/client.cpp
@@ -54,6 +54,7 @@ Client::Client(const ClientProto &serialized, const vector<shared_ptr<const stri
          remote_addr(serialized.remote_addr()),
          referer(serialized.referer()),
          user_agent(serialized.user_agent()),
+         x_playback_session_id(serialized.x_playback_session_id()),
          state(State(serialized.state())),
          request(serialized.request()),
          url(serialized.url()),
@@ -117,6 +118,7 @@ ClientProto Client::serialize(unordered_map<const string *, size_t> *short_respo
        serialized.set_remote_addr(remote_addr);
        serialized.set_referer(referer);
        serialized.set_user_agent(user_agent);
+       serialized.set_x_playback_session_id(x_playback_session_id);
        serialized.set_connect_time_sec(connect_time.tv_sec);
        serialized.set_connect_time_nsec(connect_time.tv_nsec);
        serialized.set_state(state);
@@ -213,5 +215,6 @@ ClientStats Client::get_stats() const
        stats.bytes_sent = bytes_sent;
        stats.bytes_lost = bytes_lost;
        stats.num_loss_events = num_loss_events;
+       stats.hls_zombie_key = get_hls_zombie_key();
        return stats;
 }
index 858197fdcc457dd19a73d5ffb66af50bff5cf5f2..e3fb26bd895698af8fcca68b36934c827eba7116 100644 (file)
--- a/client.h
+++ b/client.h
@@ -28,6 +28,7 @@ struct ClientStats {
        size_t bytes_sent;
        size_t bytes_lost;
        size_t num_loss_events;
+       std::string hls_zombie_key;
 };
 
 struct Client {
@@ -39,6 +40,14 @@ struct Client {
 
        ClientStats get_stats() const;
 
+       std::string get_hls_zombie_key() const {
+               if (x_playback_session_id.empty()) {
+                       return remote_addr;
+               } else {
+                       return x_playback_session_id;
+               }
+       }
+
        // The file descriptor associated with this socket.
        int sock;
 
@@ -49,6 +58,7 @@ struct Client {
        std::string remote_addr;
        std::string referer;
        std::string user_agent;
+       std::string x_playback_session_id;
 
        enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_SHORT_RESPONSE, WAITING_FOR_KEYFRAME, PREBUFFERING };
        State state = READING_REQUEST;
index 058935d523165a5c7ff62479147329f17ec38037..737f0c3573acea88fa7d9f0585aee69d95ec0838 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -532,6 +532,15 @@ start:
        
        short_response_pool.clear();  // No longer needed; the clients have their own refcounts now.
 
+       // Put back the HLS zombies. There's no really good allocation here
+       // except round-robin; it would be marginally more efficient to match it
+       // to the client (since that would have them deleted immediately when
+       // the client requests the next fragment, instead of being later weeded
+       // out during statistics collection), but it's not a big deal.
+       for (const HLSZombieProto &zombie_proto : loaded_state.hls_zombies()) {
+               servers->add_hls_zombie_from_serialized(zombie_proto);
+       }
+
        servers->run();
 
        // Now delete all inputs that are no longer in use, and start the others.
index 5f2c088c7826313534b73e0bad81218975978d52..c12bc8dee370e7e51f692b2d3f5a52aab3cce0e8 100644 (file)
--- a/server.cpp
+++ b/server.cpp
@@ -83,6 +83,28 @@ vector<ClientStats> Server::get_client_stats() const
        return ret;
 }
 
+vector<HLSZombie> Server::get_hls_zombies()
+{
+       vector<HLSZombie> ret;
+
+       timespec now;
+       if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+               log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+               return ret;
+       }
+
+       lock_guard<mutex> lock(mu);
+       for (auto it = hls_zombies.begin(); it != hls_zombies.end(); ) {
+               if (is_earlier(it->second.expires, now)) {
+                       hls_zombies.erase(it++);
+               } else {
+                       ret.push_back(it->second);
+                       ++it;
+               }
+       }
+       return ret;
+}
+
 void Server::do_work()
 {
        while (!should_stop()) {
@@ -194,6 +216,18 @@ CubemapStateProto Server::serialize(unordered_map<const string *, size_t> *short
        for (unique_ptr<Stream> &stream : streams) {
                serialized.add_streams()->MergeFrom(stream->serialize());
        }
+       for (const auto &key_and_zombie : hls_zombies) {
+               HLSZombieProto *proto = serialized.add_hls_zombies();
+               proto->set_key(key_and_zombie.first);
+
+               const HLSZombie &zombie = key_and_zombie.second;
+               proto->set_remote_addr(zombie.remote_addr);
+               proto->set_url(zombie.url);
+               proto->set_referer(zombie.referer);
+               proto->set_user_agent(zombie.user_agent);
+               proto->set_expires_sec(zombie.expires.tv_sec);
+               proto->set_expires_nsec(zombie.expires.tv_nsec);
+       }
        return serialized;
 }
 
@@ -348,7 +382,20 @@ int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
        streams.emplace_back(new Stream(stream, data_fd));
        return streams.size() - 1;
 }
-       
+
+void Server::add_hls_zombie_from_serialized(const HLSZombieProto &zombie_proto)
+{
+       lock_guard<mutex> lock(mu);
+       HLSZombie zombie;
+       zombie.remote_addr = zombie_proto.remote_addr();
+       zombie.url = zombie_proto.url();
+       zombie.referer = zombie_proto.referer();
+       zombie.user_agent = zombie_proto.user_agent();
+       zombie.expires.tv_sec = zombie_proto.expires_sec();
+       zombie.expires.tv_nsec = zombie_proto.expires_nsec();
+       hls_zombies[zombie_proto.key()] = move(zombie);
+}
+
 void Server::set_backlog_size(int stream_index, size_t new_size)
 {
        lock_guard<mutex> lock(mu);
@@ -608,6 +655,7 @@ sending_header_or_short_response_again:
                }
 
                Stream *stream = client->stream;
+               hls_zombies.erase(client->get_hls_zombie_key());
                if (client->stream_pos == Client::STREAM_POS_AT_START) {
                        // Start sending from the beginning of the backlog.
                        client->stream_pos = min<size_t>(
@@ -700,6 +748,20 @@ sending_data_again:
                assert(bytes_to_send <= stream->backlog_size);
                if (bytes_to_send == 0) {
                        if (client->stream_pos == client->stream_pos_end) {  // We have a definite end, and we're at it.
+                               // Add (or overwrite) an HLS zombie.
+                               timespec now;
+                               if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+                                       log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+                               } else {
+                                       HLSZombie zombie;
+                                       zombie.remote_addr = client->remote_addr;
+                                       zombie.referer = client->referer;
+                                       zombie.user_agent = client->user_agent;
+                                       zombie.url = client->stream->url + "?frag=<idle>";
+                                       zombie.expires = now;
+                                       zombie.expires.tv_sec += client->stream->hls_frag_duration * 3;
+                                       hls_zombies[client->get_hls_zombie_key()] = move(zombie);
+                               }
                                if (more_requests(client)) {
                                        // We're done sending the fragment, but should keep on reading new requests.
                                        goto read_request_again;
@@ -952,6 +1014,12 @@ int Server::parse_request(Client *client)
        if (user_agent_it != headers.end()) {
                client->user_agent = user_agent_it->second;
        }
+       const auto x_playback_session_id_it = headers.find("X-Playback-Session-Id");
+       if (x_playback_session_id_it != headers.end()) {
+               client->x_playback_session_id = x_playback_session_id_it->second;
+       } else {
+               client->x_playback_session_id.clear();
+       }
 
        vector<string> request_tokens = split_tokens(lines[0]);
        if (request_tokens.size() < 3) {
index cfb6b2e13680409314ba444f68dc023666d8de31..372799778f8411f42a95506e49ad0443175dcb8f 100644 (file)
--- a/server.h
+++ b/server.h
@@ -30,6 +30,16 @@ struct Stream;
 
 class CubemapStateProto;
 class StreamProto;
+class HLSZombieProto;
+
+// See Server::hls_zombies, below.
+struct HLSZombie {
+       std::string remote_addr;
+       std::string url;
+       std::string referer;
+       std::string user_agent;
+       timespec expires;
+};
 
 class Server : public Thread {
 public:
@@ -39,6 +49,9 @@ public:
        // Get the list of all currently connected clients.     
        std::vector<ClientStats> get_client_stats() const;
 
+       // See hls_zombies, below.
+       std::vector<HLSZombie> get_hls_zombies();
+
        // Set header (both HTTP header and any stream headers) for the given stream.
        void set_header(int stream_index,
                        const std::string &http_header,
@@ -69,6 +82,7 @@ public:
                       size_t hls_backlog_margin,
                       const std::string &allow_origin);
        int add_stream_from_serialized(const StreamProto &stream, int data_fd);
+       void add_hls_zombie_from_serialized(const HLSZombieProto &hls_zombie);
        int lookup_stream_by_url(const std::string &url) const;
        void set_backlog_size(int stream_index, size_t new_size);
        void set_prebuffering_bytes(int stream_index, size_t new_amount);
@@ -127,6 +141,22 @@ private:
        // the timespec matches).
        std::queue<std::pair<timespec, int>> clients_ordered_by_connect_time;
 
+       // HLS is harder to keep viewer statistics for than regular streams,
+       // since there's no 1:1 mapping between ongoing HTTP connections and
+       // actual viewers. After an HLS fragment has been successfully sent,
+       // we keep a note of that in this structure, so that we can add some
+       // fake entries in the .stats file for clients that we believe are still
+       // watching, but are not downloading anything right now. We clean this
+       // out whenever we write statistics centrally.
+       //
+       // The structure is keyed by X-Playback-Session-Id if it exists
+       // (typically iOS clients) or IP address otherwise; we can't use the socket,
+       // since clients can (and do) keep open multiple HTTP connections for
+       // the same video playback session, and may also close the socket
+       // between downloading fragments. This means multiple clients behind
+       // the same NAT may be undercounted, but that's how it is.
+       std::unordered_map<std::string, HLSZombie> hls_zombies;
+
        // Used for epoll implementation (obviously).
        int epoll_fd;
        epoll_event events[EPOLL_MAX_EVENTS];
index 9f4f3a42eb2e039d7ce7bd753653b4115f2097fa..c3fc69d04a3414fd00fee354966bc02a886c8d11 100644 (file)
--- a/serverpool.cpp
+++ b/serverpool.cpp
@@ -43,6 +43,9 @@ CubemapStateProto ServerPool::serialize()
                for (const ClientProto &client : local_state.clients()) {
                        state.add_clients()->MergeFrom(client);
                }
+               for (const HLSZombieProto &hls_zombie : local_state.hls_zombies()) {
+                       state.add_hls_zombies()->MergeFrom(hls_zombie);
+               }
         }
 
        for (size_t i = 0; i < short_response_pool.size(); ++i) {
@@ -65,6 +68,12 @@ void ServerPool::add_client_from_serialized(const ClientProto &client, const std
        servers[clients_added++ % num_servers].add_client_from_serialized(client, short_responses);
 }
 
+// It's fine to abuse clients_added here, since it's only ever used for round-robin purposes.
+void ServerPool::add_hls_zombie_from_serialized(const HLSZombieProto &hls_zombie)
+{
+       servers[clients_added++ % num_servers].add_hls_zombie_from_serialized(hls_zombie);
+}
+
 int ServerPool::lookup_stream_by_url(const string &url) const
 {
        assert(servers != nullptr);
@@ -206,7 +215,17 @@ vector<ClientStats> ServerPool::get_client_stats() const
        }
        return ret;
 }
-       
+
+vector<HLSZombie> ServerPool::get_hls_zombies() const
+{
+       vector<HLSZombie> ret;
+       for (int i = 0; i < num_servers; ++i) {
+               vector<HLSZombie> stats = servers[i].get_hls_zombies();
+               ret.insert(ret.end(), stats.begin(), stats.end());
+       }
+       return ret;
+}
+
 void ServerPool::set_pacing_rate(int stream_index, uint32_t pacing_rate)
 {
        for (int i = 0; i < num_servers; ++i) {
index a70d5429221af06f82049e8c098d93e52c33cbc9..9cac26e47133f270e91c040afa3091420433f387 100644 (file)
--- a/serverpool.h
+++ b/serverpool.h
@@ -29,6 +29,9 @@ public:
        void add_client(int sock, Acceptor *acceptor);
        void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
 
+       // Picks a server (round-robin) and adds the given HLS zombie to it.
+       void add_hls_zombie_from_serialized(const HLSZombieProto &hls_zombie);
+
        // Adds the given stream to all the servers. Returns the stream index.
        int add_stream(const std::string &url,
                       const std::string &hls_url,
@@ -95,6 +98,7 @@ public:
        void stop();
 
        std::vector<ClientStats> get_client_stats() const;
+       std::vector<HLSZombie> get_hls_zombies() const;
 
 private:
        std::unique_ptr<Server[]> servers;
index edb6f777f89fa68ca3a2d8f38c261a3faca18ad9..90d86e4170f5448f794c3be910193c74f7000e9d 100644 (file)
--- a/state.proto
+++ b/state.proto
@@ -22,6 +22,7 @@ message ClientProto {
        optional bytes tls_context = 17;  // If not present, then not using TLS for this client.
        optional int64 tls_output_bytes_already_consumed = 18;
        optional bool in_ktls_mode = 19;
+       optional bytes x_playback_session_id = 22;
 };
 
 // Corresponds to struct Stream::FragmentStart.
@@ -79,6 +80,16 @@ message ShortResponsePool {
        optional bytes header_or_short_response = 1;
 };
 
+message HLSZombieProto {
+       optional bytes key = 1;
+       optional bytes remote_addr = 2;
+       optional bytes url = 3;
+       optional bytes referer = 4;
+       optional bytes user_agent = 5;
+       optional int64 expires_sec = 6;
+       optional int64 expires_nsec = 7;
+};
+
 message CubemapStateProto {
        optional int64 serialize_start_sec = 6;
        optional int64 serialize_start_usec = 7;
@@ -87,4 +98,5 @@ message CubemapStateProto {
        repeated InputProto inputs = 5;
        repeated AcceptorProto acceptors = 8;
        repeated ShortResponsePool short_response_pool = 9;
+       repeated HLSZombieProto hls_zombies = 10;
 };
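
For reference, one serialized zombie in this schema might look roughly like
this in protobuf text format (all values invented; note that expires_sec and
expires_nsec hold a CLOCK_MONOTONIC_COARSE timestamp, not wall-clock time):

  hls_zombies {
          key: "192.0.2.15"
          remote_addr: "192.0.2.15"
          url: "/main?frag=<idle>"
          referer: ""
          user_agent: "VLC/3.0.2 LibVLC/3.0.2"
          expires_sec: 123456
          expires_nsec: 0
  }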
index 5193243aafb82ab87bb6ca577a9c14ddfa5a1874..955f2e9450ebb6fc7baf73e188fc6eddf372fa62 100644 (file)
--- a/stats.cpp
+++ b/stats.cpp
@@ -31,6 +31,8 @@ void StatsThread::do_work()
                FILE *fp;
                timespec now;
                vector<ClientStats> client_stats;
+               vector<HLSZombie> hls_zombies;
+               unordered_map<string, HLSZombie> remaining_hls_zombies;
 
                if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
                        log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
@@ -57,19 +59,61 @@
                        goto sleep;
                }
 
-               client_stats = servers->get_client_stats();
-               for (size_t i = 0; i < client_stats.size(); ++i) {
+               // Get all the HLS zombies and combine them into one map (on key
+               // conflicts, the last insertion wins; in practice, that means
+               // the highest server ID).
+               for (HLSZombie &zombie : servers->get_hls_zombies()) {
+                       string remote_addr = zombie.remote_addr;
+                       remaining_hls_zombies[move(remote_addr)] = move(zombie);
+               }
+
+               // Remove all zombies whose ID matches an already ongoing request.
+               // (Normally, the zombie is erased as soon as such a request starts,
+               // but the request could happen on a different server from the zombie,
+               // or the zombie could have been freshly deserialized.)
+               client_stats = servers->get_client_stats();
+               for (const ClientStats &stats : client_stats) {
+                       if (stats.url != "-") {
+                               remaining_hls_zombies.erase(stats.hls_zombie_key);
+                       }
+               }
+
+               for (const ClientStats &stats : client_stats) {
+                       string url = stats.url;
+                       if (url == "-") {
+                               // No download going on currently; could it be waiting for more HLS fragments?
+                               auto it = remaining_hls_zombies.find(stats.remote_addr);
+                               if (it != remaining_hls_zombies.end()) {
+                                       url = it->second.url;
+                                       remaining_hls_zombies.erase(it);
+                               }
+                       }
+
+                       fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
+                               stats.remote_addr.c_str(),
+                               stats.sock,
+                               0,  // Used to be fwmark.
+                               url.c_str(),
+                               int(now.tv_sec - stats.connect_time.tv_sec),  // Rather coarse.
+                               (long long unsigned)(stats.bytes_sent),
+                               (long long unsigned)(stats.bytes_lost),
+                               (long long unsigned)(stats.num_loss_events),
+                               stats.referer.c_str(),
+                               stats.user_agent.c_str());
+               }
+               for (const auto &url_and_zombie : remaining_hls_zombies) {
+                       const HLSZombie &zombie = url_and_zombie.second;
                        fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
-                               client_stats[i].remote_addr.c_str(),
-                               client_stats[i].sock,
+                               zombie.remote_addr.c_str(),
+                               0,  // Fake socket. (The Munin script doesn't like negative numbers.)
                                0,  // Used to be fwmark.
-                               client_stats[i].url.c_str(),
-                               int(now.tv_sec - client_stats[i].connect_time.tv_sec),  // Rather coarse.
-                               (long long unsigned)(client_stats[i].bytes_sent),
-                               (long long unsigned)(client_stats[i].bytes_lost),
-                               (long long unsigned)(client_stats[i].num_loss_events),
-                               client_stats[i].referer.c_str(),
-                               client_stats[i].user_agent.c_str());
+                               zombie.url.c_str(),
+                               0,
+                               0ULL,
+                               0ULL,
+                               0ULL,
+                               zombie.referer.c_str(),
+                               zombie.user_agent.c_str());
                }
                if (fclose(fp) == EOF) {
                        log_perror("fclose");
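
For illustration, the .stats file then mixes real client lines and zombie
lines in the same ten-field format; the addresses, stream name, fragment
range, and user agents below are all made up:

  192.0.2.15 23 0 /main?frag=41472-41614 12 1238192 0 0 "" "AppleCoreMedia/1.0.0"
  198.51.100.7 0 0 /main?frag=<idle> 0 0 0 0 "" "VLC/3.0.2 LibVLC/3.0.2"

The second line is a zombie: fake socket 0, zeroed byte and loss counters,
and the "?frag=<idle>" URL suffix showing that no download is in progress.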