#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
+#include <netinet/tcp.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
}
}
-CubemapStateProto Server::serialize()
+CubemapStateProto Server::serialize(unordered_map<const string *, size_t> *short_response_pool)
{
// We don't serialize anything queued, so empty the queues.
process_queued_data();
CubemapStateProto serialized;
for (const auto &fd_and_client : clients) {
- serialized.add_clients()->MergeFrom(fd_and_client.second.serialize());
+ serialized.add_clients()->MergeFrom(fd_and_client.second.serialize(short_response_pool));
}
for (unique_ptr<Stream> &stream : streams) {
serialized.add_streams()->MergeFrom(stream->serialize());
process_client(client_ptr);
}
-void Server::add_client_from_serialized(const ClientProto &client)
+void Server::add_client_from_serialized(const ClientProto &client, const vector<shared_ptr<const string>> &short_responses)
{
lock_guard<mutex> lock(mu);
Stream *stream;
} else {
stream = streams[stream_index].get();
}
- auto inserted = clients.insert(make_pair(client.sock(), Client(client, stream)));
+ auto inserted = clients.insert(make_pair(client.sock(), Client(client, short_responses, stream)));
assert(inserted.second == true); // Should not already exist.
Client *client_ptr = &inserted.first->second;
int Server::lookup_stream_by_url(const string &url) const
{
- map<string, int>::const_iterator stream_url_it = stream_url_map.find(url);
+ const auto stream_url_it = stream_url_map.find(url);
if (stream_url_it == stream_url_map.end()) {
return -1;
}
return stream_url_it->second;
}
-int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding)
+int Server::add_stream(const string &url,
+ const string &hls_url,
+ size_t backlog_size,
+ size_t prebuffering_bytes,
+ Stream::Encoding encoding,
+ Stream::Encoding src_encoding,
+ unsigned hls_frag_duration,
+ size_t hls_backlog_margin,
+ const string &allow_origin)
{
lock_guard<mutex> lock(mu);
stream_url_map.insert(make_pair(url, streams.size()));
- streams.emplace_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding));
+ if (!hls_url.empty()) {
+ stream_hls_url_map.insert(make_pair(hls_url, streams.size()));
+ }
+ streams.emplace_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding, hls_frag_duration, hls_backlog_margin, allow_origin));
return streams.size() - 1;
}
{
lock_guard<mutex> lock(mu);
stream_url_map.insert(make_pair(stream.url(), streams.size()));
+ // stream_hls_url_map will be updated in register_hls_url(), since it is not part
+ // of the serialized state (it will always be picked out from the configuration).
streams.emplace_back(new Stream(stream, data_fd));
return streams.size() - 1;
}
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
streams[stream_index]->src_encoding = encoding;
}
+
+void Server::set_hls_frag_duration(int stream_index, unsigned hls_frag_duration)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->hls_frag_duration = hls_frag_duration;
+}
+
+void Server::set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ assert(hls_backlog_margin >= 0);
+ assert(hls_backlog_margin < streams[stream_index]->backlog_size);
+ streams[stream_index]->hls_backlog_margin = hls_backlog_margin;
+}
+
+void Server::set_allow_origin(int stream_index, const string &allow_origin)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->allow_origin = allow_origin;
+}
+
+void Server::register_hls_url(int stream_index, const string &hls_url)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ assert(!hls_url.empty());
+ stream_hls_url_map.insert(make_pair(hls_url, stream_index));
+}
void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
{
lock_guard<mutex> lock(mu);
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
- streams[stream_index]->http_header = http_header;
+ Stream *stream = streams[stream_index].get();
+ stream->http_header = http_header;
- if (stream_header != streams[stream_index]->stream_header) {
+ if (stream_header != stream->stream_header) {
// We cannot start at any of the older starting points anymore,
// since they'd get the wrong header for the stream (not to mention
// that a changed header probably means the stream restarted,
// stop playing properly at the change point). Next block
// should be a suitable starting point (if not, something is
// pretty strange), so it will fill up again soon enough.
- streams[stream_index]->suitable_starting_points.clear();
+ stream->suitable_starting_points.clear();
+
+ if (!stream->fragments.empty()) {
+ stream->fragments.clear();
+ ++stream->discontinuity_counter;
+ stream->clear_hls_playlist_cache();
+ }
}
- streams[stream_index]->stream_header = stream_header;
+ stream->stream_header = stream_header;
}
void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
tls_server_contexts.insert(make_pair(acceptor, server_context));
}
-void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags)
+void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts)
{
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
- streams[stream_index]->add_data_deferred(data, bytes, metacube_flags);
+ streams[stream_index]->add_data_deferred(data, bytes, metacube_flags, pts);
}
// See the .h file for postconditions after this function.
int error_code = parse_request(client);
if (error_code == 200) {
- construct_header(client);
+ if (client->serving_hls_playlist) {
+ construct_hls_playlist(client);
+ } else {
+ construct_stream_header(client);
+ }
} else if (error_code == 204) {
construct_204(client);
} else {
int ret;
do {
ret = write(client->sock,
- client->header_or_short_response.data() + client->header_or_short_response_bytes_sent,
- client->header_or_short_response.size() - client->header_or_short_response_bytes_sent);
+ client->header_or_short_response->data() + client->header_or_short_response_bytes_sent,
+ client->header_or_short_response->size() - client->header_or_short_response_bytes_sent);
} while (ret == -1 && errno == EINTR);
if (ret == -1 && errno == EAGAIN) {
}
client->header_or_short_response_bytes_sent += ret;
- assert(client->header_or_short_response_bytes_sent <= client->header_or_short_response.size());
+ assert(client->header_or_short_response_bytes_sent <= client->header_or_short_response->size());
- if (client->header_or_short_response_bytes_sent < client->header_or_short_response.size()) {
+ if (client->header_or_short_response_bytes_sent < client->header_or_short_response->size()) {
// We haven't sent all yet. Fine; go another round.
goto sending_header_or_short_response_again;
}
// We're done sending the header or error! Clear it to release some memory.
- client->header_or_short_response.clear();
+ client->header_or_short_response = nullptr;
+ client->header_or_short_response_holder.clear();
+ client->header_or_short_response_ref.reset();
if (client->state == Client::SENDING_SHORT_RESPONSE) {
if (more_requests(client)) {
// Parse the headers, for logging purposes.
// TODO: Case-insensitivity.
- multimap<string, string> headers = extract_headers(lines, client->remote_addr);
- multimap<string, string>::const_iterator referer_it = headers.find("Referer");
+ unordered_multimap<string, string> headers = extract_headers(lines, client->remote_addr);
+ const auto referer_it = headers.find("Referer");
if (referer_it != headers.end()) {
client->referer = referer_it->second;
}
- multimap<string, string>::const_iterator user_agent_it = headers.find("User-Agent");
+ const auto user_agent_it = headers.find("User-Agent");
if (user_agent_it != headers.end()) {
client->user_agent = user_agent_it->second;
}
client->close_after_response = true;
client->http_11 = false;
} else {
- multimap<string, string>::const_iterator connection_it = headers.find("Connection");
+ const auto connection_it = headers.find("Connection");
if (connection_it != headers.end() && connection_it->second == "close") {
client->close_after_response = true;
}
}
- map<string, int>::const_iterator stream_url_map_it = stream_url_map.find(url);
- if (stream_url_map_it == stream_url_map.end()) {
- map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(url);
- if (ping_url_map_it == ping_url_map.end()) {
- return 404; // Not found.
+ const auto stream_url_map_it = stream_url_map.find(url);
+ if (stream_url_map_it != stream_url_map.end()) {
+ // Serve a regular stream..
+ client->stream = streams[stream_url_map_it->second].get();
+ client->serving_hls_playlist = false;
+ } else {
+ const auto stream_hls_url_map_it = stream_hls_url_map.find(url);
+ if (stream_hls_url_map_it != stream_hls_url_map.end()) {
+ // Serve HLS playlist.
+ client->stream = streams[stream_hls_url_map_it->second].get();
+ client->serving_hls_playlist = true;
} else {
- return 204; // No error.
+ const auto ping_url_map_it = ping_url_map.find(url);
+ if (ping_url_map_it == ping_url_map.end()) {
+ return 404; // Not found.
+ } else {
+ // Serve a ping (204 no error).
+ return 204;
+ }
}
}
- Stream *stream = streams[stream_url_map_it->second].get();
+ Stream *stream = client->stream;
if (stream->http_header.empty()) {
return 503; // Service unavailable.
}
+ if (client->serving_hls_playlist) {
+ if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
+ // This doesn't make any sense, and is hard to implement, too.
+ return 404;
+ } else {
+ return 200;
+ }
+ }
+
if (client->stream_pos_end == Client::STREAM_POS_NO_END) {
// This stream won't end, so we don't have a content-length,
// and can just as well tell the client it's Connection: close
return 200; // OK!
}
-void Server::construct_header(Client *client)
+void Server::construct_stream_header(Client *client)
{
Stream *stream = client->stream;
- client->header_or_short_response = stream->http_header;
+ string response = stream->http_header;
if (client->stream_pos == Client::STREAM_POS_HEADER_ONLY) {
char buf[64];
snprintf(buf, sizeof(buf), "Content-length: %zu\r\n", stream->stream_header.size());
- client->header_or_short_response.append(buf);
+ response.append(buf);
} else if (client->stream_pos_end != Client::STREAM_POS_NO_END) {
char buf[64];
snprintf(buf, sizeof(buf), "Content-length: %zu\r\n", client->stream_pos_end - client->stream_pos);
- client->header_or_short_response.append(buf);
+ response.append(buf);
}
if (client->http_11) {
- assert(client->header_or_short_response.find("HTTP/1.0") == 0);
- client->header_or_short_response[7] = '1'; // Change to HTTP/1.1.
+ assert(response.find("HTTP/1.0") == 0);
+ response[7] = '1'; // Change to HTTP/1.1.
if (client->close_after_response) {
- client->header_or_short_response.append("Connection: close\r\n");
+ response.append("Connection: close\r\n");
}
} else {
assert(client->close_after_response);
}
+ if (!stream->allow_origin.empty()) {
+ response.append("Access-Control-Allow-Origin: ");
+ response.append(stream->allow_origin);
+ response.append("\r\n");
+ }
if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
- client->header_or_short_response.append("\r\n");
+ response.append("\r\n");
} else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
- client->header_or_short_response.append(
- "Content-encoding: metacube\r\n"
- "\r\n");
+ response.append("Content-encoding: metacube\r\n\r\n");
if (!stream->stream_header.empty()) {
metacube2_block_header hdr;
memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
hdr.size = htonl(stream->stream_header.size());
hdr.flags = htons(METACUBE_FLAGS_HEADER);
hdr.csum = htons(metacube2_compute_crc(&hdr));
- client->header_or_short_response.append(
- string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
+ response.append(string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
}
} else {
assert(false);
}
if (client->stream_pos == Client::STREAM_POS_HEADER_ONLY) {
client->state = Client::SENDING_SHORT_RESPONSE;
- client->header_or_short_response.append(stream->stream_header);
+ response.append(stream->stream_header);
} else {
client->state = Client::SENDING_HEADER;
if (client->stream_pos_end == Client::STREAM_POS_NO_END) { // Fragments don't contain stream headers.
- client->header_or_short_response.append(stream->stream_header);
+ response.append(stream->stream_header);
}
}
+ client->header_or_short_response_holder = move(response);
+ client->header_or_short_response = &client->header_or_short_response_holder;
+
// Switch states.
change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
}
"HTTP/1.%d %d Error\r\nContent-type: text/plain\r\nContent-length: 30\r\n\r\nSomething went wrong. Sorry.\r\n",
client->http_11, error_code);
}
- client->header_or_short_response = error;
+ client->header_or_short_response_holder = error;
+ client->header_or_short_response = &client->header_or_short_response_holder;
+
+ // Switch states.
+ client->state = Client::SENDING_SHORT_RESPONSE;
+ change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
+}
+
+void Server::construct_hls_playlist(Client *client)
+{
+ Stream *stream = client->stream;
+ shared_ptr<const string> *cache;
+ if (client->http_11) {
+ if (client->close_after_response) {
+ cache = &stream->hls_playlist_http11_close;
+ } else {
+ cache = &stream->hls_playlist_http11_persistent;
+ }
+ } else {
+ assert(client->close_after_response);
+ cache = &stream->hls_playlist_http10;
+ }
+
+ if (*cache == nullptr) {
+ *cache = stream->generate_hls_playlist(client->http_11, client->close_after_response);
+ }
+ client->header_or_short_response_ref = *cache;
+ client->header_or_short_response = cache->get();
// Switch states.
client->state = Client::SENDING_SHORT_RESPONSE;
void Server::construct_204(Client *client)
{
- map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(client->url);
+ const auto ping_url_map_it = ping_url_map.find(client->url);
assert(ping_url_map_it != ping_url_map.end());
+ string response;
if (client->http_11) {
- client->header_or_short_response = "HTTP/1.1 204 No Content\r\n";
+ response = "HTTP/1.1 204 No Content\r\n";
if (client->close_after_response) {
- client->header_or_short_response.append("Connection: close\r\n");
+ response.append("Connection: close\r\n");
}
} else {
- client->header_or_short_response = "HTTP/1.0 204 No Content\r\n";
+ response = "HTTP/1.0 204 No Content\r\n";
assert(client->close_after_response);
}
if (!ping_url_map_it->second.empty()) {
- client->header_or_short_response.append("Access-Control-Allow-Origin: ");
- client->header_or_short_response.append(ping_url_map_it->second);
+ response.append("Access-Control-Allow-Origin: ");
+ response.append(ping_url_map_it->second);
+ response.append("\r\n");
}
- client->header_or_short_response.append("\r\n");
+ response.append("\r\n");
+
+ client->header_or_short_response_holder = move(response);
+ client->header_or_short_response = &client->header_or_short_response_holder;
// Switch states.
client->state = Client::SENDING_SHORT_RESPONSE;
return false;
}
+ // Log to access_log.
+ access_log->write(client->get_stats());
+
+ // Flush pending data; does not cancel out TCP_CORK (since that still takes priority),
+ // but does a one-off flush.
+ int one = 1;
+ if (setsockopt(client->sock, SOL_TCP, TCP_NODELAY, &one, sizeof(one)) == -1) {
+ log_perror("setsockopt(TCP_NODELAY)");
+ // Can still continue.
+ }
+
// Switch states and reset the parsers. We don't reset statistics.
client->state = Client::READING_REQUEST;
client->url.clear();
client->stream = NULL;
- client->header_or_short_response.clear();
+ client->header_or_short_response = nullptr;
+ client->header_or_short_response_holder.clear();
+ client->header_or_short_response_ref.reset();
client->header_or_short_response_bytes_sent = 0;
change_epoll_events(client, EPOLLIN | EPOLLET | EPOLLRDHUP); // No TLS handshake, so no EPOLLOUT needed.