return ret;
}
+vector<HLSZombie> Server::get_hls_zombies()
+{
+ vector<HLSZombie> ret;
+
+ timespec now;
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ return ret;
+ }
+
+ lock_guard<mutex> lock(mu);
+ for (auto it = hls_zombies.begin(); it != hls_zombies.end(); ) {
+ if (is_earlier(it->second.expires, now)) {
+ hls_zombies.erase(it++);
+ } else {
+ ret.push_back(it->second);
+ ++it;
+ }
+ }
+ return ret;
+}
+
void Server::do_work()
{
while (!should_stop()) {
for (unique_ptr<Stream> &stream : streams) {
serialized.add_streams()->MergeFrom(stream->serialize());
}
+ for (const auto &key_and_zombie : hls_zombies) {
+ HLSZombieProto *proto = serialized.add_hls_zombies();
+ proto->set_key(key_and_zombie.first);
+
+ const HLSZombie &zombie = key_and_zombie.second;
+ proto->set_remote_addr(zombie.remote_addr);
+ proto->set_url(zombie.url);
+ proto->set_referer(zombie.referer);
+ proto->set_user_agent(zombie.user_agent);
+ proto->set_expires_sec(zombie.expires.tv_sec);
+ proto->set_expires_nsec(zombie.expires.tv_nsec);
+ }
return serialized;
}
streams.emplace_back(new Stream(stream, data_fd));
return streams.size() - 1;
}
-
+
+void Server::add_hls_zombie_from_serialized(const HLSZombieProto &zombie_proto)
+{
+ lock_guard<mutex> lock(mu);
+ HLSZombie zombie;
+ zombie.remote_addr = zombie_proto.remote_addr();
+ zombie.url = zombie_proto.url();
+ zombie.referer = zombie_proto.referer();
+ zombie.user_agent = zombie_proto.user_agent();
+ zombie.expires.tv_sec = zombie_proto.expires_sec();
+ zombie.expires.tv_nsec = zombie_proto.expires_nsec();
+ hls_zombies[zombie_proto.key()] = move(zombie);
+}
+
void Server::set_backlog_size(int stream_index, size_t new_size)
{
lock_guard<mutex> lock(mu);
{
switch (client->state) {
case Client::READING_REQUEST: {
- if (client->tls_context != nullptr) {
+ if (client->tls_context != nullptr && !client->in_ktls_mode) {
if (send_pending_tls_data(client)) {
// send_pending_tls_data() hit postconditions #1 or #4.
return;
// Try to read more of the request.
char buf[1024];
int ret;
- if (client->tls_context == nullptr) {
- ret = read_nontls_data(client, buf, sizeof(buf));
+ if (client->tls_context == nullptr || client->in_ktls_mode) {
+ ret = read_plain_data(client, buf, sizeof(buf));
if (ret == -1) {
- // read_nontls_data() hit postconditions #1 or #2.
+ // read_plain_data() hit postconditions #1 or #2.
return;
}
} else {
assert(status == RP_FINISHED);
- if (client->tls_context && !client->in_ktls_mode && tls_established(client->tls_context)) {
- // We're ready to enter kTLS mode, unless we still have some
- // handshake data to send (which then must be sent as non-kTLS).
- if (send_pending_tls_data(client)) {
- // send_pending_tls_data() hit postconditions #1 or #4.
- return;
- }
- ret = tls_make_ktls(client->tls_context, client->sock);
- if (ret < 0) {
- log_tls_error("tls_make_ktls", ret);
- close_client(client);
- return;
- }
- client->in_ktls_mode = true;
- }
-
int error_code = parse_request(client);
if (error_code == 200) {
if (client->serving_hls_playlist) {
}
Stream *stream = client->stream;
+ hls_zombies.erase(client->get_hls_zombie_key());
if (client->stream_pos == Client::STREAM_POS_AT_START) {
// Start sending from the beginning of the backlog.
client->stream_pos = min<size_t>(
// 100 kB prebuffer but end up sending a 10 MB GOP.
assert(client->stream_pos == Client::STREAM_POS_AT_END);
assert(client->stream_pos_end == Client::STREAM_POS_NO_END);
- deque<size_t>::const_iterator starting_point_it =
+ deque<uint64_t>::const_iterator starting_point_it =
lower_bound(stream->suitable_starting_points.begin(),
stream->suitable_starting_points.end(),
stream->bytes_received - stream->prebuffering_bytes);
assert(bytes_to_send <= stream->backlog_size);
if (bytes_to_send == 0) {
if (client->stream_pos == client->stream_pos_end) { // We have a definite end, and we're at it.
+ // Add (or overwrite) a HLS zombie.
+ timespec now;
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ } else {
+ HLSZombie zombie;
+ zombie.remote_addr = client->remote_addr;
+ zombie.referer = client->referer;
+ zombie.user_agent = client->user_agent;
+ zombie.url = client->stream->url + "?frag=<idle>";
+ zombie.expires = now;
+ zombie.expires.tv_sec += client->stream->hls_frag_duration * 3;
+ hls_zombies[client->get_hls_zombie_key()] = move(zombie);
+ }
if (more_requests(client)) {
// We're done sending the fragment, but should keep on reading new requests.
goto read_request_again;
goto send_data_again;
}
-int Server::read_nontls_data(Client *client, char *buf, size_t max_size)
+int Server::read_plain_data(Client *client, char *buf, size_t max_size)
{
int ret;
do {
int Server::read_tls_data(Client *client, char *buf, size_t max_size)
{
read_again:
+ assert(!client->in_ktls_mode);
+
int ret;
do {
ret = read(client->sock, buf, max_size);
return -1;
}
+ if (tls_established(client->tls_context)) {
+ // We're ready to enter kTLS mode, unless we still have some
+ // handshake data to send (which then must be sent as non-kTLS).
+ if (send_pending_tls_data(client)) {
+ // send_pending_tls_data() hit postconditions #1 or #4.
+ return -1;
+ }
+ int err = tls_make_ktls(client->tls_context, client->sock); // Don't overwrite ret.
+ if (err < 0) {
+ log_tls_error("tls_make_ktls", ret);
+ close_client(client);
+ return -1;
+ }
+ client->in_ktls_mode = true;
+ }
+
assert(ret > 0);
return ret;
}
if (user_agent_it != headers.end()) {
client->user_agent = user_agent_it->second;
}
+ const auto x_playback_session_id_it = headers.find("X-Playback-Session-Id");
+ if (x_playback_session_id_it != headers.end()) {
+ client->x_playback_session_id = x_playback_session_id_it->second;
+ } else {
+ client->x_playback_session_id.clear();
+ }
vector<string> request_tokens = split_tokens(lines[0]);
if (request_tokens.size() < 3) {