Add support for serving HLS playlists.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 6 Apr 2018 20:57:14 +0000 (22:57 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 6 Apr 2018 20:57:14 +0000 (22:57 +0200)
This depends on the new Metacube PTS metadata block, which only
Nageru >= 1.7.2 serves at the moment. Lightly tested with iOS and hls.js;
does not work with VLC and mpv yet.

15 files changed:
client.h
config.cpp
config.h
cubemap.config.sample
httpinput.cpp
httpinput.h
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h
state.proto
stream.cpp
stream.h
udpinput.cpp

index 8f9f682..f2bd7fb 100644 (file)
--- a/client.h
+++ b/client.h
@@ -62,6 +62,9 @@ struct Client {
        std::string url;
        Stream *stream = nullptr;
 
+       // If true, we don't actually serve the stream, but its HLS playlist.
+       bool serving_hls_playlist = false;
+
        // Whether we should close the connection after sending the response.
        // Not relevant for READING_REQUEST. Must be true if http_11 is false.
        bool close_after_response;
index 55968ee..5a1fdd9 100644 (file)
@@ -380,6 +380,54 @@ bool parse_stream(const ConfigLine &line, Config *config)
                stream.pacing_rate = atoi(pacing_rate_it->second.c_str()) * 1024 / 8;
        }
 
+       // Parse the HLS URL, if any.
+       const auto hls_url_it = line.parameters.find("hls_playlist");
+       if (hls_url_it != line.parameters.end()) {
+               stream.hls_url = hls_url_it->second;
+               if (stream.hls_url.empty()) {
+                       log(ERROR, "Parameter 'hls_playlist' was given but empty");
+                       return false;
+               }
+               if (stream.encoding == StreamConfig::STREAM_ENCODING_METACUBE) {
+                       log(ERROR, "HLS cannot be used with Metacube output");
+                       return false;
+               }
+       }
+
+       // Parse the HLS fragment duration, if any.
+       const auto hls_frag_duration_it = line.parameters.find("hls_frag_duration");
+       if (hls_frag_duration_it != line.parameters.end()) {
+               if (stream.hls_url.empty()) {
+                       log(ERROR, "Parameter 'hls_frag_duration' given, but no 'hls_playlist' given");
+                       return false;
+               }
+               stream.hls_frag_duration = stoi(hls_frag_duration_it->second);
+               if (stream.hls_frag_duration <= 0) {
+                       log(ERROR, "'hls_frag_duration' must be a strictly positive integer");
+                       return false;
+               }
+       }
+
+       // Parse the HLS backlog margin, if any.
+       const auto hls_backlog_margin_it = line.parameters.find("hls_backlog_margin");
+       if (hls_backlog_margin_it != line.parameters.end()) {
+               if (stream.hls_url.empty()) {
+                       log(ERROR, "Parameter 'hls_backlog_margin' given, but no 'hls_playlist' given");
+                       return false;
+               }
+               stream.hls_backlog_margin = stoi(hls_backlog_margin_it->second);
+               if (stream.hls_backlog_margin < 0 || stream.hls_backlog_margin >= stream.backlog_size) {
+                       log(ERROR, "'hls_backlog_margin' must be nonnegative, but less than the backlog size");
+                       return false;
+               }
+       }
+
+       // Parse the CORS origin, if it exists.
+       const auto allow_origin_it = line.parameters.find("allow_origin");
+       if (allow_origin_it != line.parameters.end()) {
+               stream.allow_origin = allow_origin_it->second;
+       }
+
        config->streams.push_back(stream);
        return true;
 }
index 4d78a37..11907db 100644 (file)
--- a/config.h
+++ b/config.h
@@ -11,6 +11,7 @@
 
 struct StreamConfig {
        std::string url;  // As seen by the client.
+       std::string hls_url;  // As seen by the client. Can be empty.
        std::string src;  // Can be empty.
        size_t backlog_size;
        size_t prebuffering_bytes;
@@ -18,6 +19,11 @@ struct StreamConfig {
        enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
        Encoding encoding;
        Encoding src_encoding;
+       std::string allow_origin;
+
+       // These only matter if hls_url is nonempty.
+       int hls_frag_duration = 6;  // Apple recommendation (“HLS Authoring Specification for Apple Devices”, point 7.5).
+       int hls_backlog_margin = 0;
 };
 
 struct UDPStreamConfig {
index 3e83673..e7119ff 100644 (file)
@@ -60,6 +60,27 @@ stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encodin
 # are not like this. A typical example, however, is MPEG-TS.
 stream /test.ts src=http://gruessi.zrh.sesse.net:4013/test.ts src_encoding=raw
 
+# If your input has PTS Metacube2 blocks (currently only generated by
+# Nageru >= 1.7.2 with MP4 output) and is segmentable (in practice MP4 with the
+# right tags, again typically generated by Nageru), you can serve HLS fragments
+# out of Cubemap's regular backlog, with the playlist served at the given URL
+# (in this case, /stream.m3u8). This allows you to serve video directly to
+# Mobile Safari (you'll need iOS >= 10 for fMP4 support; older iOS only
+# supports TS), and also allow rewinding in the stream if your backlog is large
+# enough. As of April 2018, iOS and hls.js seem to work well, while at least
+# VLC and mpv appear to be buggy.
+#
+# hls_frag_duration= sets the maximum fragment size in seconds; the default, 6,
+# is Apple's default recommendation. Larger fragments will cause more latency but
+# fewer HTTP requests (less chance of performance problems). (Typically, you'll want
+# a bit longer backlog than the default of 10 MB, as you won't fit many six-second
+# fragments into that.) Setting hls_backlog_margin= makes Cubemap not expose any
+# new fragments that are too far, measured in bytes, from the beginning of the
+# backlog, in order to reduce the risk of not managing to deliver them before
+# they rotate out. The default is zero, but you almost certainly want to change that
+# to be some reasonable fraction of your fragment length.
+stream /stream.mp4 src=http://gruessi.zrh.sesse.net:9095/test.mp4.metacube hls_playlist=/stream.m3u8 hls_frag_duration=6 backlog_size=20971520 hls_backlog_margin=1048576 allow_origin=*
+
 # UDP input. TS is the most common container to use over UDP (you cannot
 # take any arbitrary container and expect it to work).
 # backlog_size=<number of bytes> overrides the backlog, which is normally 10 MB.
index 4b34e85..b969596 100644 (file)
@@ -1,5 +1,6 @@
 #include <assert.h>
 #include <errno.h>
+#include <math.h>
 #include <netdb.h>
 #include <netinet/in.h>
 #include <poll.h>
@@ -568,7 +569,7 @@ void HTTPInput::process_data(char *ptr, size_t bytes)
 
        if (encoding == Input::INPUT_ENCODING_RAW) {
                for (int stream_index : stream_indices) {
-                       servers->add_data(stream_index, ptr, bytes, /*metacube_flags=*/0);
+                       servers->add_data(stream_index, ptr, bytes, /*metacube_flags=*/0, /*pts=*/RationalPTS());
                }
                return;
        }
@@ -659,8 +660,9 @@ void HTTPInput::process_data(char *ptr, size_t bytes)
                                }
                        }
                        for (int stream_index : stream_indices) {
-                               servers->add_data(stream_index, inner_data, size, flags);
+                               servers->add_data(stream_index, inner_data, size, flags, next_block_pts);
                        }
