remote_addr(serialized.remote_addr()),
referer(serialized.referer()),
user_agent(serialized.user_agent()),
+ x_playback_session_id(serialized.x_playback_session_id()),
state(State(serialized.state())),
request(serialized.request()),
url(serialized.url()),
serialized.set_remote_addr(remote_addr);
serialized.set_referer(referer);
serialized.set_user_agent(user_agent);
+ serialized.set_x_playback_session_id(x_playback_session_id);
serialized.set_connect_time_sec(connect_time.tv_sec);
serialized.set_connect_time_nsec(connect_time.tv_nsec);
serialized.set_state(state);
stats.bytes_sent = bytes_sent;
stats.bytes_lost = bytes_lost;
stats.num_loss_events = num_loss_events;
+ stats.hls_zombie_key = get_hls_zombie_key();
return stats;
}
size_t bytes_sent;
size_t bytes_lost;
size_t num_loss_events;
+ std::string hls_zombie_key;
};
struct Client {
ClientStats get_stats() const;
+ std::string get_hls_zombie_key() const {
+ if (x_playback_session_id.empty()) {
+ return remote_addr;
+ } else {
+ return x_playback_session_id;
+ }
+ }
+
// The file descriptor associated with this socket.
int sock;
std::string remote_addr;
std::string referer;
std::string user_agent;
+ std::string x_playback_session_id;
enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_SHORT_RESPONSE, WAITING_FOR_KEYFRAME, PREBUFFERING };
State state = READING_REQUEST;
short_response_pool.clear(); // No longer needed; the clients have their own refcounts now.
+ // Put back the HLS zombies. There's no really good allocation here
+ // except round-robin; it would be marginally more efficient to match it
+ // to the client (since that would have them deleted immediately when
+ // the client requests the next fragment, instead of being later weeded
+ // out during statistics collection), but it's not a big deal.
+ for (const HLSZombieProto &zombie_proto : loaded_state.hls_zombies()) {
+ servers->add_hls_zombie_from_serialized(zombie_proto);
+ }
+
servers->run();
// Now delete all inputs that are longer in use, and start the others.
return ret;
}
+vector<HLSZombie> Server::get_hls_zombies()
+{
+ vector<HLSZombie> ret;
+
+ timespec now;
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ return ret;
+ }
+
+ lock_guard<mutex> lock(mu);
+ for (auto it = hls_zombies.begin(); it != hls_zombies.end(); ) {
+ if (is_earlier(it->second.expires, now)) {
+ hls_zombies.erase(it++);
+ } else {
+ ret.push_back(it->second);
+ ++it;
+ }
+ }
+ return ret;
+}
+
void Server::do_work()
{
while (!should_stop()) {
for (unique_ptr<Stream> &stream : streams) {
serialized.add_streams()->MergeFrom(stream->serialize());
}
+ for (const auto &key_and_zombie : hls_zombies) {
+ HLSZombieProto *proto = serialized.add_hls_zombies();
+ proto->set_key(key_and_zombie.first);
+
+ const HLSZombie &zombie = key_and_zombie.second;
+ proto->set_remote_addr(zombie.remote_addr);
+ proto->set_url(zombie.url);
+ proto->set_referer(zombie.referer);
+ proto->set_user_agent(zombie.user_agent);
+ proto->set_expires_sec(zombie.expires.tv_sec);
+ proto->set_expires_nsec(zombie.expires.tv_nsec);
+ }
return serialized;
}
streams.emplace_back(new Stream(stream, data_fd));
return streams.size() - 1;
}
-
+
+void Server::add_hls_zombie_from_serialized(const HLSZombieProto &zombie_proto)
+{
+ lock_guard<mutex> lock(mu);
+ HLSZombie zombie;
+ zombie.remote_addr = zombie_proto.remote_addr();
+ zombie.url = zombie_proto.url();
+ zombie.referer = zombie_proto.referer();
+ zombie.user_agent = zombie_proto.user_agent();
+ zombie.expires.tv_sec = zombie_proto.expires_sec();
+ zombie.expires.tv_nsec = zombie_proto.expires_nsec();
+ hls_zombies[zombie_proto.key()] = move(zombie);
+}
+
void Server::set_backlog_size(int stream_index, size_t new_size)
{
lock_guard<mutex> lock(mu);
}
Stream *stream = client->stream;
+ hls_zombies.erase(client->get_hls_zombie_key());
if (client->stream_pos == Client::STREAM_POS_AT_START) {
// Start sending from the beginning of the backlog.
client->stream_pos = min<size_t>(
assert(bytes_to_send <= stream->backlog_size);
if (bytes_to_send == 0) {
if (client->stream_pos == client->stream_pos_end) { // We have a definite end, and we're at it.
+ // Add (or overwrite) a HLS zombie.
+ timespec now;
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ } else {
+ HLSZombie zombie;
+ zombie.remote_addr = client->remote_addr;
+ zombie.referer = client->referer;
+ zombie.user_agent = client->user_agent;
+ zombie.url = client->stream->url + "?frag=<idle>";
+ zombie.expires = now;
+ zombie.expires.tv_sec += client->stream->hls_frag_duration * 3;
+ hls_zombies[client->get_hls_zombie_key()] = move(zombie);
+ }
if (more_requests(client)) {
// We're done sending the fragment, but should keep on reading new requests.
goto read_request_again;
if (user_agent_it != headers.end()) {
client->user_agent = user_agent_it->second;
}
+ const auto x_playback_session_id_it = headers.find("X-Playback-Session-Id");
+ if (x_playback_session_id_it != headers.end()) {
+ client->x_playback_session_id = x_playback_session_id_it->second;
+ } else {
+ client->x_playback_session_id.clear();
+ }
vector<string> request_tokens = split_tokens(lines[0]);
if (request_tokens.size() < 3) {
class CubemapStateProto;
class StreamProto;
+class HLSZombieProto;
+
+// See Server::hls_zombies, below.
+struct HLSZombie {
+ std::string remote_addr;
+ std::string url;
+ std::string referer;
+ std::string user_agent;
+ timespec expires;
+};
class Server : public Thread {
public:
// Get the list of all currently connected clients.
std::vector<ClientStats> get_client_stats() const;
+ // See hls_zombies, below.
+ std::vector<HLSZombie> get_hls_zombies();
+
// Set header (both HTTP header and any stream headers) for the given stream.
void set_header(int stream_index,
const std::string &http_header,
size_t hls_backlog_margin,
const std::string &allow_origin);
int add_stream_from_serialized(const StreamProto &stream, int data_fd);
+ void add_hls_zombie_from_serialized(const HLSZombieProto &hls_zombie);
int lookup_stream_by_url(const std::string &url) const;
void set_backlog_size(int stream_index, size_t new_size);
void set_prebuffering_bytes(int stream_index, size_t new_amount);
// the timespec matches).
std::queue<std::pair<timespec, int>> clients_ordered_by_connect_time;
+ // HLS is harder to keep viewer statistics for than regular streams,
+ // since there's no 1:1 mapping between ongoing HTTP connections and
+ // actual viewers. After a HLS fragment has been successfully sent,
+ // we keep a note of that in this structure, so that we can add some
+ // fake entries in the .stats file for clients that we believe are still
+ // watching, but are not downloading anything right now. We clean this
+ // out whenever we write statistics centrally.
+ //
+ // The structure is keyed by X-Playback-Session-Id if it exists
+ // (typically iOS clients) or IP address otherwise; we can't use the socket,
+ // since clients can (and do) keep open multiple HTTP connections for
+ // the same video playack session, and may also close the socket
+ // between downloading fragments. This means multiple clients between
+ // the same NAT may be undercounted, but that's how it is.
+ std::unordered_map<std::string, HLSZombie> hls_zombies;
+
// Used for epoll implementation (obviously).
int epoll_fd;
epoll_event events[EPOLL_MAX_EVENTS];
for (const ClientProto &client : local_state.clients()) {
state.add_clients()->MergeFrom(client);
}
+ for (const HLSZombieProto &hls_zombie : local_state.hls_zombies()) {
+ state.add_hls_zombies()->MergeFrom(hls_zombie);
+ }
}
for (size_t i = 0; i < short_response_pool.size(); ++i) {
servers[clients_added++ % num_servers].add_client_from_serialized(client, short_responses);
}
+// It's fine to abuse clients_added here, since it's only ever used for round-robin purposes.
+void ServerPool::add_hls_zombie_from_serialized(const HLSZombieProto &hls_zombie)
+{
+ servers[clients_added++ % num_servers].add_hls_zombie_from_serialized(hls_zombie);
+}
+
int ServerPool::lookup_stream_by_url(const string &url) const
{
assert(servers != nullptr);
}
return ret;
}
-
+
+vector<HLSZombie> ServerPool::get_hls_zombies() const
+{
+ vector<HLSZombie> ret;
+ for (int i = 0; i < num_servers; ++i) {
+ vector<HLSZombie> stats = servers[i].get_hls_zombies();
+ ret.insert(ret.end(), stats.begin(), stats.end());
+ }
+ return ret;
+}
+
void ServerPool::set_pacing_rate(int stream_index, uint32_t pacing_rate)
{
for (int i = 0; i < num_servers; ++i) {
void add_client(int sock, Acceptor *acceptor);
void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
+ // Picks a srever (round-robin) and adds the given HLS zombie to it.
+ void add_hls_zombie_from_serialized(const HLSZombieProto &client);
+
// Adds the given stream to all the servers. Returns the stream index.
int add_stream(const std::string &url,
const std::string &hls_url,
void stop();
std::vector<ClientStats> get_client_stats() const;
+ std::vector<HLSZombie> get_hls_zombies() const;
private:
std::unique_ptr<Server[]> servers;
optional bytes tls_context = 17; // If not present, then not using TLS for this client.
optional int64 tls_output_bytes_already_consumed = 18;
optional bool in_ktls_mode = 19;
+ optional bytes x_playback_session_id = 22;
};
// Corresponds to struct Stream::FragmentStart.
optional bytes header_or_short_response = 1;
};
+message HLSZombieProto {
+ optional bytes key = 1;
+ optional bytes remote_addr = 2;
+ optional bytes url = 3;
+ optional bytes referer = 4;
+ optional bytes user_agent = 5;
+ optional int64 expires_sec = 6;
+ optional int64 expires_nsec = 7;
+};
+
message CubemapStateProto {
optional int64 serialize_start_sec = 6;
optional int64 serialize_start_usec = 7;
repeated InputProto inputs = 5;
repeated AcceptorProto acceptors = 8;
repeated ShortResponsePool short_response_pool = 9;
+ repeated HLSZombieProto hls_zombies = 10;
};
FILE *fp;
timespec now;
vector<ClientStats> client_stats;
+ vector<HLSZombie> hls_zombies;
+ unordered_map<string, HLSZombie> remaining_hls_zombies;
if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
goto sleep;
}
- client_stats = servers->get_client_stats();
- for (size_t i = 0; i < client_stats.size(); ++i) {
+ // Get all the HLS zombies and combine them into one map (we resolve conflicts
+ // by having an arbitrary element win; in practice, that means the lowest
+ // server ID).
+ for (HLSZombie &zombie : servers->get_hls_zombies()) {
+ const string remote_addr = zombie.remote_addr;
+ remaining_hls_zombies[move(remote_addr)] = move(zombie);
+ }
+
+ // Remove all zombies whose ID match an already ongoing request.
+ // (Normally, this is cleared out already when it starts,
+ // but the request could happen on a different server from the zombie,
+ // or the zombie could be deserialized.)
+ for (const ClientStats &stats : servers->get_client_stats()) {
+ if (stats.url != "-") {
+ remaining_hls_zombies.erase(stats.hls_zombie_key);
+ }
+ }
+
+ for (const ClientStats &stats : servers->get_client_stats()) {
+ string url = stats.url;
+ if (url == "-") {
+ // No download going on currently; could it be waiting for more HLS fragments?
+ auto it = remaining_hls_zombies.find(stats.remote_addr);
+ if (it != remaining_hls_zombies.end()) {
+ url = it->second.url;
+ remaining_hls_zombies.erase(it);
+ }
+ }
+
+ fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
+ stats.remote_addr.c_str(),
+ stats.sock,
+ 0, // Used to be fwmark.
+ url.c_str(),
+ int(now.tv_sec - stats.connect_time.tv_sec), // Rather coarse.
+ (long long unsigned)(stats.bytes_sent),
+ (long long unsigned)(stats.bytes_lost),
+ (long long unsigned)(stats.num_loss_events),
+ stats.referer.c_str(),
+ stats.user_agent.c_str());
+ }
+ for (const auto &url_and_zombie : remaining_hls_zombies) {
+ const HLSZombie &zombie = url_and_zombie.second;
fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
- client_stats[i].remote_addr.c_str(),
- client_stats[i].sock,
+ zombie.remote_addr.c_str(),
+ 0, // Fake socket. (The Munin script doesn't like negative numbers.)
0, // Used to be fwmark.
- client_stats[i].url.c_str(),
- int(now.tv_sec - client_stats[i].connect_time.tv_sec), // Rather coarse.
- (long long unsigned)(client_stats[i].bytes_sent),
- (long long unsigned)(client_stats[i].bytes_lost),
- (long long unsigned)(client_stats[i].num_loss_events),
- client_stats[i].referer.c_str(),
- client_stats[i].user_agent.c_str());
+ zombie.url.c_str(),
+ 0,
+ 0ULL,
+ 0ULL,
+ 0ULL,
+ zombie.referer.c_str(),
+ zombie.user_agent.c_str());
}
if (fclose(fp) == EOF) {
log_perror("fclose");