return ret;
}
+vector<HLSZombie> Server::get_hls_zombies()
+{
+ vector<HLSZombie> ret;
+
+ timespec now;
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ return ret;
+ }
+
+ lock_guard<mutex> lock(mu);
+ for (auto it = hls_zombies.begin(); it != hls_zombies.end(); ) {
+ if (is_earlier(it->second.expires, now)) {
+ hls_zombies.erase(it++);
+ } else {
+ ret.push_back(it->second);
+ ++it;
+ }
+ }
+ return ret;
+}
+
void Server::do_work()
{
while (!should_stop()) {
for (unique_ptr<Stream> &stream : streams) {
serialized.add_streams()->MergeFrom(stream->serialize());
}
+ for (const auto &key_and_zombie : hls_zombies) {
+ HLSZombieProto *proto = serialized.add_hls_zombies();
+ proto->set_key(key_and_zombie.first);
+
+ const HLSZombie &zombie = key_and_zombie.second;
+ proto->set_remote_addr(zombie.remote_addr);
+ proto->set_url(zombie.url);
+ proto->set_referer(zombie.referer);
+ proto->set_user_agent(zombie.user_agent);
+ proto->set_expires_sec(zombie.expires.tv_sec);
+ proto->set_expires_nsec(zombie.expires.tv_nsec);
+ }
return serialized;
}
streams.emplace_back(new Stream(stream, data_fd));
return streams.size() - 1;
}
-
+
+void Server::add_hls_zombie_from_serialized(const HLSZombieProto &zombie_proto)
+{
+ lock_guard<mutex> lock(mu);
+ HLSZombie zombie;
+ zombie.remote_addr = zombie_proto.remote_addr();
+ zombie.url = zombie_proto.url();
+ zombie.referer = zombie_proto.referer();
+ zombie.user_agent = zombie_proto.user_agent();
+ zombie.expires.tv_sec = zombie_proto.expires_sec();
+ zombie.expires.tv_nsec = zombie_proto.expires_nsec();
+ hls_zombies[zombie_proto.key()] = move(zombie);
+}
+
void Server::set_backlog_size(int stream_index, size_t new_size)
{
lock_guard<mutex> lock(mu);
}
Stream *stream = client->stream;
+ hls_zombies.erase(client->get_hls_zombie_key());
if (client->stream_pos == Client::STREAM_POS_AT_START) {
// Start sending from the beginning of the backlog.
client->stream_pos = min<size_t>(
assert(bytes_to_send <= stream->backlog_size);
if (bytes_to_send == 0) {
if (client->stream_pos == client->stream_pos_end) { // We have a definite end, and we're at it.
+ // Add (or overwrite) a HLS zombie.
+ timespec now;
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ } else {
+ HLSZombie zombie;
+ zombie.remote_addr = client->remote_addr;
+ zombie.referer = client->referer;
+ zombie.user_agent = client->user_agent;
+ zombie.url = client->stream->url + "?frag=<idle>";
+ zombie.expires = now;
+ zombie.expires.tv_sec += client->stream->hls_frag_duration * 3;
+ hls_zombies[client->get_hls_zombie_key()] = move(zombie);
+ }
if (more_requests(client)) {
// We're done sending the fragment, but should keep on reading new requests.
goto read_request_again;
if (user_agent_it != headers.end()) {
client->user_agent = user_agent_it->second;
}
+ const auto x_playback_session_id_it = headers.find("X-Playback-Session-Id");
+ if (x_playback_session_id_it != headers.end()) {
+ client->x_playback_session_id = x_playback_session_id_it->second;
+ } else {
+ client->x_playback_session_id.clear();
+ }
vector<string> request_tokens = split_tokens(lines[0]);
if (request_tokens.size() < 3) {