+                       next_block_pts = RationalPTS();
                }
 
                // Consume the block. This isn't the most efficient way of dealing with things
@@ -703,25 +705,36 @@ void HTTPInput::process_metacube_metadata_block(const metacube2_block_header &hd
        }
 
        uint64_t type = be64toh(*(const uint64_t *)payload);
-       if (type != METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) {
-               // Unknown metadata block, ignore.
-               return;
-       }
-
-       timespec now;
-       clock_gettime(CLOCK_REALTIME, &now);
+       if (type == METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) {
+               timespec now;
+               clock_gettime(CLOCK_REALTIME, &now);
+
+               const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload;
+               if (payload_size != sizeof(*pkt)) {
+                       log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.",
+                               url.c_str(), payload_size);
+                       return;
+               }
 
-       const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload;
-       if (payload_size != sizeof(*pkt)) {
-               log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.",
-                       url.c_str(), payload_size);
+               double elapsed = now.tv_sec - be64toh(pkt->tv_sec) +
+                       1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec)));
+               {
+                       lock_guard<mutex> lock(stats_mutex);
+                       stats.latency_sec = elapsed;
+               }
+       } else if (type == METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS) {
+               const metacube2_pts_packet *pkt = (const metacube2_pts_packet *)payload;
+               if (payload_size != sizeof(*pkt)) {
+                       log(WARNING, "[%s] Metacube pts block of wrong size (%d bytes); ignoring.",
+                               url.c_str(), payload_size);
+                       return;
+               }
+               next_block_pts.pts = be64toh(pkt->pts);
+               next_block_pts.timebase_num = be64toh(pkt->timebase_num);
+               next_block_pts.timebase_den = be64toh(pkt->timebase_den);
+       } else {
+               // Unknown metadata block, ignore
+               log(INFO, "[%s] Metadata block %llu\n", url.c_str(), type);
                return;
        }
-
-       double elapsed = now.tv_sec - be64toh(pkt->tv_sec) +
-               1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec)));
-       {
-               lock_guard<mutex> lock(stats_mutex);
-               stats.latency_sec = elapsed;
-       }
 }
index 31819f9..4ee5ede 100644 (file)
@@ -8,6 +8,7 @@
 
 #include "input.h"
 #include "metacube2.h"
+#include "stream.h"
 
 class InputProto;
 
@@ -108,6 +109,12 @@ private:
        // Last time we made a connection with logging enabled.
        // (Initially at some point before the epoch.)
        timespec last_verbose_connection { -3600, 0 };
+
+       // If we've received a Metacube2 PTS metadata block, it belongs to the
+       // next regular block we receive, and is stored here in the meantime.
+       // If we haven't received one yet (or we've already received the
+       // corresponding data block), this is empty, ie., timebase_num == 0.
+       RationalPTS next_block_pts;
 };
 
 #endif  // !defined(_HTTPINPUT_H)
index 88c49f4..fc36861 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -194,19 +194,29 @@ void create_streams(const Config &config,
 
                if (deserialized_urls.count(stream_config.url) == 0) {
                        stream_index = servers->add_stream(stream_config.url,
+                                                          stream_config.hls_url,
                                                           stream_config.backlog_size,
                                                           stream_config.prebuffering_bytes,
                                                           Stream::Encoding(stream_config.encoding),
-                                                          Stream::Encoding(stream_config.src_encoding));
+                                                          Stream::Encoding(stream_config.src_encoding),
+                                                          stream_config.hls_frag_duration,
+                                                          stream_config.hls_backlog_margin,
+                                                          stream_config.allow_origin);
                } else {
                        stream_index = servers->lookup_stream_by_url(stream_config.url);
                        assert(stream_index != -1);
                        servers->set_backlog_size(stream_index, stream_config.backlog_size);
+                       if (!stream_config.hls_url.empty()) {
+                               servers->register_hls_url(stream_index, stream_config.hls_url);
+                       }
                        servers->set_prebuffering_bytes(stream_index, stream_config.prebuffering_bytes);
                        servers->set_encoding(stream_index,
                                              Stream::Encoding(stream_config.encoding));
                        servers->set_src_encoding(stream_index,
                                                  Stream::Encoding(stream_config.src_encoding));
+                       servers->set_hls_frag_duration(stream_index, stream_config.hls_frag_duration);
+                       servers->set_hls_backlog_margin(stream_index, stream_config.hls_backlog_margin);
+                       servers->set_allow_origin(stream_index, stream_config.allow_origin);
                }
 
                servers->set_pacing_rate(stream_index, stream_config.pacing_rate);
