std::string url;
Stream *stream = nullptr;
+ // If true, we don't actually serve the stream, but its HLS playlist.
+ bool serving_hls_playlist = false;
+
// Whether we should close the connection after sending the response.
// Not relevant for READING_REQUEST. Must be true if http_11 is false.
bool close_after_response;
stream.pacing_rate = atoi(pacing_rate_it->second.c_str()) * 1024 / 8;
}
+ // Parse the HLS URL, if any.
+ const auto hls_url_it = line.parameters.find("hls_playlist");
+ if (hls_url_it != line.parameters.end()) {
+ stream.hls_url = hls_url_it->second;
+ if (stream.hls_url.empty()) {
+ log(ERROR, "Parameter 'hls_playlist' was given but empty");
+ return false;
+ }
+ if (stream.encoding == StreamConfig::STREAM_ENCODING_METACUBE) {
+ log(ERROR, "HLS cannot be used with Metacube output");
+ return false;
+ }
+ }
+
+ // Parse the HLS fragment duration, if any.
+ const auto hls_frag_duration_it = line.parameters.find("hls_frag_duration");
+ if (hls_frag_duration_it != line.parameters.end()) {
+ if (stream.hls_url.empty()) {
+ log(ERROR, "Parameter 'hls_frag_duration' given, but no 'hls_playlist' given");
+ return false;
+ }
+ stream.hls_frag_duration = stoi(hls_frag_duration_it->second);
+ if (stream.hls_frag_duration <= 0) {
+ log(ERROR, "'hls_frag_duration' must be a strictly positive integer");
+ return false;
+ }
+ }
+
+ // Parse the HLS backlog margin, if any.
+ const auto hls_backlog_margin_it = line.parameters.find("hls_backlog_margin");
+ if (hls_backlog_margin_it != line.parameters.end()) {
+ if (stream.hls_url.empty()) {
+ log(ERROR, "Parameter 'hls_backlog_margin' given, but no 'hls_playlist' given");
+ return false;
+ }
+ stream.hls_backlog_margin = stoi(hls_backlog_margin_it->second);
+ if (stream.hls_backlog_margin < 0 || stream.hls_backlog_margin >= stream.backlog_size) {
+ log(ERROR, "'hls_backlog_margin' must be nonnegative, but less than the backlog size");
+ return false;
+ }
+ }
+
+ // Parse the CORS origin, if it exists.
+ const auto allow_origin_it = line.parameters.find("allow_origin");
+ if (allow_origin_it != line.parameters.end()) {
+ stream.allow_origin = allow_origin_it->second;
+ }
+
config->streams.push_back(stream);
return true;
}
struct StreamConfig {
std::string url; // As seen by the client.
+ std::string hls_url; // As seen by the client. Can be empty.
std::string src; // Can be empty.
size_t backlog_size;
size_t prebuffering_bytes;
enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
Encoding encoding;
Encoding src_encoding;
+ std::string allow_origin;
+
+ // These only matter if hls_url is nonempty.
+ int hls_frag_duration = 6; // Apple recommendation (“HLS Authoring Specification for Apple Devices”, point 7.5).
+ int hls_backlog_margin = 0;
};
struct UDPStreamConfig {
# are not like this. A typical example, however, is MPEG-TS.
stream /test.ts src=http://gruessi.zrh.sesse.net:4013/test.ts src_encoding=raw
+# If your input has PTS Metacube2 blocks (currently only generated by
+# Nageru >= 1.7.2 with MP4 output) and is segmentable (in practice MP4 with the
+# right tags, again typically generated by Nageru), you can serve HLS fragments
+# out of Cubemap's regular backlog, with the playlist served at the given URL
+# (in this case, /stream.m3u8). This allows you to serve video directly to
+# Mobile Safari (you'll need iOS >= 10 for fMP4 support; older iOS only
+# supports TS), and also allow rewinding in the stream if your backlog is large
+# enough. As of April 2018, iOS and hls.js seem to work well, while at least
+# VLC and mpv appear to be buggy.
+#
+# hls_frag_duration= sets the maximum fragment size in seconds; the default, 6,
+# is Apple's default recommendation. Larger fragments will cause more latency but
+# fewer HTTP requests (less chance of performance problems). (Typically, you'll want
+# a bit longer backlog than the default of 10 MB, as you won't fit many six-second
+# fragments into that.) Setting hls_backlog_margin= makes Cubemap not expose any
+# new fragments that are too far, measured in bytes, from the beginning of the
+# backlog, in order to reduce the risk of not managing to deliver them before
+# they rotate out. The default is zero, but you almost certainly want to change that
+# to be some reasonable fraction of your fragment length.
+stream /stream.mp4 src=http://gruessi.zrh.sesse.net:9095/test.mp4.metacube hls_playlist=/stream.m3u8 hls_frag_duration=6 backlog_size=20971520 hls_backlog_margin=1048576 allow_origin=*
+
# UDP input. TS is the most common container to use over UDP (you cannot
# take any arbitrary container and expect it to work).
# backlog_size=<number of bytes> overrides the backlog, which is normally 10 MB.
#include <assert.h>
#include <errno.h>
+#include <math.h>
#include <netdb.h>
#include <netinet/in.h>
#include <poll.h>
if (encoding == Input::INPUT_ENCODING_RAW) {
for (int stream_index : stream_indices) {
- servers->add_data(stream_index, ptr, bytes, /*metacube_flags=*/0);
+ servers->add_data(stream_index, ptr, bytes, /*metacube_flags=*/0, /*pts=*/RationalPTS());
}
return;
}
}
}
for (int stream_index : stream_indices) {
- servers->add_data(stream_index, inner_data, size, flags);
+ servers->add_data(stream_index, inner_data, size, flags, next_block_pts);
}
+ next_block_pts = RationalPTS();
}
// Consume the block. This isn't the most efficient way of dealing with things
}
uint64_t type = be64toh(*(const uint64_t *)payload);
- if (type != METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) {
- // Unknown metadata block, ignore.
- return;
- }
-
- timespec now;
- clock_gettime(CLOCK_REALTIME, &now);
+ if (type == METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) {
+ timespec now;
+ clock_gettime(CLOCK_REALTIME, &now);
+
+ const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload;
+ if (payload_size != sizeof(*pkt)) {
+ log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.",
+ url.c_str(), payload_size);
+ return;
+ }
- const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload;
- if (payload_size != sizeof(*pkt)) {
- log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.",
- url.c_str(), payload_size);
+ double elapsed = now.tv_sec - be64toh(pkt->tv_sec) +
+ 1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec)));
+ {
+ lock_guard<mutex> lock(stats_mutex);
+ stats.latency_sec = elapsed;
+ }
+ } else if (type == METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS) {
+ const metacube2_pts_packet *pkt = (const metacube2_pts_packet *)payload;
+ if (payload_size != sizeof(*pkt)) {
+ log(WARNING, "[%s] Metacube pts block of wrong size (%d bytes); ignoring.",
+ url.c_str(), payload_size);
+ return;
+ }
+ next_block_pts.pts = be64toh(pkt->pts);
+ next_block_pts.timebase_num = be64toh(pkt->timebase_num);
+ next_block_pts.timebase_den = be64toh(pkt->timebase_den);
+ } else {
+ // Unknown metadata block, ignore
+ log(INFO, "[%s] Metadata block %llu\n", url.c_str(), type);
return;
}
-
- double elapsed = now.tv_sec - be64toh(pkt->tv_sec) +
- 1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec)));
- {
- lock_guard<mutex> lock(stats_mutex);
- stats.latency_sec = elapsed;
- }
}
#include "input.h"
#include "metacube2.h"
+#include "stream.h"
class InputProto;
// Last time we made a connection with logging enabled.
// (Initially at some point before the epoch.)
timespec last_verbose_connection { -3600, 0 };
+
+ // If we've received a Metacube2 PTS metadata block, it belongs to the
+ // next regular block we receive, and is stored here in the meantime.
+ // If we haven't received one yet (or we've already received the
+ // corresponding data block), this is empty, ie., timebase_num == 0.
+ RationalPTS next_block_pts;
};
#endif // !defined(_HTTPINPUT_H)
if (deserialized_urls.count(stream_config.url) == 0) {
stream_index = servers->add_stream(stream_config.url,
+ stream_config.hls_url,
stream_config.backlog_size,
stream_config.prebuffering_bytes,
Stream::Encoding(stream_config.encoding),
- Stream::Encoding(stream_config.src_encoding));
+ Stream::Encoding(stream_config.src_encoding),
+ stream_config.hls_frag_duration,
+ stream_config.hls_backlog_margin,
+ stream_config.allow_origin);
} else {
stream_index = servers->lookup_stream_by_url(stream_config.url);
assert(stream_index != -1);
servers->set_backlog_size(stream_index, stream_config.backlog_size);
+ if (!stream_config.hls_url.empty()) {
+ servers->register_hls_url(stream_index, stream_config.hls_url);
+ }
servers->set_prebuffering_bytes(stream_index, stream_config.prebuffering_bytes);
servers->set_encoding(stream_index,
Stream::Encoding(stream_config.encoding));
servers->set_src_encoding(stream_index,
Stream::Encoding(stream_config.src_encoding));
+ servers->set_hls_frag_duration(stream_index, stream_config.hls_frag_duration);
+ servers->set_hls_backlog_margin(stream_index, stream_config.hls_backlog_margin);
+ servers->set_allow_origin(stream_index, stream_config.allow_origin);
}
servers->set_pacing_rate(stream_index, stream_config.pacing_rate);
return stream_url_it->second;
}
-int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding)
+int Server::add_stream(const string &url,
+ const string &hls_url,
+ size_t backlog_size,
+ size_t prebuffering_bytes,
+ Stream::Encoding encoding,
+ Stream::Encoding src_encoding,
+ unsigned hls_frag_duration,
+ size_t hls_backlog_margin,
+ const string &allow_origin)
{
lock_guard<mutex> lock(mu);
stream_url_map.insert(make_pair(url, streams.size()));
- streams.emplace_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding));
+ if (!hls_url.empty()) {
+ stream_hls_url_map.insert(make_pair(hls_url, streams.size()));
+ }
+ streams.emplace_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding, hls_frag_duration, hls_backlog_margin, allow_origin));
return streams.size() - 1;
}
{
lock_guard<mutex> lock(mu);
stream_url_map.insert(make_pair(stream.url(), streams.size()));
+ // stream_hls_url_map will be updated in register_hls_url(), since it is not part
+ // of the serialized state (it will always be picked out from the configuration).
streams.emplace_back(new Stream(stream, data_fd));
return streams.size() - 1;
}
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
streams[stream_index]->src_encoding = encoding;
}
+
+void Server::set_hls_frag_duration(int stream_index, unsigned hls_frag_duration)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->hls_frag_duration = hls_frag_duration;
+}
+
+void Server::set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ assert(hls_backlog_margin >= 0);
+ assert(hls_backlog_margin < streams[stream_index]->backlog_size);
+ streams[stream_index]->hls_backlog_margin = hls_backlog_margin;
+}
+
+void Server::set_allow_origin(int stream_index, const string &allow_origin)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->allow_origin = allow_origin;
+}
+
+void Server::register_hls_url(int stream_index, const string &hls_url)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ assert(!hls_url.empty());
+ stream_hls_url_map.insert(make_pair(hls_url, stream_index));
+}
void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
{
lock_guard<mutex> lock(mu);
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
- streams[stream_index]->http_header = http_header;
+ Stream *stream = streams[stream_index].get();
+ stream->http_header = http_header;
- if (stream_header != streams[stream_index]->stream_header) {
+ if (stream_header != stream->stream_header) {
// We cannot start at any of the older starting points anymore,
// since they'd get the wrong header for the stream (not to mention
// that a changed header probably means the stream restarted,
// stop playing properly at the change point). Next block
// should be a suitable starting point (if not, something is
// pretty strange), so it will fill up again soon enough.
- streams[stream_index]->suitable_starting_points.clear();
+ stream->suitable_starting_points.clear();
+
+ if (!stream->fragments.empty()) {
+ stream->fragments.clear();
+ ++stream->discontinuity_counter;
+ stream->clear_hls_playlist_cache();
+ }
}
- streams[stream_index]->stream_header = stream_header;
+ stream->stream_header = stream_header;
}
void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
tls_server_contexts.insert(make_pair(acceptor, server_context));
}
-void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags)
+void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts)
{
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
- streams[stream_index]->add_data_deferred(data, bytes, metacube_flags);
+ streams[stream_index]->add_data_deferred(data, bytes, metacube_flags, pts);
}
// See the .h file for postconditions after this function.
int error_code = parse_request(client);
if (error_code == 200) {
- construct_header(client);
+ if (client->serving_hls_playlist) {
+ construct_hls_playlist(client);
+ } else {
+ construct_stream_header(client);
+ }
} else if (error_code == 204) {
construct_204(client);
} else {
}
map<string, int>::const_iterator stream_url_map_it = stream_url_map.find(url);
- if (stream_url_map_it == stream_url_map.end()) {
- map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(url);
- if (ping_url_map_it == ping_url_map.end()) {
- return 404; // Not found.
+ if (stream_url_map_it != stream_url_map.end()) {
+ // Serve a regular stream..
+ client->stream = streams[stream_url_map_it->second].get();
+ client->serving_hls_playlist = false;
+ } else {
+ map<string, int>::const_iterator stream_hls_url_map_it = stream_hls_url_map.find(url);
+ if (stream_hls_url_map_it != stream_hls_url_map.end()) {
+ // Serve HLS playlist.
+ client->stream = streams[stream_hls_url_map_it->second].get();
+ client->serving_hls_playlist = true;
} else {
- return 204; // No error.
+ map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(url);
+ if (ping_url_map_it == ping_url_map.end()) {
+ return 404; // Not found.
+ } else {
+ // Serve a ping (204 no error).
+ return 204;
+ }
}
}
- Stream *stream = streams[stream_url_map_it->second].get();
+ Stream *stream = client->stream;
if (stream->http_header.empty()) {
return 503; // Service unavailable.
}
+ if (client->serving_hls_playlist) {
+ if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
+ // This doesn't make any sense, and is hard to implement, too.
+ return 404;
+ } else {
+ return 200;
+ }
+ }
+
if (client->stream_pos_end == Client::STREAM_POS_NO_END) {
// This stream won't end, so we don't have a content-length,
// and can just as well tell the client it's Connection: close
return 200; // OK!
}
-void Server::construct_header(Client *client)
+void Server::construct_stream_header(Client *client)
{
Stream *stream = client->stream;
string response = stream->http_header;
} else {
assert(client->close_after_response);
}
+ if (!stream->allow_origin.empty()) {
+ response.append("Access-Control-Allow-Origin: ");
+ response.append(stream->allow_origin);
+ response.append("\r\n");
+ }
if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
response.append("\r\n");
} else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
}
+void Server::construct_hls_playlist(Client *client)
+{
+ Stream *stream = client->stream;
+ shared_ptr<const string> *cache;
+ if (client->http_11) {
+ if (client->close_after_response) {
+ cache = &stream->hls_playlist_http11_close;
+ } else {
+ cache = &stream->hls_playlist_http11_persistent;
+ }
+ } else {
+ assert(client->close_after_response);
+ cache = &stream->hls_playlist_http10;
+ }
+
+ if (*cache == nullptr) {
+ *cache = stream->generate_hls_playlist(client->http_11, client->close_after_response);
+ }
+ client->header_or_short_response_ref = *cache;
+ client->header_or_short_response = cache->get();
+
+ // Switch states.
+ client->state = Client::SENDING_SHORT_RESPONSE;
+ change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
+}
+
void Server::construct_204(Client *client)
{
map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(client->url);
return false;
}
+ // Log to access_log.
+ access_log->write(client->get_stats());
+
// Switch states and reset the parsers. We don't reset statistics.
client->state = Client::READING_REQUEST;
client->url.clear();
// and the order between them are undefined.
// XXX: header should ideally be ordered with respect to data.
void add_client_deferred(int sock, Acceptor *acceptor);
- void add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags);
+ void add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts);
// These should not be called while running, since that would violate
// threading assumptions (ie., that epoll is only called from one thread
// at the same time).
CubemapStateProto serialize(std::unordered_map<const std::string *, size_t> *short_response_pool);
void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
- int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
+ int add_stream(const std::string &url,
+ const std::string &hls_url,
+ size_t bytes_received,
+ size_t prebuffering_bytes,
+ Stream::Encoding encoding,
+ Stream::Encoding src_encoding,
+ unsigned hls_frag_duration,
+ size_t hls_backlog_margin,
+ const std::string &allow_origin);
int add_stream_from_serialized(const StreamProto &stream, int data_fd);
int lookup_stream_by_url(const std::string &url) const;
void set_backlog_size(int stream_index, size_t new_size);
void set_prebuffering_bytes(int stream_index, size_t new_amount);
void set_encoding(int stream_index, Stream::Encoding encoding);
void set_src_encoding(int stream_index, Stream::Encoding encoding);
+ void set_hls_frag_duration(int stream_index, unsigned hls_frag_duration);
+ void set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin);
+ void set_allow_origin(int stream_index, const std::string &allow_origin);
+ void register_hls_url(int stream_index, const std::string &hls_url);
void add_gen204(const std::string &url, const std::string &allow_origin);
void create_tls_context_for_acceptor(const Acceptor *acceptor);
std::vector<std::unique_ptr<Stream>> streams;
// Map from URL to index into <streams>.
- std::map<std::string, int> stream_url_map;
+ std::map<std::string, int> stream_url_map, stream_hls_url_map;
// Map from URL to CORS Allow-Origin header (or empty string).
std::map<std::string, std::string> ping_url_map;
// Parse the HTTP request. Returns a HTTP status code (200/204/400/404).
int parse_request(Client *client);
- // Construct the HTTP header, and set the client into
+ // Construct the HTTP header for a regular stream, and set the client into
// the SENDING_HEADER state.
- void construct_header(Client *client);
+ void construct_stream_header(Client *client);
+
+ // Construct a HLS playlist (or get it from the cache), and set the client into
+ // the SENDING_HEADER state.
+ void construct_hls_playlist(Client *client);
// Construct a generic error with the given line, and set the client into
// the SENDING_SHORT_RESPONSE state.
return servers[0].lookup_stream_by_url(url);
}
-int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding)
+int ServerPool::add_stream(const string &url,
+ const string &hls_url,
+ size_t backlog_size,
+ size_t prebuffering_bytes,
+ Stream::Encoding encoding,
+ Stream::Encoding src_encoding,
+ unsigned hls_frag_duration,
+ size_t hls_backlog_margin,
+ const string &allow_origin)
{
// Adding more HTTP streams after UDP streams would cause the UDP stream
// indices to move around, which is obviously not good.
assert(udp_streams.empty());
for (int i = 0; i < num_servers; ++i) {
- int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding);
+ int stream_index = servers[i].add_stream(url, hls_url, backlog_size, prebuffering_bytes, encoding, src_encoding, hls_frag_duration, hls_backlog_margin, allow_origin);
assert(stream_index == num_http_streams);
}
return num_http_streams++;
}
}
-void ServerPool::add_data(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags)
+void ServerPool::add_data(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts)
{
assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size()));
// HTTP stream.
for (int i = 0; i < num_servers; ++i) {
- servers[i].add_data_deferred(stream_index, data, bytes, metacube_flags);
+ servers[i].add_data_deferred(stream_index, data, bytes, metacube_flags, pts);
}
}
servers[i].set_src_encoding(stream_index, encoding);
}
}
+
+void ServerPool::set_hls_frag_duration(int stream_index, unsigned hls_frag_duration)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].set_hls_frag_duration(stream_index, hls_frag_duration);
+ }
+}
+
+void ServerPool::set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].set_hls_backlog_margin(stream_index, hls_backlog_margin);
+ }
+}
+
+void ServerPool::set_allow_origin(int stream_index, const std::string &allow_origin)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].set_allow_origin(stream_index, allow_origin);
+ }
+}
+
+void ServerPool::register_hls_url(int stream_index, const string &hls_url)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].register_hls_url(stream_index, hls_url);
+ }
+}
void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
// Adds the given stream to all the servers. Returns the stream index.
- int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
+ int add_stream(const std::string &url,
+ const std::string &hls_url,
+ size_t backlog_size,
+ size_t prebuffering_bytes,
+ Stream::Encoding encoding,
+ Stream::Encoding src_encoding,
+ unsigned hls_frag_duration,
+ size_t hls_backlog_margin,
+ const std::string &allow_origin);
int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
void delete_stream(const std::string &url);
int add_udpstream(const sockaddr_in6 &dst, int pacing_rate, int ttl, int multicast_iface_index);
void set_header(int stream_index,
const std::string &http_header,
const std::string &stream_header);
- void add_data(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags);
+ void add_data(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts);
// Sets the max pacing rate for all the servers.
void set_pacing_rate(int stream_index, uint32_t pacing_rate);
// Changes the given stream's input encoding type on all the servers.
void set_src_encoding(int stream_index, Stream::Encoding encoding);
+ // Changes the given stream's maximum HLS fragment duration (in seconds) on all the servers.
+ void set_hls_frag_duration(int stream_index, unsigned hls_frag_duration);
+
+ // Changes the given stream's backlog margin for HLS fragments (in bytes) on all the servers.
+ void set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin);
+
+ // Changes the given stream's CORS header on all the servers.
+ void set_allow_origin(int stream_index, const std::string &allow_origin);
+
+ // Register the given stream under the given URL on all the servers.
+ // Used only for deserialized streams (for new ones, we do this registration
+ // in add_stream()).
+ void register_hls_url(int stream_index, const std::string &hls_url);
+
// Adds the given gen204 endpoint to all the servers.
void add_gen204(const std::string &url, const std::string &allow_origin);
optional bool in_ktls_mode = 19;
};
+// Corresponds to struct Stream::FragmentStart.
+message FragmentStartProto {
+ optional int64 byte_position = 1;
+ optional double pts = 2;
+};
+
// Corresponds to struct Stream.
message StreamProto {
optional bytes http_header = 6;
// Tag 10 is unused from 1.4.0 up (it used to be prebuffering_bytes).
optional int64 bytes_received = 3;
repeated int64 suitable_starting_point = 9;
+ repeated FragmentStartProto fragment = 11;
+ optional int64 first_fragment_index = 12;
+ optional int64 discontinuity_counter = 13;
optional string url = 4;
};
#include <assert.h>
#include <errno.h>
#include <limits.h>
+#include <math.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
using namespace std;
-Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding, Encoding src_encoding)
+Stream::Stream(const string &url,
+ size_t backlog_size,
+ size_t prebuffering_bytes,
+ Encoding encoding,
+ Encoding src_encoding,
+ unsigned hls_frag_duration,
+ size_t hls_backlog_margin,
+ const std::string &allow_origin)
: url(url),
encoding(encoding),
src_encoding(src_encoding),
+ allow_origin(allow_origin),
data_fd(make_tempfile("")),
- backlog_size(backlog_size),
- prebuffering_bytes(prebuffering_bytes)
+ backlog_size(backlog_size),
+ prebuffering_bytes(prebuffering_bytes),
+ hls_frag_duration(hls_frag_duration),
+ hls_backlog_margin(hls_backlog_margin)
{
if (data_fd == -1) {
exit(1);
encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later.
data_fd(data_fd),
backlog_size(serialized.backlog_size()),
- bytes_received(serialized.bytes_received())
+ bytes_received(serialized.bytes_received()),
+ first_fragment_index(serialized.first_fragment_index()),
+ discontinuity_counter(serialized.discontinuity_counter())
{
if (data_fd == -1) {
exit(1);
}
suitable_starting_points.push_back(point);
}
+
+ for (const FragmentStartProto &fragment : serialized.fragment()) {
+ fragments.push_back(FragmentStart { size_t(fragment.byte_position()), fragment.pts() });
+ }
}
StreamProto Stream::serialize()
for (size_t point : suitable_starting_points) {
serialized.add_suitable_starting_point(point);
}
+ for (const FragmentStart &fragment : fragments) {
+ FragmentStartProto *proto = serialized.add_fragment();
+ proto->set_byte_position(fragment.byte_position);
+ proto->set_pts(fragment.pts);
+ }
+ serialized.set_first_fragment_index(first_fragment_index);
+ serialized.set_discontinuity_counter(discontinuity_counter);
+
serialized.set_url(url);
data_fd = -1;
return serialized;
data_element.data.iov_base = reinterpret_cast<char *>(data[i].data.iov_base) + bytes_wanted;
data_element.data.iov_len = data[i].data.iov_len - bytes_wanted;
data_element.metacube_flags = METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START;
+ data_element.pts = RationalPTS();
ret.push_back(data_element);
bytes_wanted = 0;
}
bytes_received - suitable_starting_points[0] > backlog_size) {
suitable_starting_points.pop_front();
}
+ assert(backlog_size >= hls_backlog_margin);
+ while (!fragments.empty() &&
+ bytes_received - fragments[0].byte_position > (backlog_size - hls_backlog_margin)) {
+ fragments.pop_front();
+ ++first_fragment_index;
+ clear_hls_playlist_cache();
+ }
}
-void Stream::add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags)
+void Stream::add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts)
{
// For regular output, we don't want to send the client twice
// (it's already sent out together with the HTTP header).
DataElement data_element;
data_element.metacube_flags = metacube_flags;
+ data_element.pts = pts;
if (encoding == Stream::STREAM_ENCODING_METACUBE) {
+ // Construct a PTS metadata block. (We'll avoid sending it out
+ // if we don't have a valid PTS.)
+ metacube2_pts_packet pts_packet;
+ pts_packet.type = htobe64(METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS);
+ pts_packet.pts = htobe64(pts.pts);
+ pts_packet.timebase_num = htobe64(pts.timebase_num);
+ pts_packet.timebase_den = htobe64(pts.timebase_den);
+
+ metacube2_block_header pts_hdr;
+ memcpy(pts_hdr.sync, METACUBE2_SYNC, sizeof(pts_hdr.sync));
+ pts_hdr.size = htonl(sizeof(pts_packet));
+ pts_hdr.flags = htons(METACUBE_FLAGS_METADATA);
+ pts_hdr.csum = htons(metacube2_compute_crc(&pts_hdr));
+
// Add a Metacube block header before the data.
metacube2_block_header hdr;
memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
hdr.flags = htons(metacube_flags);
hdr.csum = htons(metacube2_compute_crc(&hdr));
- data_element.data.iov_base = new char[bytes + sizeof(hdr)];
data_element.data.iov_len = bytes + sizeof(hdr);
+ if (pts.timebase_num != 0) {
+ data_element.data.iov_len += sizeof(pts_hdr) + sizeof(pts_packet);
+ }
+ data_element.data.iov_base = new char[data_element.data.iov_len];
+
+ char *ptr = reinterpret_cast<char *>(data_element.data.iov_base);
+ if (pts.timebase_num != 0) {
+ memcpy(ptr, &pts_hdr, sizeof(pts_hdr));
+ ptr += sizeof(pts_hdr);
+ memcpy(ptr, &pts_packet, sizeof(pts_packet));
+ ptr += sizeof(pts_packet);
+ }
- memcpy(data_element.data.iov_base, &hdr, sizeof(hdr));
- memcpy(reinterpret_cast<char *>(data_element.data.iov_base) + sizeof(hdr), data, bytes);
+ memcpy(ptr, &hdr, sizeof(hdr));
+ ptr += sizeof(hdr);
+ memcpy(ptr, data, bytes);
queued_data.push_back(data_element);
} else if (encoding == Stream::STREAM_ENCODING_RAW) {
// data, and 10 kB is a very fine granularity in most streams.
static const int minimum_start_point_distance = 10240;
size_t byte_position = bytes_received;
+ bool need_hls_clear = false;
for (const DataElement &elem : queued_data_copy) {
if ((elem.metacube_flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) == 0) {
size_t num_points = suitable_starting_points.size();
suitable_starting_points.pop_back();
}
suitable_starting_points.push_back(byte_position);
+
+ if (elem.pts.timebase_num != 0) {
+ need_hls_clear |= add_fragment_boundary(byte_position, elem.pts);
+ }
}
byte_position += elem.data.iov_len;
}
+ if (need_hls_clear) {
+ clear_hls_playlist_cache();
+ }
add_data_raw(queued_data_copy);
remove_obsolete_starting_points();
sleeping_clients.clear();
}
}
+
+bool Stream::add_fragment_boundary(size_t byte_position, const RationalPTS &pts)
+{
+ double pts_double = double(pts.pts) * pts.timebase_den / pts.timebase_num;
+
+ if (fragments.size() <= 1) {
+ // Just starting up, so try to establish the first in-progress fragment.
+ fragments.push_back(FragmentStart{ byte_position, pts_double });
+ return false;
+ }
+
+ // Keep extending the in-progress fragment as long as we do not
+ // exceed the target duration by more than half a second
+ // (RFC 8216 4.3.3.1) and we get closer to the target by doing so.
+ // Note that in particular, this means we'll always extend
+ // as long as we don't exceed the target duration.
+ double current_duration = fragments[fragments.size() - 1].pts;
+ double candidate_duration = pts_double - fragments[fragments.size() - 2].pts;
+ if (lrintf(candidate_duration) <= hls_frag_duration &&
+ fabs(candidate_duration - hls_frag_duration) < fabs(current_duration - hls_frag_duration)) {
+ fragments.back() = FragmentStart{ byte_position, pts_double };
+ return false;
+ } else {
+ // Extending the in-progress fragment would make it too long,
+ // so finalize it and start a new in-progress fragment.
+ fragments.push_back(FragmentStart{ byte_position, pts_double });
+ return true;
+ }
+}
+
+void Stream::clear_hls_playlist_cache()
+{
+ hls_playlist_http10.reset();
+ hls_playlist_http11_close.reset();
+ hls_playlist_http11_persistent.reset();
+}
+
+shared_ptr<const string> Stream::generate_hls_playlist(bool http_11, bool close_after_response)
+{
+ char buf[256];
+ snprintf(buf, sizeof(buf),
+ "#EXTM3U\r\n"
+ "#EXT-X-VERSION:7\r\n"
+ "#EXT-X-TARGETDURATION:%u\r\n"
+ "#EXT-X-MEDIA-SEQUENCE:%zu\r\n"
+ "#EXT-X-DISCONTINUITY-SEQUENCE:%zu\r\n",
+ hls_frag_duration,
+ first_fragment_index,
+ discontinuity_counter);
+
+ string playlist = buf;
+
+ if (!stream_header.empty()) {
+ snprintf(buf, sizeof(buf), "#EXT-X-MAP:URI=\"%s?frag=header\"\r\n", url.c_str());
+ playlist += buf;
+ }
+
+ playlist += "\r\n";
+ if (fragments.size() >= 3) {
+ for (size_t i = 0; i < fragments.size() - 2; ++i) {
+ char buf[256];
+ snprintf(buf, sizeof(buf), "#EXTINF:%f,\r\n%s?frag=%zu-%zu\r\n",
+ fragments[i + 1].pts - fragments[i].pts,
+ url.c_str(),
+ fragments[i].byte_position,
+ fragments[i + 1].byte_position);
+ playlist += buf;
+ }
+ }
+
+ string response;
+ if (http_11) {
+ response = "HTTP/1.1 200 OK\r\n";
+ if (close_after_response) {
+ response.append("Connection: close\r\n");
+ }
+ } else {
+ assert(close_after_response);
+ response = "HTTP/1.0 200 OK\r\n";
+ }
+ snprintf(buf, sizeof(buf), "Content-length: %zu\r\n", playlist.size());
+ response.append(buf);
+ response.append("Content-type: application/x-mpegURL\r\n");
+ if (!allow_origin.empty()) {
+ response.append("Access-Control-Allow-Origin: ");
+ response.append(allow_origin);
+ response.append("\r\n");
+ }
+ response.append("\r\n");
+ response.append(move(playlist));
+
+ return shared_ptr<const string>(new string(move(response)));
+}
#include <sys/types.h>
#include <sys/uio.h>
#include <deque>
+#include <memory>
#include <mutex>
#include <string>
#include <vector>
+#include "metacube2.h"
+
class StreamProto;
struct Client;
+// metacube2_pts_packet except the type and byteswapping.
+struct RationalPTS {
+ int64_t pts = 0;
+ uint64_t timebase_num = 0, timebase_den = 0; // 0/0 for unknown PTS.
+};
+
struct Stream {
// Must be in sync with StreamConfig::Encoding.
enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
- Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding, Encoding src_encoding);
+ Stream(const std::string &url,
+ size_t backlog_size,
+ size_t prebuffering_bytes,
+ Encoding encoding,
+ Encoding src_encoding,
+ unsigned hls_frag_duration,
+ size_t hls_backlog_margin,
+ const std::string &allow_origin);
~Stream();
// Serialization/deserialization.
// What encoding we expect the incoming data to be in (usually Metacube).
Encoding src_encoding;
+ // Contents of CORS header (Access-Control-Allow-Origin), if any.
+ std::string allow_origin;
+
// The stream data itself, stored in a circular buffer.
//
// We store our data in a file, so that we can send the data to the
// A list of points in the stream that is suitable to start new clients at
// (after having sent the header). Empty if no such point exists yet.
std::deque<size_t> suitable_starting_points;
-
+
+ // A list of HLS fragment boundaries currently in the backlog; the first fragment
+ // is between point 0 and 1, the second is between 1 and 2, and so on.
+ // This roughly mirrors suitable_starting_points, but we generally make much
+ // larger fragments (we try to get as close as possible without exceeding
+ // <hls_frag_duration> seconds by too much).
+ //
+ // We keep this list even if we don't have HLS, given that we have pts data
+ // from the input stream.
+ //
+ // NOTE: The last fragment is an in-progress fragment, which can still be
+ // extended and thus should not be output. So the last fragment output is
+ // from points N-3..N-2.
+ struct FragmentStart {
+ size_t byte_position;
+ double pts;
+ };
+ std::deque<FragmentStart> fragments;
+ size_t first_fragment_index = 0, discontinuity_counter = 0;
+
+ // HLS target duration, in seconds.
+ unsigned hls_frag_duration = 6;
+
+ // Don't advertise new HLS fragments beginning before this point after the
+ // start of the backlog, so that we're reasonably sure that we can actually
+ // serve them even if the client can't completely keep up.
+ size_t hls_backlog_margin = 0;
+
+ // HLS playlists for this stream, in the form of a HTTP response, with
+ // headers and all. These are created on-demand, re-used by clients as
+ // needed, and cleared when they are no longer valid (e.g., when new fragments
+ // are added).
+ std::shared_ptr<const std::string> hls_playlist_http10;
+ std::shared_ptr<const std::string> hls_playlist_http11_close;
+ std::shared_ptr<const std::string> hls_playlist_http11_persistent;
+
// Clients that are in SENDING_DATA, but that we don't listen on,
// because we currently don't have any data for them.
// See put_client_to_sleep() and wake_up_all_clients().
struct DataElement {
iovec data;
uint16_t metacube_flags;
+ RationalPTS pts;
};
std::vector<DataElement> queued_data;
// Add more data to <queued_data>, adding Metacube headers if needed.
// Does not take ownership of <data>.
- void add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags);
+ void add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts);
// Add queued data to the stream, if any.
// You should hold the owning Server's <mutex>.
void process_queued_data();
+ // Generate a HLS playlist based on the current state, including HTTP headers.
+ std::shared_ptr<const std::string> generate_hls_playlist(bool http_11, bool close_after_response);
+
+ void clear_hls_playlist_cache();
+
private:
Stream(const Stream& other);
// in the backlog.
// You should hold the owning Server's <mutex>.
void remove_obsolete_starting_points();
+
+ // Extend the in-progress fragment to the given position, or finish it and start
+ // a new one if that would make it too long. Returns true if a new fragment
+ // was created (ie., the HLS playlists need to be regenerated).
+ bool add_fragment_boundary(size_t byte_position, const RationalPTS &pts);
};
#endif // !defined(_STREAM_H)
}
for (size_t stream_index : stream_indices) {
- servers->add_data(stream_index, packet_buf, ret, /*metacube_flags=*/0);
+ servers->add_data(stream_index, packet_buf, ret, /*metacube_flags=*/0, /*pts=*/RationalPTS());
}
}
}