#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
+#include <netinet/tcp.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
int Server::lookup_stream_by_url(const string &url) const
{
- map<string, int>::const_iterator stream_url_it = stream_url_map.find(url);
+ const auto stream_url_it = stream_url_map.find(url);
if (stream_url_it == stream_url_map.end()) {
return -1;
}
return stream_url_it->second;
}
-int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding)
+int Server::add_stream(const string &url,
+ const string &hls_url,
+ size_t backlog_size,
+ size_t prebuffering_bytes,
+ Stream::Encoding encoding,
+ Stream::Encoding src_encoding,
+ unsigned hls_frag_duration,
+ size_t hls_backlog_margin,
+ const string &allow_origin)
{
lock_guard<mutex> lock(mu);
stream_url_map.insert(make_pair(url, streams.size()));
- streams.emplace_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding));
+ if (!hls_url.empty()) {
+ stream_hls_url_map.insert(make_pair(hls_url, streams.size()));
+ }
+ streams.emplace_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding, hls_frag_duration, hls_backlog_margin, allow_origin));
return streams.size() - 1;
}
{
lock_guard<mutex> lock(mu);
stream_url_map.insert(make_pair(stream.url(), streams.size()));
+ // stream_hls_url_map will be updated in register_hls_url(), since it is not part
+ // of the serialized state (it will always be picked out from the configuration).
streams.emplace_back(new Stream(stream, data_fd));
return streams.size() - 1;
}
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
streams[stream_index]->src_encoding = encoding;
}
+
+void Server::set_hls_frag_duration(int stream_index, unsigned hls_frag_duration)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->hls_frag_duration = hls_frag_duration;
+}
+
+void Server::set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ assert(hls_backlog_margin >= 0);
+ assert(hls_backlog_margin < streams[stream_index]->backlog_size);
+ streams[stream_index]->hls_backlog_margin = hls_backlog_margin;
+}
+
+void Server::set_allow_origin(int stream_index, const string &allow_origin)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->allow_origin = allow_origin;
+}
+
+void Server::register_hls_url(int stream_index, const string &hls_url)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ assert(!hls_url.empty());
+ stream_hls_url_map.insert(make_pair(hls_url, stream_index));
+}
// Install new HTTP and stream headers for the given stream. If the stream
// header actually changed, all cached starting points and HLS fragments are
// invalidated, since they would hand clients data with the wrong header.
void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
{
	lock_guard<mutex> lock(mu);
	assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
	Stream &stream = *streams[stream_index];

	stream.http_header = http_header;

	if (stream_header != stream.stream_header) {
		// None of the older starting points are usable anymore; clients
		// starting there would get the wrong header (and a changed header
		// probably means the stream restarted anyway). The next block should
		// be a suitable starting point, so the set will refill soon enough.
		stream.suitable_starting_points.clear();

		// Existing HLS fragments are equally stale: drop them, record the
		// discontinuity, and invalidate any cached playlists.
		if (!stream.fragments.empty()) {
			stream.fragments.clear();
			++stream.discontinuity_counter;
			stream.clear_hls_playlist_cache();
		}
	}
	stream.stream_header = stream_header;
}
void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
tls_server_contexts.insert(make_pair(acceptor, server_context));
}
-void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags)
+void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts)
{
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
- streams[stream_index]->add_data_deferred(data, bytes, metacube_flags);
+ streams[stream_index]->add_data_deferred(data, bytes, metacube_flags, pts);
}
// See the .h file for postconditions after this function.
int error_code = parse_request(client);
if (error_code == 200) {
- construct_header(client);
+ if (client->serving_hls_playlist) {
+ construct_hls_playlist(client);
+ } else {
+ construct_stream_header(client);
+ }
} else if (error_code == 204) {
construct_204(client);
} else {
// Parse the headers, for logging purposes.
// TODO: Case-insensitivity.
- multimap<string, string> headers = extract_headers(lines, client->remote_addr);
- multimap<string, string>::const_iterator referer_it = headers.find("Referer");
+ unordered_multimap<string, string> headers = extract_headers(lines, client->remote_addr);
+ const auto referer_it = headers.find("Referer");
if (referer_it != headers.end()) {
client->referer = referer_it->second;
}
- multimap<string, string>::const_iterator user_agent_it = headers.find("User-Agent");
+ const auto user_agent_it = headers.find("User-Agent");
if (user_agent_it != headers.end()) {
client->user_agent = user_agent_it->second;
}
client->close_after_response = true;
client->http_11 = false;
} else {
- multimap<string, string>::const_iterator connection_it = headers.find("Connection");
+ const auto connection_it = headers.find("Connection");
if (connection_it != headers.end() && connection_it->second == "close") {
client->close_after_response = true;
}
}
- map<string, int>::const_iterator stream_url_map_it = stream_url_map.find(url);
- if (stream_url_map_it == stream_url_map.end()) {
- map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(url);
- if (ping_url_map_it == ping_url_map.end()) {
- return 404; // Not found.
+ const auto stream_url_map_it = stream_url_map.find(url);
+ if (stream_url_map_it != stream_url_map.end()) {
+ // Serve a regular stream..
+ client->stream = streams[stream_url_map_it->second].get();
+ client->serving_hls_playlist = false;
+ } else {
+ const auto stream_hls_url_map_it = stream_hls_url_map.find(url);
+ if (stream_hls_url_map_it != stream_hls_url_map.end()) {
+ // Serve HLS playlist.
+ client->stream = streams[stream_hls_url_map_it->second].get();
+ client->serving_hls_playlist = true;
} else {
- return 204; // No error.
+ const auto ping_url_map_it = ping_url_map.find(url);
+ if (ping_url_map_it == ping_url_map.end()) {
+ return 404; // Not found.
+ } else {
+ // Serve a ping (204 no error).
+ return 204;
+ }
}
}
- Stream *stream = streams[stream_url_map_it->second].get();
+ Stream *stream = client->stream;
if (stream->http_header.empty()) {
return 503; // Service unavailable.
}
+ if (client->serving_hls_playlist) {
+ if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
+ // This doesn't make any sense, and is hard to implement, too.
+ return 404;
+ } else {
+ return 200;
+ }
+ }
+
if (client->stream_pos_end == Client::STREAM_POS_NO_END) {
// This stream won't end, so we don't have a content-length,
// and can just as well tell the client it's Connection: close
return 200; // OK!
}
-void Server::construct_header(Client *client)
+void Server::construct_stream_header(Client *client)
{
Stream *stream = client->stream;
string response = stream->http_header;
} else {
assert(client->close_after_response);
}
+ if (!stream->allow_origin.empty()) {
+ response.append("Access-Control-Allow-Origin: ");
+ response.append(stream->allow_origin);
+ response.append("\r\n");
+ }
if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
response.append("\r\n");
} else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
}
+void Server::construct_hls_playlist(Client *client)
+{
+ Stream *stream = client->stream;
+ shared_ptr<const string> *cache;
+ if (client->http_11) {
+ if (client->close_after_response) {
+ cache = &stream->hls_playlist_http11_close;
+ } else {
+ cache = &stream->hls_playlist_http11_persistent;
+ }
+ } else {
+ assert(client->close_after_response);
+ cache = &stream->hls_playlist_http10;
+ }
+
+ if (*cache == nullptr) {
+ *cache = stream->generate_hls_playlist(client->http_11, client->close_after_response);
+ }
+ client->header_or_short_response_ref = *cache;
+ client->header_or_short_response = cache->get();
+
+ // Switch states.
+ client->state = Client::SENDING_SHORT_RESPONSE;
+ change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
+}
+
void Server::construct_204(Client *client)
{
- map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(client->url);
+ const auto ping_url_map_it = ping_url_map.find(client->url);
assert(ping_url_map_it != ping_url_map.end());
string response;
return false;
}
+ // Log to access_log.
+ access_log->write(client->get_stats());
+
+ // Flush pending data; does not cancel out TCP_CORK (since that still takes priority),
+ // but does a one-off flush.
+ int one = 1;
+ if (setsockopt(client->sock, SOL_TCP, TCP_NODELAY, &one, sizeof(one)) == -1) {
+ log_perror("setsockopt(TCP_NODELAY)");
+ // Can still continue.
+ }
+
// Switch states and reset the parsers. We don't reset statistics.
client->state = Client::READING_REQUEST;
client->url.clear();