index 6570b5b..e9bc654 100644 (file)
@@ -307,11 +307,22 @@ int Server::lookup_stream_by_url(const string &url) const
        return stream_url_it->second;
 }
 
-int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding)
+int Server::add_stream(const string &url,
+                       const string &hls_url,
+                       size_t backlog_size,
+                       size_t prebuffering_bytes,
+                       Stream::Encoding encoding,
+                       Stream::Encoding src_encoding,
+                       unsigned hls_frag_duration,
+                       size_t hls_backlog_margin,
+                       const string &allow_origin)
 {
        lock_guard<mutex> lock(mu);
        stream_url_map.insert(make_pair(url, streams.size()));
-       streams.emplace_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding));
+       if (!hls_url.empty()) {
+               stream_hls_url_map.insert(make_pair(hls_url, streams.size()));
+       }
+       streams.emplace_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding, hls_frag_duration, hls_backlog_margin, allow_origin));
        return streams.size() - 1;
 }
 
@@ -319,6 +330,8 @@ int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
 {
        lock_guard<mutex> lock(mu);
        stream_url_map.insert(make_pair(stream.url(), streams.size()));
+       // stream_hls_url_map will be updated in register_hls_url(), since it is not part
+       // of the serialized state (it will always be picked out from the configuration).
        streams.emplace_back(new Stream(stream, data_fd));
        return streams.size() - 1;
 }
@@ -350,14 +363,46 @@ void Server::set_src_encoding(int stream_index, Stream::Encoding encoding)
        assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
        streams[stream_index]->src_encoding = encoding;
 }
+
+void Server::set_hls_frag_duration(int stream_index, unsigned hls_frag_duration)
+{
+       lock_guard<mutex> lock(mu);
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->hls_frag_duration = hls_frag_duration;
+}
+
+void Server::set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin)
+{
+       lock_guard<mutex> lock(mu);
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       assert(hls_backlog_margin >= 0);
+       assert(hls_backlog_margin < streams[stream_index]->backlog_size);
+       streams[stream_index]->hls_backlog_margin = hls_backlog_margin;
+}
+
+void Server::set_allow_origin(int stream_index, const string &allow_origin)
+{
+       lock_guard<mutex> lock(mu);
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->allow_origin = allow_origin;
+}
+
+void Server::register_hls_url(int stream_index, const string &hls_url)
+{
+       lock_guard<mutex> lock(mu);
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       assert(!hls_url.empty());
+       stream_hls_url_map.insert(make_pair(hls_url, stream_index));
+}
        
 void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
 {
        lock_guard<mutex> lock(mu);
        assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
-       streams[stream_index]->http_header = http_header;
+       Stream *stream = streams[stream_index].get();
+       stream->http_header = http_header;
 
-       if (stream_header != streams[stream_index]->stream_header) {
+       if (stream_header != stream->stream_header) {
                // We cannot start at any of the older starting points anymore,
                // since they'd get the wrong header for the stream (not to mention
                // that a changed header probably means the stream restarted,
@@ -365,9 +410,15 @@ void Server::set_header(int stream_index, const string &http_header, const strin
                // stop playing properly at the change point). Next block
                // should be a suitable starting point (if not, something is
                // pretty strange), so it will fill up again soon enough.
-               streams[stream_index]->suitable_starting_points.clear();
+               stream->suitable_starting_points.clear();
+
+               if (!stream->fragments.empty()) {
+                       stream->fragments.clear();
+                       ++stream->discontinuity_counter;
+                       stream->clear_hls_playlist_cache();
+               }
        }
-       streams[stream_index]->stream_header = stream_header;
+       stream->stream_header = stream_header;
 }
        
 void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
@@ -403,10 +454,10 @@ void Server::create_tls_context_for_acceptor(const Acceptor *acceptor)
        tls_server_contexts.insert(make_pair(acceptor, server_context));
 }
 
-void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags)
+void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts)
 {
        assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
-       streams[stream_index]->add_data_deferred(data, bytes, metacube_flags);
+       streams[stream_index]->add_data_deferred(data, bytes, metacube_flags, pts);
 }
 
 // See the .h file for postconditions after this function.     
@@ -478,7 +529,11 @@ read_request_again:
 
                int error_code = parse_request(client);
                if (error_code == 200) {
-                       construct_header(client);
+                       if (client->serving_hls_playlist) {
+                               construct_hls_playlist(client);
+                       } else {
+                               construct_stream_header(client);
+                       }
                } else if (error_code == 204) {
                        construct_204(client);
                } else {
@@ -936,20 +991,41 @@ int Server::parse_request(Client *client)
        }
 
        map<string, int>::const_iterator stream_url_map_it = stream_url_map.find(url);
