#include <assert.h>
#include <errno.h>
+#include <inttypes.h>
+#include <limits.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <pthread.h>
#include <utility>
#include <vector>
+#include "ktls.h"
#include "tlse.h"
#include "acceptor.h"
Server::Server()
{
- epoll_fd = epoll_create(1024); // Size argument is ignored.
+ epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (epoll_fd == -1) {
log_perror("epoll_fd");
exit(1);
Server::~Server()
{
safe_close(epoll_fd);
+
+ // We're going to die soon anyway, but clean this up to keep leak checking happy.
+ for (const auto &acceptor_and_context : tls_server_contexts) {
+ tls_destroy_context(acceptor_and_context.second);
+ }
}
vector<ClientStats> Server::get_client_stats() const
return ret;
}
+vector<HLSZombie> Server::get_hls_zombies()
+{
+ vector<HLSZombie> ret;
+
+ timespec now;
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ return ret;
+ }
+
+ lock_guard<mutex> lock(mu);
+ for (auto it = hls_zombies.begin(); it != hls_zombies.end(); ) {
+ if (is_earlier(it->second.expires, now)) {
+ hls_zombies.erase(it++);
+ } else {
+ ret.push_back(it->second);
+ ++it;
+ }
+ }
+ return ret;
+}
+
void Server::do_work()
{
while (!should_stop()) {
for (unique_ptr<Stream> &stream : streams) {
serialized.add_streams()->MergeFrom(stream->serialize());
}
+ for (const auto &key_and_zombie : hls_zombies) {
+ HLSZombieProto *proto = serialized.add_hls_zombies();
+ proto->set_key(key_and_zombie.first);
+
+ const HLSZombie &zombie = key_and_zombie.second;
+ proto->set_remote_addr(zombie.remote_addr);
+ proto->set_url(zombie.url);
+ proto->set_referer(zombie.referer);
+ proto->set_user_agent(zombie.user_agent);
+ proto->set_expires_sec(zombie.expires.tv_sec);
+ proto->set_expires_nsec(zombie.expires.tv_nsec);
+ }
return serialized;
}
assert(inserted.second == true); // Should not already exist.
Client *client_ptr = &inserted.first->second;
- // Connection timestamps must be nondecreasing. I can't find any guarantee
- // that even the monotonic clock can't go backwards by a small amount
- // (think switching between CPUs with non-synchronized TSCs), so if
- // this actually should happen, we hack around it by fudging
- // connect_time.
- if (!clients_ordered_by_connect_time.empty() &&
- is_earlier(client_ptr->connect_time, clients_ordered_by_connect_time.back().first)) {
- client_ptr->connect_time = clients_ordered_by_connect_time.back().first;
- }
- clients_ordered_by_connect_time.push(make_pair(client_ptr->connect_time, sock));
+ start_client_timeout_timer(client_ptr);
// Start listening on data from this socket.
epoll_event ev;
}
}
+void Server::start_client_timeout_timer(Client *client)
+{
+ // Connection timestamps must be nondecreasing. I can't find any guarantee
+ // that even the monotonic clock can't go backwards by a small amount
+ // (think switching between CPUs with non-synchronized TSCs), so if
+ // this actually should happen, we hack around it by fudging
+ // connect_time.
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &client->connect_time) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ } else {
+ if (!clients_ordered_by_connect_time.empty() &&
+ is_earlier(client->connect_time, clients_ordered_by_connect_time.back().first)) {
+ client->connect_time = clients_ordered_by_connect_time.back().first;
+ }
+ clients_ordered_by_connect_time.push(make_pair(client->connect_time, client->sock));
+ }
+}
+
int Server::lookup_stream_by_url(const string &url) const
{
const auto stream_url_it = stream_url_map.find(url);
streams.emplace_back(new Stream(stream, data_fd));
return streams.size() - 1;
}
-
+
+void Server::add_hls_zombie_from_serialized(const HLSZombieProto &zombie_proto)
+{
+ lock_guard<mutex> lock(mu);
+ HLSZombie zombie;
+ zombie.remote_addr = zombie_proto.remote_addr();
+ zombie.url = zombie_proto.url();
+ zombie.referer = zombie_proto.referer();
+ zombie.user_agent = zombie_proto.user_agent();
+ zombie.expires.tv_sec = zombie_proto.expires_sec();
+ zombie.expires.tv_nsec = zombie_proto.expires_nsec();
+ hls_zombies[zombie_proto.key()] = move(zombie);
+}
+
void Server::set_backlog_size(int stream_index, size_t new_size)
{
lock_guard<mutex> lock(mu);
{
lock_guard<mutex> lock(mu);
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
- assert(hls_backlog_margin >= 0);
assert(hls_backlog_margin < streams[stream_index]->backlog_size);
streams[stream_index]->hls_backlog_margin = hls_backlog_margin;
}
{
lock_guard<mutex> lock(mu);
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
- Stream *stream = streams[stream_index].get();
- stream->http_header = http_header;
-
- if (stream_header != stream->stream_header) {
- // We cannot start at any of the older starting points anymore,
- // since they'd get the wrong header for the stream (not to mention
- // that a changed header probably means the stream restarted,
- // which means any client starting on the old one would probably
- // stop playing properly at the change point). Next block
- // should be a suitable starting point (if not, something is
- // pretty strange), so it will fill up again soon enough.
- stream->suitable_starting_points.clear();
-
- if (!stream->fragments.empty()) {
- stream->fragments.clear();
- ++stream->discontinuity_counter;
- stream->clear_hls_playlist_cache();
- }
- }
- stream->stream_header = stream_header;
+ streams[stream_index]->set_header(http_header, stream_header);
+}
+
+void Server::set_unavailable(int stream_index)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->set_unavailable();
}
void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
{
switch (client->state) {
case Client::READING_REQUEST: {
- if (client->tls_context != nullptr) {
+ if (client->tls_context != nullptr && !client->in_ktls_mode) {
if (send_pending_tls_data(client)) {
// send_pending_tls_data() hit postconditions #1 or #4.
return;
// Try to read more of the request.
char buf[1024];
int ret;
- if (client->tls_context == nullptr) {
- ret = read_nontls_data(client, buf, sizeof(buf));
+ if (client->tls_context == nullptr || client->in_ktls_mode) {
+ ret = read_plain_data(client, buf, sizeof(buf));
if (ret == -1) {
- // read_nontls_data() hit postconditions #1 or #2.
+ // read_plain_data() hit postconditions #1 or #2.
return;
}
} else {
assert(status == RP_FINISHED);
- if (client->tls_context && !client->in_ktls_mode && tls_established(client->tls_context)) {
- // We're ready to enter kTLS mode, unless we still have some
- // handshake data to send (which then must be sent as non-kTLS).
- if (send_pending_tls_data(client)) {
- // send_pending_tls_data() hit postconditions #1 or #4.
- return;
- }
- ret = tls_make_ktls(client->tls_context, client->sock);
- if (ret < 0) {
- log_tls_error("tls_make_ktls", ret);
- close_client(client);
- return;
- }
- client->in_ktls_mode = true;
- }
-
int error_code = parse_request(client);
if (error_code == 200) {
if (client->serving_hls_playlist) {
}
Stream *stream = client->stream;
+ hls_zombies.erase(client->get_hls_zombie_key());
if (client->stream_pos == Client::STREAM_POS_AT_START) {
// Start sending from the beginning of the backlog.
client->stream_pos = min<size_t>(
} else if (client->stream_pos_end != Client::STREAM_POS_NO_END) {
// We're sending a fragment, and should have all of it,
// so start sending right away.
- assert(client->stream_pos >= 0);
+ assert(ssize_t(client->stream_pos) >= 0);
client->state = Client::SENDING_DATA;
goto sending_data;
} else if (stream->prebuffering_bytes == 0) {
// 100 kB prebuffer but end up sending a 10 MB GOP.
assert(client->stream_pos == Client::STREAM_POS_AT_END);
assert(client->stream_pos_end == Client::STREAM_POS_NO_END);
- deque<size_t>::const_iterator starting_point_it =
+ deque<uint64_t>::const_iterator starting_point_it =
lower_bound(stream->suitable_starting_points.begin(),
stream->suitable_starting_points.end(),
stream->bytes_received - stream->prebuffering_bytes);
assert(bytes_to_send <= stream->backlog_size);
if (bytes_to_send == 0) {
if (client->stream_pos == client->stream_pos_end) { // We have a definite end, and we're at it.
+ // Add (or overwrite) a HLS zombie.
+ timespec now;
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ } else {
+ HLSZombie zombie;
+ zombie.remote_addr = client->remote_addr;
+ zombie.referer = client->referer;
+ zombie.user_agent = client->user_agent;
+ zombie.url = client->stream->url + "?frag=<idle>";
+ zombie.expires = now;
+ zombie.expires.tv_sec += client->stream->hls_frag_duration * 3;
+ hls_zombies[client->get_hls_zombie_key()] = move(zombie);
+ }
if (more_requests(client)) {
// We're done sending the fragment, but should keep on reading new requests.
goto read_request_again;
}
}
+namespace {
+
+void flush_pending_data(int sock)
+{
+ // Flush pending data, which would otherwise wait for the 200ms TCP_CORK timer
+ // to elapsed; does not cancel out TCP_CORK (since that still takes priority),
+ // but does a one-off flush.
+ int one = 1;
+ if (setsockopt(sock, SOL_TCP, TCP_NODELAY, &one, sizeof(one)) == -1) {
+ log_perror("setsockopt(TCP_NODELAY)");
+ // Can still continue.
+ }
+}
+
+} // namespace
+
bool Server::send_pending_tls_data(Client *client)
{
// See if there's data from the TLS library to write.
return true;
}
if (ret > 0 && size_t(ret) == client->tls_data_left_to_send) {
- // All data has been sent, so we don't need to go to sleep.
+ // All data has been sent, so we don't need to go to sleep
+ // (although we are likely to do so immediately afterwards,
+ // due to lack of client data).
tls_buffer_clear(client->tls_context);
client->tls_data_to_send = nullptr;
+
+ // Flush the data we just wrote, since the client probably
+ // is waiting for it.
+ flush_pending_data(client->sock);
return false;
}
goto send_data_again;
}
-int Server::read_nontls_data(Client *client, char *buf, size_t max_size)
+int Server::read_plain_data(Client *client, char *buf, size_t max_size)
{
int ret;
do {
int Server::read_tls_data(Client *client, char *buf, size_t max_size)
{
read_again:
+ assert(!client->in_ktls_mode);
+
int ret;
do {
ret = read(client->sock, buf, max_size);
return -1;
}
+ if (tls_established(client->tls_context)) {
+ // We're ready to enter kTLS mode, unless we still have some
+ // handshake data to send (which then must be sent as non-kTLS).
+ if (send_pending_tls_data(client)) {
+ // send_pending_tls_data() hit postconditions #1 or #4.
+ return -1;
+ }
+ int err = tls_make_ktls(client->tls_context, client->sock); // Don't overwrite ret.
+ if (err < 0) {
+ log_tls_error("tls_make_ktls", ret);
+ close_client(client);
+ return -1;
+ }
+ client->in_ktls_mode = true;
+ }
+
assert(ret > 0);
return ret;
}
if (!client->close_after_response) {
assert(client->stream_pos_end != Client::STREAM_POS_NO_END);
- // We've already sent a Content-length, so we can't just skip data.
+ // We've already sent a Content-Length, so we can't just skip data.
// Close the connection immediately and hope the other side
// is able to figure out that there was an error and it needs to skip.
client->close_after_response = true;
}
// Parse the headers, for logging purposes.
- // TODO: Case-insensitivity.
- unordered_multimap<string, string> headers = extract_headers(lines, client->remote_addr);
+ HTTPHeaderMultimap headers = extract_headers(lines, client->remote_addr);
const auto referer_it = headers.find("Referer");
if (referer_it != headers.end()) {
client->referer = referer_it->second;
if (user_agent_it != headers.end()) {
client->user_agent = user_agent_it->second;
}
+ const auto x_playback_session_id_it = headers.find("X-Playback-Session-Id");
+ if (x_playback_session_id_it != headers.end()) {
+ client->x_playback_session_id = x_playback_session_id_it->second;
+ } else {
+ client->x_playback_session_id.clear();
+ }
vector<string> request_tokens = split_tokens(lines[0]);
if (request_tokens.size() < 3) {
}
Stream *stream = client->stream;
- if (stream->http_header.empty()) {
- return 503; // Service unavailable.
- }
if (client->serving_hls_playlist) {
if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
}
if (client->stream_pos_end == Client::STREAM_POS_NO_END) {
+ if (stream->unavailable) {
+ return 503; // Service unavailable.
+ }
+
// This stream won't end, so we don't have a content-length,
// and can just as well tell the client it's Connection: close
// (otherwise, we'd have to implement chunking TE for no good reason).
string response = stream->http_header;
if (client->stream_pos == Client::STREAM_POS_HEADER_ONLY) {
char buf[64];
- snprintf(buf, sizeof(buf), "Content-length: %zu\r\n", stream->stream_header.size());
+ snprintf(buf, sizeof(buf), "Content-Length: %zu\r\n", stream->stream_header.size());
response.append(buf);
} else if (client->stream_pos_end != Client::STREAM_POS_NO_END) {
char buf[64];
- snprintf(buf, sizeof(buf), "Content-length: %zu\r\n", client->stream_pos_end - client->stream_pos);
+ snprintf(buf, sizeof(buf), "Content-Length: %" PRIu64 "\r\n", client->stream_pos_end - client->stream_pos);
response.append(buf);
}
if (client->http_11) {
if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
response.append("\r\n");
} else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
- response.append("Content-encoding: metacube\r\n\r\n");
+ response.append("Content-Encoding: metacube\r\n\r\n");
if (!stream->stream_header.empty()) {
metacube2_block_header hdr;
memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
char error[256];
if (client->http_11 && client->close_after_response) {
snprintf(error, sizeof(error),
- "HTTP/1.1 %d Error\r\nContent-type: text/plain\r\nConnection: close\r\n\r\nSomething went wrong. Sorry.\r\n",
+ "HTTP/1.1 %d Error\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\nSomething went wrong. Sorry.\r\n",
error_code);
} else {
snprintf(error, sizeof(error),
- "HTTP/1.%d %d Error\r\nContent-type: text/plain\r\nContent-length: 30\r\n\r\nSomething went wrong. Sorry.\r\n",
+ "HTTP/1.%d %d Error\r\nContent-Type: text/plain\r\nContent-Length: 30\r\n\r\nSomething went wrong. Sorry.\r\n",
client->http_11, error_code);
}
client->header_or_short_response_holder = error;
change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
}
+namespace {
+
template<class T>
void delete_from(vector<T> *v, T elem)
{
typename vector<T>::iterator new_end = remove(v->begin(), v->end(), elem);
v->erase(new_end, v->end());
}
+
+void send_ktls_close(int sock)
+{
+ uint8_t record_type = 21; // Alert.
+ uint8_t body[] = {
+ 1, // Warning level (but still fatal!).
+ 0, // close_notify.
+ };
+
+ int cmsg_len = sizeof(record_type);
+ char buf[CMSG_SPACE(cmsg_len)];
+
+ msghdr msg = {0};
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+ cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_TLS;
+ cmsg->cmsg_type = TLS_SET_RECORD_TYPE;
+ cmsg->cmsg_len = CMSG_LEN(cmsg_len);
+ *CMSG_DATA(cmsg) = record_type;
+ msg.msg_controllen = cmsg->cmsg_len;
+
+ iovec msg_iov;
+ msg_iov.iov_base = body;
+ msg_iov.iov_len = sizeof(body);
+ msg.msg_iov = &msg_iov;
+ msg.msg_iovlen = 1;
+
+ int err;
+ do {
+ err = sendmsg(sock, &msg, 0);
+ } while (err == -1 && errno == EINTR); // Ignore all other errors.
+}
+
+} // namespace
void Server::close_client(Client *client)
{
}
if (client->tls_context) {
+ if (client->in_ktls_mode) {
+ // Keep GnuTLS happy.
+ send_ktls_close(client->sock);
+ }
tls_destroy_context(client->tls_context);
}
// Log to access_log.
access_log->write(client->get_stats());
- // Flush pending data; does not cancel out TCP_CORK (since that still takes priority),
- // but does a one-off flush.
- int one = 1;
- if (setsockopt(client->sock, SOL_TCP, TCP_NODELAY, &one, sizeof(one)) == -1) {
- log_perror("setsockopt(TCP_NODELAY)");
- // Can still continue.
- }
+ flush_pending_data(client->sock);
// Switch states and reset the parsers. We don't reset statistics.
client->state = Client::READING_REQUEST;
client->header_or_short_response_holder.clear();
client->header_or_short_response_ref.reset();
client->header_or_short_response_bytes_sent = 0;
+ client->bytes_sent = 0;
+ start_client_timeout_timer(client);
change_epoll_events(client, EPOLLIN | EPOLLET | EPOLLRDHUP); // No TLS handshake, so no EPOLLOUT needed.