From: Steinar H. Gunderson
Date: Sun, 22 Apr 2018 18:18:51 +0000 (+0200)
Subject: Keep information about HLS downloads around for some time afterwards.
X-Git-Tag: 1.4.0~8
X-Git-Url: https://git.sesse.net/?a=commitdiff_plain;h=ed218ece51e5be77fd8bd8f014b5f05708e4be5b;p=cubemap

Keep information about HLS downloads around for some time afterwards.

Gives _much_ more precise statistics for HLS clients, as they tend to
spend most of their time in idle (even with multiple connections).
---

diff --git a/client.cpp b/client.cpp
index 23892ac..d7bec09 100644
--- a/client.cpp
+++ b/client.cpp
@@ -54,6 +54,7 @@ Client::Client(const ClientProto &serialized, const vector<shared_ptr<const string>> *short_respo
 	serialized.set_remote_addr(remote_addr);
 	serialized.set_referer(referer);
 	serialized.set_user_agent(user_agent);
+	serialized.set_x_playback_session_id(x_playback_session_id);
 	serialized.set_connect_time_sec(connect_time.tv_sec);
 	serialized.set_connect_time_nsec(connect_time.tv_nsec);
 	serialized.set_state(state);
@@ -213,5 +215,6 @@ ClientStats Client::get_stats() const
 	stats.bytes_sent = bytes_sent;
 	stats.bytes_lost = bytes_lost;
 	stats.num_loss_events = num_loss_events;
+	stats.hls_zombie_key = get_hls_zombie_key();
 	return stats;
 }
diff --git a/client.h b/client.h
index 858197f..e3fb26b 100644
--- a/client.h
+++ b/client.h
@@ -28,6 +28,7 @@ struct ClientStats {
 	size_t bytes_sent;
 	size_t bytes_lost;
 	size_t num_loss_events;
+	std::string hls_zombie_key;
 };
 
 struct Client {
@@ -39,6 +40,14 @@ struct Client {
 
 	ClientStats get_stats() const;
 
+	std::string get_hls_zombie_key() const {
+		if (x_playback_session_id.empty()) {
+			return remote_addr;
+		} else {
+			return x_playback_session_id;
+		}
+	}
+
 	// The file descriptor associated with this socket.
 	int sock;
 
@@ -49,6 +58,7 @@ struct Client {
 	std::string remote_addr;
 	std::string referer;
 	std::string user_agent;
+	std::string x_playback_session_id;
 
 	enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_SHORT_RESPONSE, WAITING_FOR_KEYFRAME, PREBUFFERING };
 	State state = READING_REQUEST;
diff --git a/main.cpp b/main.cpp
index 058935d..737f0c3 100644
--- a/main.cpp
+++ b/main.cpp
@@ -532,6 +532,15 @@ start:
 
 	short_response_pool.clear();  // No longer needed; the clients have their own refcounts now.
 
+	// Put back the HLS zombies. There's no really good allocation here
+	// except round-robin; it would be marginally more efficient to match it
+	// to the client (since that would have them deleted immediately when
+	// the client requests the next fragment, instead of being later weeded
+	// out during statistics collection), but it's not a big deal.
+	for (const HLSZombieProto &zombie_proto : loaded_state.hls_zombies()) {
+		servers->add_hls_zombie_from_serialized(zombie_proto);
+	}
+
 	servers->run();
 
 	// Now delete all inputs that are no longer in use, and start the others.
diff --git a/server.cpp b/server.cpp
index 5f2c088..c12bc8d 100644
--- a/server.cpp
+++ b/server.cpp
@@ -83,6 +83,28 @@ vector<ClientStats> Server::get_client_stats() const
 	return ret;
 }
 
+vector<HLSZombie> Server::get_hls_zombies()
+{
+	vector<HLSZombie> ret;
+
+	timespec now;
+	if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+		log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+		return ret;
+	}
+
+	lock_guard<mutex> lock(mu);
+	for (auto it = hls_zombies.begin(); it != hls_zombies.end(); ) {
+		if (is_earlier(it->second.expires, now)) {
+			hls_zombies.erase(it++);
+		} else {
+			ret.push_back(it->second);
+			++it;
+		}
+	}
+	return ret;
+}
+
 void Server::do_work()
 {
 	while (!should_stop()) {
@@ -194,6 +216,18 @@ CubemapStateProto Server::serialize(unordered_map<const string *, size_t> *short
 	for (unique_ptr<Stream> &stream : streams) {
 		serialized.add_streams()->MergeFrom(stream->serialize());
 	}
+	for (const auto &key_and_zombie : hls_zombies) {
+		HLSZombieProto *proto = serialized.add_hls_zombies();
+		proto->set_key(key_and_zombie.first);
+
+		const HLSZombie &zombie = key_and_zombie.second;
+		proto->set_remote_addr(zombie.remote_addr);
+		proto->set_url(zombie.url);
+		proto->set_referer(zombie.referer);
+		proto->set_user_agent(zombie.user_agent);
+		proto->set_expires_sec(zombie.expires.tv_sec);
+		proto->set_expires_nsec(zombie.expires.tv_nsec);
+	}
 	return serialized;
 }
 
@@ -348,7 +382,20 @@ int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
 	streams.emplace_back(new Stream(stream, data_fd));
 	return streams.size() - 1;
 }
-
+
+void Server::add_hls_zombie_from_serialized(const HLSZombieProto &zombie_proto)
+{
+	lock_guard<mutex> lock(mu);
+	HLSZombie zombie;
+	zombie.remote_addr = zombie_proto.remote_addr();
+	zombie.url = zombie_proto.url();
+	zombie.referer = zombie_proto.referer();
+	zombie.user_agent = zombie_proto.user_agent();
+	zombie.expires.tv_sec = zombie_proto.expires_sec();
+	zombie.expires.tv_nsec = zombie_proto.expires_nsec();
+	hls_zombies[zombie_proto.key()] = move(zombie);
+}
+
 void Server::set_backlog_size(int stream_index, size_t new_size)
 {
 	lock_guard<mutex> lock(mu);
@@ -608,6 +655,7 @@ sending_header_or_short_response_again:
 	}
 
 	Stream *stream = client->stream;
+	hls_zombies.erase(client->get_hls_zombie_key());
 	if (client->stream_pos == Client::STREAM_POS_AT_START) {
 		// Start sending from the beginning of the backlog.
 		client->stream_pos = min(
@@ -700,6 +748,20 @@ sending_data_again:
 	assert(bytes_to_send <= stream->backlog_size);
 	if (bytes_to_send == 0) {
 		if (client->stream_pos == client->stream_pos_end) {  // We have a definite end, and we're at it.
+			// Add (or overwrite) a HLS zombie.
+			timespec now;
+			if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+				log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+			} else {
+				HLSZombie zombie;
+				zombie.remote_addr = client->remote_addr;
+				zombie.referer = client->referer;
+				zombie.user_agent = client->user_agent;
+				zombie.url = client->stream->url + "?frag=";
+				zombie.expires = now;
+				zombie.expires.tv_sec += client->stream->hls_frag_duration * 3;
+				hls_zombies[client->get_hls_zombie_key()] = move(zombie);
+			}
 			if (more_requests(client)) {
 				// We're done sending the fragment, but should keep on reading new requests.
 				goto read_request_again;
@@ -952,6 +1014,12 @@ int Server::parse_request(Client *client)
 	if (user_agent_it != headers.end()) {
 		client->user_agent = user_agent_it->second;
 	}
+	const auto x_playback_session_id_it = headers.find("X-Playback-Session-Id");
+	if (x_playback_session_id_it != headers.end()) {
+		client->x_playback_session_id = x_playback_session_id_it->second;
+	} else {
+		client->x_playback_session_id.clear();
+	}
 
 	vector<string> request_tokens = split_tokens(lines[0]);
 	if (request_tokens.size() < 3) {
diff --git a/server.h b/server.h
index cfb6b2e..3727997 100644
--- a/server.h
+++ b/server.h
@@ -30,6 +30,16 @@ struct Stream;
 
 class CubemapStateProto;
 class StreamProto;
+class HLSZombieProto;
+
+// See Server::hls_zombies, below.
+struct HLSZombie {
+	std::string remote_addr;
+	std::string url;
+	std::string referer;
+	std::string user_agent;
+	timespec expires;
+};
 
 class Server : public Thread {
 public:
@@ -39,6 +49,9 @@ public:
 	// Get the list of all currently connected clients.
 	std::vector<ClientStats> get_client_stats() const;
 
+	// See hls_zombies, below.
+	std::vector<HLSZombie> get_hls_zombies();
+
 	// Set header (both HTTP header and any stream headers) for the given stream.
 	void set_header(int stream_index,
 	                const std::string &http_header,
@@ -69,6 +82,7 @@ public:
 	               size_t hls_backlog_margin,
 	               const std::string &allow_origin);
 	int add_stream_from_serialized(const StreamProto &stream, int data_fd);
+	void add_hls_zombie_from_serialized(const HLSZombieProto &hls_zombie);
 	int lookup_stream_by_url(const std::string &url) const;
 	void set_backlog_size(int stream_index, size_t new_size);
 	void set_prebuffering_bytes(int stream_index, size_t new_amount);
@@ -127,6 +141,22 @@ private:
 	// the timespec matches).
 	std::queue<std::pair<timespec, int>> clients_ordered_by_connect_time;
 
+	// HLS is harder to keep viewer statistics for than regular streams,
+	// since there's no 1:1 mapping between ongoing HTTP connections and
+	// actual viewers. After a HLS fragment has been successfully sent,
+	// we keep a note of that in this structure, so that we can add some
+	// fake entries in the .stats file for clients that we believe are still
+	// watching, but are not downloading anything right now. We clean this
+	// out whenever we write statistics centrally.
+	//
+	// The structure is keyed by X-Playback-Session-Id if it exists
+	// (typically iOS clients) or IP address otherwise; we can't use the socket,
+	// since clients can (and do) keep open multiple HTTP connections for
+	// the same video playback session, and may also close the socket
+	// between downloading fragments. This means multiple clients behind
+	// the same NAT may be undercounted, but that's how it is.
+	std::unordered_map<std::string, HLSZombie> hls_zombies;
+
 	// Used for epoll implementation (obviously).
 	int epoll_fd;
 	epoll_event events[EPOLL_MAX_EVENTS];
diff --git a/serverpool.cpp b/serverpool.cpp
index 9f4f3a4..c3fc69d 100644
--- a/serverpool.cpp
+++ b/serverpool.cpp
@@ -43,6 +43,9 @@ CubemapStateProto ServerPool::serialize()
 		for (const ClientProto &client : local_state.clients()) {
 			state.add_clients()->MergeFrom(client);
 		}
+		for (const HLSZombieProto &hls_zombie : local_state.hls_zombies()) {
+			state.add_hls_zombies()->MergeFrom(hls_zombie);
+		}
 	}
 
 	for (size_t i = 0; i < short_response_pool.size(); ++i) {
@@ -65,6 +68,12 @@ void ServerPool::add_client_from_serialized(const ClientProto &client, const std
 	servers[clients_added++ % num_servers].add_client_from_serialized(client, short_responses);
 }
 
+// It's fine to abuse clients_added here, since it's only ever used for round-robin purposes.
+void ServerPool::add_hls_zombie_from_serialized(const HLSZombieProto &hls_zombie)
+{
+	servers[clients_added++ % num_servers].add_hls_zombie_from_serialized(hls_zombie);
+}
+
 int ServerPool::lookup_stream_by_url(const string &url) const
 {
 	assert(servers != nullptr);
@@ -206,7 +215,17 @@ vector<ClientStats> ServerPool::get_client_stats() const
 	}
 	return ret;
 }
-
+
+vector<HLSZombie> ServerPool::get_hls_zombies() const
+{
+	vector<HLSZombie> ret;
+	for (int i = 0; i < num_servers; ++i) {
+		vector<HLSZombie> stats = servers[i].get_hls_zombies();
+		ret.insert(ret.end(), stats.begin(), stats.end());
+	}
+	return ret;
+}
+
 void ServerPool::set_pacing_rate(int stream_index, uint32_t pacing_rate)
 {
 	for (int i = 0; i < num_servers; ++i) {
diff --git a/serverpool.h b/serverpool.h
index a70d542..9cac26e 100644
--- a/serverpool.h
+++ b/serverpool.h
@@ -29,6 +29,9 @@ public:
 	void add_client(int sock, Acceptor *acceptor);
 	void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
 
+	// Picks a server (round-robin) and adds the given HLS zombie to it.
+	void add_hls_zombie_from_serialized(const HLSZombieProto &client);
+
 	// Adds the given stream to all the servers. Returns the stream index.
 	int add_stream(const std::string &url,
 	               const std::string &hls_url,
@@ -95,6 +98,7 @@ public:
 	void stop();
 
 	std::vector<ClientStats> get_client_stats() const;
+	std::vector<HLSZombie> get_hls_zombies() const;
 
 private:
 	std::unique_ptr<Server[]> servers;
diff --git a/state.proto b/state.proto
index edb6f77..90d86e4 100644
--- a/state.proto
+++ b/state.proto
@@ -22,6 +22,7 @@ message ClientProto {
 	optional bytes tls_context = 17;  // If not present, then not using TLS for this client.
 	optional int64 tls_output_bytes_already_consumed = 18;
 	optional bool in_ktls_mode = 19;
+	optional bytes x_playback_session_id = 22;
 };
 
 // Corresponds to struct Stream::FragmentStart.
@@ -79,6 +80,16 @@ message ShortResponsePool {
 	optional bytes header_or_short_response = 1;
 };
 
+message HLSZombieProto {
+	optional bytes key = 1;
+	optional bytes remote_addr = 2;
+	optional bytes url = 3;
+	optional bytes referer = 4;
+	optional bytes user_agent = 5;
+	optional int64 expires_sec = 6;
+	optional int64 expires_nsec = 7;
+};
+
 message CubemapStateProto {
 	optional int64 serialize_start_sec = 6;
 	optional int64 serialize_start_usec = 7;
@@ -87,4 +98,5 @@ message CubemapStateProto {
 	repeated InputProto inputs = 5;
 	repeated AcceptorProto acceptors = 8;
 	repeated ShortResponsePool short_response_pool = 9;
+	repeated HLSZombieProto hls_zombies = 10;
 };
diff --git a/stats.cpp b/stats.cpp
index 5193243..955f2e9 100644
--- a/stats.cpp
+++ b/stats.cpp
@@ -31,6 +31,8 @@ void StatsThread::do_work()
 		FILE *fp;
 		timespec now;
 		vector<ClientStats> client_stats;
+		vector<HLSZombie> hls_zombies;
+		unordered_map<string, HLSZombie> remaining_hls_zombies;
 
 		if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
 			log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
@@ -57,19 +59,60 @@
 			goto sleep;
 		}
 
-		client_stats = servers->get_client_stats();
-		for (size_t i = 0; i < client_stats.size(); ++i) {
+		// Get all the HLS zombies and combine them into one map (we resolve conflicts
+		// by having an arbitrary element win; in practice, that means the lowest
+		// server ID).
+		for (HLSZombie &zombie : servers->get_hls_zombies()) {
+			const string remote_addr = zombie.remote_addr;
+			remaining_hls_zombies[move(remote_addr)] = move(zombie);
+		}
+
+		// Remove all zombies whose ID matches an already ongoing request.
+		// (Normally, this is cleared out already when it starts,
+		// but the request could happen on a different server from the zombie,
+		// or the zombie could be deserialized.)
+		for (const ClientStats &stats : servers->get_client_stats()) {
+			if (stats.url != "-") {
+				remaining_hls_zombies.erase(stats.hls_zombie_key);
+			}
+		}
+
+		for (const ClientStats &stats : servers->get_client_stats()) {
+			string url = stats.url;
+			if (url == "-") {
+				// No download going on currently; could it be waiting for more HLS fragments?
+				auto it = remaining_hls_zombies.find(stats.remote_addr);
+				if (it != remaining_hls_zombies.end()) {
+					url = it->second.url;
+					remaining_hls_zombies.erase(it);
+				}
+			}
+
+			fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
+				stats.remote_addr.c_str(),
+				stats.sock,
+				0,  // Used to be fwmark.
+				url.c_str(),
+				int(now.tv_sec - stats.connect_time.tv_sec),  // Rather coarse.
+				(long long unsigned)(stats.bytes_sent),
+				(long long unsigned)(stats.bytes_lost),
+				(long long unsigned)(stats.num_loss_events),
+				stats.referer.c_str(),
+				stats.user_agent.c_str());
+		}
+		for (const auto &url_and_zombie : remaining_hls_zombies) {
+			const HLSZombie &zombie = url_and_zombie.second;
 			fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
-				client_stats[i].remote_addr.c_str(),
-				client_stats[i].sock,
+				zombie.remote_addr.c_str(),
+				0,  // Fake socket. (The Munin script doesn't like negative numbers.)
 				0,  // Used to be fwmark.
-				client_stats[i].url.c_str(),
-				int(now.tv_sec - client_stats[i].connect_time.tv_sec),  // Rather coarse.
-				(long long unsigned)(client_stats[i].bytes_sent),
-				(long long unsigned)(client_stats[i].bytes_lost),
-				(long long unsigned)(client_stats[i].num_loss_events),
-				client_stats[i].referer.c_str(),
-				client_stats[i].user_agent.c_str());
+				zombie.url.c_str(),
+				0,
+				0ULL,
+				0ULL,
+				0ULL,
+				zombie.referer.c_str(),
+				zombie.user_agent.c_str());
 		}
 		if (fclose(fp) == EOF) {
 			log_perror("fclose");
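
For quick reference, here is a minimal, self-contained sketch of the two pieces of bookkeeping the patch adds: the zombie key chosen in client.h and the expiry stamped in server.cpp's sending-data path. HLSZombie, the X-Playback-Session-Id fallback and the three-fragment margin come from the diff above; the free functions hls_zombie_key() and note_finished_fragment() are named here only for illustration and do not exist in cubemap.

#include <time.h>

#include <string>
#include <unordered_map>
#include <utility>

// Mirrors struct HLSZombie from server.h in the patch.
struct HLSZombie {
	std::string remote_addr;
	std::string url;
	std::string referer;
	std::string user_agent;
	timespec expires;
};

// Key by X-Playback-Session-Id when the client sent one (typically iOS),
// otherwise fall back to the remote address, as Client::get_hls_zombie_key() does.
std::string hls_zombie_key(const std::string &x_playback_session_id,
                           const std::string &remote_addr)
{
	return x_playback_session_id.empty() ? remote_addr : x_playback_session_id;
}

// Illustrative helper: once a fragment has been sent in full, remember the
// viewer for three fragment durations. If no new fragment is requested before
// then, the entry is simply dropped during the next statistics pass.
void note_finished_fragment(std::unordered_map<std::string, HLSZombie> *zombies,
                            const std::string &key,
                            HLSZombie zombie,
                            int hls_frag_duration_sec)
{
	timespec now;
	if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
		return;  // No usable clock reading; skip the bookkeeping (the patch also logs the error).
	}
	zombie.expires = now;
	zombie.expires.tv_sec += hls_frag_duration_sec * 3;
	(*zombies)[key] = std::move(zombie);
}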