-       if (stream_url_map_it == stream_url_map.end()) {
-               map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(url);
-               if (ping_url_map_it == ping_url_map.end()) {
-                       return 404;  // Not found.
+       if (stream_url_map_it != stream_url_map.end()) {
+               // Serve a regular stream..
+               client->stream = streams[stream_url_map_it->second].get();
+               client->serving_hls_playlist = false;
+       } else {
+               map<string, int>::const_iterator stream_hls_url_map_it = stream_hls_url_map.find(url);
+               if (stream_hls_url_map_it != stream_hls_url_map.end()) {
+                       // Serve HLS playlist.
+                       client->stream = streams[stream_hls_url_map_it->second].get();
+                       client->serving_hls_playlist = true;
                } else {
-                       return 204;  // No error.
+                       map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(url);
+                       if (ping_url_map_it == ping_url_map.end()) {
+                               return 404;  // Not found.
+                       } else {
+                               // Serve a ping (204 no error).
+                               return 204;
+                       }
                }
        }
 
-       Stream *stream = streams[stream_url_map_it->second].get();
+       Stream *stream = client->stream;
        if (stream->http_header.empty()) {
                return 503;  // Service unavailable.
        }
 
+       if (client->serving_hls_playlist) {
+               if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
+                       // This doesn't make any sense, and is hard to implement, too.
+                       return 404;
+               } else {
+                       return 200;
+               }
+       }
+
        if (client->stream_pos_end == Client::STREAM_POS_NO_END) {
                // This stream won't end, so we don't have a content-length,
                // and can just as well tell the client it's Connection: close
@@ -982,7 +1058,7 @@ int Server::parse_request(Client *client)
        return 200;  // OK!
 }
 
-void Server::construct_header(Client *client)
+void Server::construct_stream_header(Client *client)
 {
        Stream *stream = client->stream;
        string response = stream->http_header;
@@ -1004,6 +1080,11 @@ void Server::construct_header(Client *client)
        } else {
                assert(client->close_after_response);
        }
+       if (!stream->allow_origin.empty()) {
+               response.append("Access-Control-Allow-Origin: ");
+               response.append(stream->allow_origin);
+               response.append("\r\n");
+       }
        if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
                response.append("\r\n");
        } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
@@ -1056,6 +1137,32 @@ void Server::construct_error(Client *client, int error_code)
        change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
 }
 
+void Server::construct_hls_playlist(Client *client)
+{
+       Stream *stream = client->stream;
+       shared_ptr<const string> *cache;
+       if (client->http_11) {
+               if (client->close_after_response) {
+                       cache = &stream->hls_playlist_http11_close;
+               } else {
+                       cache = &stream->hls_playlist_http11_persistent;
+               }
+       } else {
+               assert(client->close_after_response);
+               cache = &stream->hls_playlist_http10;
+       }
+
+       if (*cache == nullptr) {
+               *cache = stream->generate_hls_playlist(client->http_11, client->close_after_response);
+       }
+       client->header_or_short_response_ref = *cache;
+       client->header_or_short_response = cache->get();
+
+       // Switch states.
+       client->state = Client::SENDING_SHORT_RESPONSE;
+       change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
+}
+
 void Server::construct_204(Client *client)
 {
        map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(client->url);
@@ -1137,6 +1244,9 @@ bool Server::more_requests(Client *client)
                return false;
        }
 
+       // Log to access_log.
+       access_log->write(client->get_stats());
+
        // Switch states and reset the parsers. We don't reset statistics.
        client->state = Client::READING_REQUEST;
        client->url.clear();
index 2fe4f15..da20764 100644 (file)
--- a/server.h
+++ b/server.h
@@ -52,20 +52,32 @@ public:
        // and the order between them are undefined.
        // XXX: header should ideally be ordered with respect to data.
        void add_client_deferred(int sock, Acceptor *acceptor);
-       void add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags);
+       void add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts);
 
        // These should not be called while running, since that would violate
        // threading assumptions (ie., that epoll is only called from one thread
        // at the same time).
        CubemapStateProto serialize(std::unordered_map<const std::string *, size_t> *short_response_pool);
        void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
-       int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
+       int add_stream(const std::string &url,
+                      const std::string &hls_url,
+                      size_t bytes_received,
+                      size_t prebuffering_bytes,
+                      Stream::Encoding encoding,
+                      Stream::Encoding src_encoding,
+                      unsigned hls_frag_duration,
+                      size_t hls_backlog_margin,
+                      const std::string &allow_origin);
        int add_stream_from_serialized(const StreamProto &stream, int data_fd);
        int lookup_stream_by_url(const std::string &url) const;
        void set_backlog_size(int stream_index, size_t new_size);
        void set_prebuffering_bytes(int stream_index, size_t new_amount);
        void set_encoding(int stream_index, Stream::Encoding encoding);
        void set_src_encoding(int stream_index, Stream::Encoding encoding);
+       void set_hls_frag_duration(int stream_index, unsigned hls_frag_duration);
+       void set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin);
+       void set_allow_origin(int stream_index, const std::string &allow_origin);
+       void register_hls_url(int stream_index, const std::string &hls_url);
        void add_gen204(const std::string &url, const std::string &allow_origin);
        void create_tls_context_for_acceptor(const Acceptor *acceptor);
 
@@ -94,7 +106,7 @@ private:
        std::vector<std::unique_ptr<Stream>> streams;
 
        // Map from URL to index into <streams>.
-       std::map<std::string, int> stream_url_map;
+       std::map<std::string, int> stream_url_map, stream_hls_url_map;
 
        // Map from URL to CORS Allow-Origin header (or empty string).
        std::map<std::string, std::string> ping_url_map;
@@ -170,9 +182,13 @@ private:
        // Parse the HTTP request. Returns a HTTP status code (200/204/400/404).
        int parse_request(Client *client);
 
-       // Construct the HTTP header, and set the client into
+       // Construct the HTTP header for a regular stream, and set the client into
        // the SENDING_HEADER state.
-       void construct_header(Client *client);
+       void construct_stream_header(Client *client);
+
+       // Construct a HLS playlist (or get it from the cache), and set the client into
+       // the SENDING_HEADER state.
+       void construct_hls_playlist(Client *client);
 
        // Construct a generic error with the given line, and set the client into
        // the SENDING_SHORT_RESPONSE state.
index 8e233da..9f4f3a4 100644 (file)
@@ -71,14 +71,22 @@ int ServerPool::lookup_stream_by_url(const string &url) const
        return servers[0].lookup_stream_by_url(url);
 }
 
-int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding)
+int ServerPool::add_stream(const string &url,
+                           const string &hls_url,
+                           size_t backlog_size,
+                           size_t prebuffering_bytes,
+                           Stream::Encoding encoding,
+                           Stream::Encoding src_encoding,
+                           unsigned hls_frag_duration,
+                           size_t hls_backlog_margin,
+                           const string &allow_origin)
 {
        // Adding more HTTP streams after UDP streams would cause the UDP stream
        // indices to move around, which is obviously not good.
        assert(udp_streams.empty());
 
        for (int i = 0; i < num_servers; ++i) {
-               int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding);
+               int stream_index = servers[i].add_stream(url, hls_url, backlog_size, prebuffering_bytes, encoding, src_encoding, hls_frag_duration, hls_backlog_margin, allow_origin);
                assert(stream_index == num_http_streams);
        }
        return num_http_streams++;
@@ -145,7 +153,7 @@ void ServerPool::set_header(int stream_index, const string &http_header, const s
        }
 }
 
-void ServerPool::add_data(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags)
+void ServerPool::add_data(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts)
 {
        assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size()));
 
@@ -157,7 +165,7 @@ void ServerPool::add_data(int stream_index, const char *data, size_t bytes, uint
 
        // HTTP stream.
        for (int i = 0; i < num_servers; ++i) {
-               servers[i].add_data_deferred(stream_index, data, bytes, metacube_flags);
+               servers[i].add_data_deferred(stream_index, data, bytes, metacube_flags, pts);
        }
 }
 
@@ -233,3 +241,31 @@ void ServerPool::set_src_encoding(int stream_index, Stream::Encoding encoding)
                servers[i].set_src_encoding(stream_index, encoding);
        }
 }
+
+void ServerPool::set_hls_frag_duration(int stream_index, unsigned hls_frag_duration)
+{
+       for (int i = 0; i < num_servers; ++i) {
+               servers[i].set_hls_frag_duration(stream_index, hls_frag_duration);
+       }
+}
+
+void ServerPool::set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin)
+{
+       for (int i = 0; i < num_servers; ++i) {
+               servers[i].set_hls_backlog_margin(stream_index, hls_backlog_margin);
+       }
+}
+
+void ServerPool::set_allow_origin(int stream_index, const std::string &allow_origin)
+{
+       for (int i = 0; i < num_servers; ++i) {
+               servers[i].set_allow_origin(stream_index, allow_origin);
+       }
+}
+
+void ServerPool::register_hls_url(int stream_index, const string &hls_url)
+{
+       for (int i = 0; i < num_servers; ++i) {
+               servers[i].register_hls_url(stream_index, hls_url);
+       }
+}
index 6167628..a70d542 100644 (file)
@@ -30,7 +30,15 @@ public:
        void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
 
        // Adds the given stream to all the servers. Returns the stream index.
-       int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
+       int add_stream(const std::string &url,
+                      const std::string &hls_url,
+                      size_t backlog_size,
+                      size_t prebuffering_bytes,
+                      Stream::Encoding encoding,
+                      Stream::Encoding src_encoding,
+                       unsigned hls_frag_duration,
+                       size_t hls_backlog_margin,
+                       const std::string &allow_origin);
        int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
        void delete_stream(const std::string &url);
        int add_udpstream(const sockaddr_in6 &dst, int pacing_rate, int ttl, int multicast_iface_index);
@@ -42,7 +50,7 @@ public:
        void set_header(int stream_index,
                        const std::string &http_header,
                        const std::string &stream_header);
-       void add_data(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags);
+       void add_data(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts);
 
        // Sets the max pacing rate for all the servers.
        void set_pacing_rate(int stream_index, uint32_t pacing_rate);
@@ -59,6 +67,20 @@ public:
        // Changes the given stream's input encoding type on all the servers.
        void set_src_encoding(int stream_index, Stream::Encoding encoding);
 
+       // Changes the given stream's maximum HLS fragment duration (in seconds) on all the servers.
+       void set_hls_frag_duration(int stream_index, unsigned hls_frag_duration);
+
+       // Changes the given stream's backlog margin for HLS fragments (in bytes) on all the servers.
+       void set_hls_backlog_margin(int stream_index, size_t hls_backlog_margin);
+
+       // Changes the given stream's CORS header on all the servers.
+       void set_allow_origin(int stream_index, const std::string &allow_origin);
+
+       // Register the given stream under the given URL on all the servers.
+       // Used only for deserialized streams (for new ones, we do this registration
+       // in add_stream()).
+       void register_hls_url(int stream_index, const std::string &hls_url);
+
        // Adds the given gen204 endpoint to all the servers.
        void add_gen204(const std::string &url, const std::string &allow_origin);
 
index e9a4f13..edb6f77 100644 (file)
@@ -24,6 +24,12 @@ message ClientProto {
        optional bool in_ktls_mode = 19;
 };
 
+// Corresponds to struct Stream::FragmentStart.
+message FragmentStartProto {
+       optional int64 byte_position = 1;
+       optional double pts = 2;
+};
+
 // Corresponds to struct Stream.
 message StreamProto {
        optional bytes http_header = 6;
@@ -33,6 +39,9 @@ message StreamProto {
        // Tag 10 is unused from 1.4.0 up (it used to be prebuffering_bytes).
        optional int64 bytes_received = 3;
        repeated int64 suitable_starting_point = 9;
+       repeated FragmentStartProto fragment = 11;
+       optional int64 first_fragment_index = 12;
+       optional int64 discontinuity_counter = 13;
        optional string url = 4;
 };
 
index 0b494fb..62c7507 100644 (file)
@@ -1,6 +1,7 @@
 #include <assert.h>
 #include <errno.h>
 #include <limits.h>
+#include <math.h>
 #include <netinet/in.h>
 #include <stdio.h>
 #include <stdlib.h>
 
 using namespace std;
 
-Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding, Encoding src_encoding)
+Stream::Stream(const string &url,
+               size_t backlog_size,
+               size_t prebuffering_bytes,
+               Encoding encoding,
+               Encoding src_encoding,
+               unsigned hls_frag_duration,
+               size_t hls_backlog_margin,
+               const std::string &allow_origin)
        : url(url),
          encoding(encoding),
          src_encoding(src_encoding),
+         allow_origin(allow_origin),
          data_fd(make_tempfile("")),
-          backlog_size(backlog_size),
-         prebuffering_bytes(prebuffering_bytes)
+         backlog_size(backlog_size),
+         prebuffering_bytes(prebuffering_bytes),
+         hls_frag_duration(hls_frag_duration),
+         hls_backlog_margin(hls_backlog_margin)
 {
        if (data_fd == -1) {
                exit(1);
@@ -46,7 +57,9 @@ Stream::Stream(const StreamProto &serialized, int data_fd)
          encoding(Stream::STREAM_ENCODING_RAW),  // Will be changed later.
          data_fd(data_fd),
          backlog_size(serialized.backlog_size()),
-         bytes_received(serialized.bytes_received())
+         bytes_received(serialized.bytes_received()),
+         first_fragment_index(serialized.first_fragment_index()),
+         discontinuity_counter(serialized.discontinuity_counter())
 {
        if (data_fd == -1) {
                exit(1);
@@ -61,6 +74,10 @@ Stream::Stream(const StreamProto &serialized, int data_fd)
                }
                suitable_starting_points.push_back(point);
        }
+
+       for (const FragmentStartProto &fragment : serialized.fragment()) {
+               fragments.push_back(FragmentStart { size_t(fragment.byte_position()), fragment.pts() });
+       }
 }
 
 StreamProto Stream::serialize()
@@ -74,6 +91,14 @@ StreamProto Stream::serialize()
        for (size_t point : suitable_starting_points) {
                serialized.add_suitable_starting_point(point);
        }
+       for (const FragmentStart &fragment : fragments) {
+               FragmentStartProto *proto = serialized.add_fragment();
+               proto->set_byte_position(fragment.byte_position);
+               proto->set_pts(fragment.pts);
+       }
+       serialized.set_first_fragment_index(first_fragment_index);
+       serialized.set_discontinuity_counter(discontinuity_counter);
+
        serialized.set_url(url);
        data_fd = -1;
        return serialized;
@@ -167,6 +192,7 @@ vector<Stream::DataElement> remove_iovecs(const vector<Stream::DataElement> &dat
                        data_element.data.iov_base = reinterpret_cast<char *>(data[i].data.iov_base) + bytes_wanted;
                        data_element.data.iov_len = data[i].data.iov_len - bytes_wanted;
                        data_element.metacube_flags = METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START;
+                       data_element.pts = RationalPTS();
                        ret.push_back(data_element);
                        bytes_wanted = 0;
                }
@@ -211,9 +237,16 @@ void Stream::remove_obsolete_starting_points()
               bytes_received - suitable_starting_points[0] > backlog_size) {
                suitable_starting_points.pop_front();
        }
+       assert(backlog_size >= hls_backlog_margin);
+       while (!fragments.empty() &&
+              bytes_received - fragments[0].byte_position > (backlog_size - hls_backlog_margin)) {
+               fragments.pop_front();
+               ++first_fragment_index;
+               clear_hls_playlist_cache();
+       }
 }
 
-void Stream::add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags)
+void Stream::add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts)
 {
        // For regular output, we don't want to send the client twice
        // (it's already sent out together with the HTTP header).
@@ -229,8 +262,23 @@ void Stream::add_data_deferred(const char *data, size_t bytes, uint16_t metacube
 
        DataElement data_element;
        data_element.metacube_flags = metacube_flags;
+       data_element.pts = pts;
 
        if (encoding == Stream::STREAM_ENCODING_METACUBE) {
+               // Construct a PTS metadata block. (We'll avoid sending it out
+               // if we don't have a valid PTS.)
+               metacube2_pts_packet pts_packet;
+               pts_packet.type = htobe64(METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS);
+               pts_packet.pts = htobe64(pts.pts);
+               pts_packet.timebase_num = htobe64(pts.timebase_num);
+               pts_packet.timebase_den = htobe64(pts.timebase_den);
+
+               metacube2_block_header pts_hdr;
+               memcpy(pts_hdr.sync, METACUBE2_SYNC, sizeof(pts_hdr.sync));
+               pts_hdr.size = htonl(sizeof(pts_packet));
+               pts_hdr.flags = htons(METACUBE_FLAGS_METADATA);
+               pts_hdr.csum = htons(metacube2_compute_crc(&pts_hdr));
+
                // Add a Metacube block header before the data.
                metacube2_block_header hdr;
                memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
@@ -238,11 +286,23 @@ void Stream::add_data_deferred(const char *data, size_t bytes, uint16_t metacube
                hdr.flags = htons(metacube_flags);
                hdr.csum = htons(metacube2_compute_crc(&hdr));
 
-               data_element.data.iov_base = new char[bytes + sizeof(hdr)];
                data_element.data.iov_len = bytes + sizeof(hdr);
+               if (pts.timebase_num != 0) {
+                       data_element.data.iov_len += sizeof(pts_hdr) + sizeof(pts_packet);
+               }
+               data_element.data.iov_base = new char[data_element.data.iov_len];
+
+               char *ptr = reinterpret_cast<char *>(data_element.data.iov_base);
+               if (pts.timebase_num != 0) {
+                       memcpy(ptr, &pts_hdr, sizeof(pts_hdr));
+                       ptr += sizeof(pts_hdr);
+                       memcpy(ptr, &pts_packet, sizeof(pts_packet));
+                       ptr += sizeof(pts_packet);
+               }
 
-               memcpy(data_element.data.iov_base, &hdr, sizeof(hdr));
-               memcpy(reinterpret_cast<char *>(data_element.data.iov_base) + sizeof(hdr), data, bytes);
+               memcpy(ptr, &hdr, sizeof(hdr));
+               ptr += sizeof(hdr);
+               memcpy(ptr, data, bytes);
 
                queued_data.push_back(data_element);
        } else if (encoding == Stream::STREAM_ENCODING_RAW) {
@@ -281,6 +341,7 @@ void Stream::process_queued_data()
        // data, and 10 kB is a very fine granularity in most streams.
        static const int minimum_start_point_distance = 10240;
        size_t byte_position = bytes_received;
+       bool need_hls_clear = false;
        for (const DataElement &elem : queued_data_copy) {
                if ((elem.metacube_flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) == 0) {
                        size_t num_points = suitable_starting_points.size();
@@ -290,9 +351,16 @@ void Stream::process_queued_data()
                                suitable_starting_points.pop_back();
                        }
                        suitable_starting_points.push_back(byte_position);
+
+                       if (elem.pts.timebase_num != 0) {
+                               need_hls_clear |= add_fragment_boundary(byte_position, elem.pts);
+                       }
                }
                byte_position += elem.data.iov_len;
        }
+       if (need_hls_clear) {
+               clear_hls_playlist_cache();
+       }
 
        add_data_raw(queued_data_copy);
        remove_obsolete_starting_points();
@@ -309,3 +377,96 @@ void Stream::process_queued_data()
                sleeping_clients.clear();
        }
 }
+
+bool Stream::add_fragment_boundary(size_t byte_position, const RationalPTS &pts)
+{
+       double pts_double = double(pts.pts) * pts.timebase_den / pts.timebase_num;
+
+       if (fragments.size() <= 1) {
+               // Just starting up, so try to establish the first in-progress fragment.
+               fragments.push_back(FragmentStart{ byte_position, pts_double });
+               return false;
+       }
+
+       // Keep extending the in-progress fragment as long as we do not
+       // exceed the target duration by more than half a second
+       // (RFC 8216 4.3.3.1) and we get closer to the target by doing so.
+       // Note that in particular, this means we'll always extend
+       // as long as we don't exceed the target duration.
+       double current_duration = fragments[fragments.size() - 1].pts;
+       double candidate_duration = pts_double - fragments[fragments.size() - 2].pts;
+       if (lrintf(candidate_duration) <= hls_frag_duration &&
+           fabs(candidate_duration - hls_frag_duration) < fabs(current_duration - hls_frag_duration)) {
+               fragments.back() = FragmentStart{ byte_position, pts_double };
+               return false;
+       } else {
+               // Extending the in-progress fragment would make it too long,
+               // so finalize it and start a new in-progress fragment.
+               fragments.push_back(FragmentStart{ byte_position, pts_double });
+               return true;
+       }
+}
+
+void Stream::clear_hls_playlist_cache()
+{
+       hls_playlist_http10.reset();
+       hls_playlist_http11_close.reset();
+       hls_playlist_http11_persistent.reset();
+}
+
+shared_ptr<const string> Stream::generate_hls_playlist(bool http_11, bool close_after_response)
+{
+       char buf[256];
+       snprintf(buf, sizeof(buf),
+               "#EXTM3U\r\n"
+               "#EXT-X-VERSION:7\r\n"
+               "#EXT-X-TARGETDURATION:%u\r\n"
+               "#EXT-X-MEDIA-SEQUENCE:%zu\r\n"
+               "#EXT-X-DISCONTINUITY-SEQUENCE:%zu\r\n",
+               hls_frag_duration,
+               first_fragment_index,
+               discontinuity_counter);
+
+       string playlist = buf;
+
+       if (!stream_header.empty()) {
+               snprintf(buf, sizeof(buf), "#EXT-X-MAP:URI=\"%s?frag=header\"\r\n", url.c_str());
+               playlist += buf;
+       }
+
+       playlist += "\r\n";
+       if (fragments.size() >= 3) {
+               for (size_t i = 0; i < fragments.size() - 2; ++i) {
+                       char buf[256];
+                       snprintf(buf, sizeof(buf), "#EXTINF:%f,\r\n%s?frag=%zu-%zu\r\n",
+                               fragments[i + 1].pts - fragments[i].pts,
+                               url.c_str(),
+                               fragments[i].byte_position,
+                               fragments[i + 1].byte_position);
+                       playlist += buf;
+               }
+       }
+
+       string response;
+       if (http_11) {
+               response = "HTTP/1.1 200 OK\r\n";
+               if (close_after_response) {
+                       response.append("Connection: close\r\n");
+               }
+       } else {
+               assert(close_after_response);
+               response = "HTTP/1.0 200 OK\r\n";
+       }
+       snprintf(buf, sizeof(buf), "Content-length: %zu\r\n", playlist.size());
+       response.append(buf);
+       response.append("Content-type: application/x-mpegURL\r\n");
+       if (!allow_origin.empty()) {
+               response.append("Access-Control-Allow-Origin: ");
+               response.append(allow_origin);
+               response.append("\r\n");
+       }
+       response.append("\r\n");
+       response.append(move(playlist));
+
+       return shared_ptr<const string>(new string(move(response)));
+}
index 17df7f3..77e9d8a 100644 (file)
--- a/stream.h
+++ b/stream.h
@@ -9,18 +9,34 @@
 #include <sys/types.h>
 #include <sys/uio.h>
 #include <deque>
+#include <memory>
 #include <mutex>
 #include <string>
 #include <vector>
 
+#include "metacube2.h"
+
 class StreamProto;
 struct Client;
 
+// metacube2_pts_packet except the type and byteswapping.
+struct RationalPTS {
+       int64_t pts = 0;
+       uint64_t timebase_num = 0, timebase_den = 0;  // 0/0 for unknown PTS.
+};
+
 struct Stream {
        // Must be in sync with StreamConfig::Encoding.
        enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
 
-       Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding, Encoding src_encoding);
+       Stream(const std::string &url,
+              size_t backlog_size,
+              size_t prebuffering_bytes,
+              Encoding encoding,
+              Encoding src_encoding,
+              unsigned hls_frag_duration,
+              size_t hls_backlog_margin,
+              const std::string &allow_origin);
        ~Stream();
 
        // Serialization/deserialization.
@@ -50,6 +66,9 @@ struct Stream {
        // What encoding we expect the incoming data to be in (usually Metacube).
        Encoding src_encoding;
 
+       // Contents of CORS header (Access-Control-Allow-Origin), if any.
+       std::string allow_origin;
+
        // The stream data itself, stored in a circular buffer.
        //
        // We store our data in a file, so that we can send the data to the
@@ -78,7 +97,42 @@ struct Stream {
        // A list of points in the stream that is suitable to start new clients at
        // (after having sent the header). Empty if no such point exists yet.
        std::deque<size_t> suitable_starting_points;
-       
+
+       // A list of HLS fragment boundaries currently in the backlog; the first fragment
+       // is between point 0 and 1, the second is between 1 and 2, and so on.
+       // This roughly mirrors suitable_starting_points, but we generally make much
+       // larger fragments (we try to get as close as possible without exceeding
+       // <hls_frag_duration> seconds by too much).
+       //
+       // We keep this list even if we don't have HLS, given that we have pts data
+       // from the input stream.
+       //
+       // NOTE: The last fragment is an in-progress fragment, which can still be
+       // extended and thus should not be output. So the last fragment output is
+       // from points N-3..N-2.
+       struct FragmentStart {
+               size_t byte_position;
+               double pts;
+       };
+       std::deque<FragmentStart> fragments;
+       size_t first_fragment_index = 0, discontinuity_counter = 0;
+
+       // HLS target duration, in seconds.
+       unsigned hls_frag_duration = 6;
+
+       // Don't advertise new HLS fragments beginning before this point after the
+       // start of the backlog, so that we're reasonably sure that we can actually
+       // serve them even if the client can't completely keep up.
+       size_t hls_backlog_margin = 0;
+
+       // HLS playlists for this stream, in the form of a HTTP response, with
+       // headers and all. These are created on-demand, re-used by clients as
+       // needed, and cleared when they are no longer valid (e.g., when new fragments
+       // are added).
+       std::shared_ptr<const std::string> hls_playlist_http10;
+       std::shared_ptr<const std::string> hls_playlist_http11_close;
+       std::shared_ptr<const std::string> hls_playlist_http11_persistent;
+
        // Clients that are in SENDING_DATA, but that we don't listen on,
        // because we currently don't have any data for them.
        // See put_client_to_sleep() and wake_up_all_clients().
@@ -96,6 +150,7 @@ struct Stream {
        struct DataElement {
                iovec data;
                uint16_t metacube_flags;
+               RationalPTS pts;
        };
        std::vector<DataElement> queued_data;
 
@@ -106,12 +161,17 @@ struct Stream {
 
        // Add more data to <queued_data>, adding Metacube headers if needed.
        // Does not take ownership of <data>.
-       void add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags);
+       void add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts);
 
        // Add queued data to the stream, if any.
        // You should hold the owning Server's <mutex>.
        void process_queued_data();
 
+       // Generate a HLS playlist based on the current state, including HTTP headers.
+       std::shared_ptr<const std::string> generate_hls_playlist(bool http_11, bool close_after_response);
+
+       void clear_hls_playlist_cache();
+
 private:
        Stream(const Stream& other);
 
@@ -125,6 +185,11 @@ private:
        // in the backlog.
        // You should hold the owning Server's <mutex>.
        void remove_obsolete_starting_points();
+
+       // Extend the in-progress fragment to the given position, or finish it and start
+       // a new one if that would make it too long. Returns true if a new fragment
+       // was created (ie., the HLS playlists need to be regenerated).
+       bool add_fragment_boundary(size_t byte_position, const RationalPTS &pts);
 };
 
 #endif  // !defined(_STREAM_H)
index 0035640..a2915e8 100644 (file)
@@ -223,7 +223,7 @@ void UDPInput::do_work()
                }
        
                for (size_t stream_index : stream_indices) {    
-                       servers->add_data(stream_index, packet_buf, ret, /*metacube_flags=*/0);
+                       servers->add_data(stream_index, packet_buf, ret, /*metacube_flags=*/0, /*pts=*/RationalPTS());
                }
        }
 }