X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=stream.h;h=9e384a9ffd141b63fee39f44c8140b29269d9020;hp=9a9982a3541d489f72b63a18228f1e1d1910acca;hb=7ec54788f88dde7b083ba6cfd30732b32295b54a;hpb=00cf4a1ffcb987ef6d27fcf49811fd5ef572a985 diff --git a/stream.h b/stream.h index 9a9982a..9e384a9 100644 --- a/stream.h +++ b/stream.h @@ -8,22 +8,35 @@ #include #include #include +#include +#include +#include #include #include +#include "metacube2.h" + class StreamProto; struct Client; -enum StreamStartSuitability { - NOT_SUITABLE_FOR_STREAM_START, - SUITABLE_FOR_STREAM_START, +// metacube2_pts_packet except the type and byteswapping. +struct RationalPTS { + int64_t pts = 0; + uint64_t timebase_num = 0, timebase_den = 0; // 0/0 for unknown PTS. }; struct Stream { // Must be in sync with StreamConfig::Encoding. enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE }; - Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding); + Stream(const std::string &url, + size_t backlog_size, + uint64_t prebuffering_bytes, + Encoding encoding, + Encoding src_encoding, + unsigned hls_frag_duration, + size_t hls_backlog_margin, + const std::string &allow_origin); ~Stream(); // Serialization/deserialization. @@ -33,13 +46,25 @@ struct Stream { // Changes the backlog size, restructuring the data as needed. void set_backlog_size(size_t new_size); + // You should hold the owning Server's , since it calls add_data_raw(). + // Sets unavailable to false. + void set_header(const std::string &new_http_header, const std::string &new_stream_header); + + void set_unavailable() { + unavailable = true; + } + // Mutex protecting and . // Note that if you want to hold both this and the owning server's // you will need to take before this one. - mutable pthread_mutex_t queued_data_mutex; + mutable std::mutex queued_data_mutex; std::string url; + // If true, the backend is not completely connected, and thus, we cannot serve + // clients (except for historic HLS fragments). + bool unavailable = true; + // The HTTP response header, without the trailing double newline. std::string http_header; @@ -50,6 +75,12 @@ struct Stream { // be Metacube, for reflecting to another Cubemap instance). Encoding encoding; + // What encoding we expect the incoming data to be in (usually Metacube). + Encoding src_encoding; + + // Contents of CORS header (Access-Control-Allow-Origin), if any. + std::string allow_origin; + // The stream data itself, stored in a circular buffer. // // We store our data in a file, so that we can send the data to the @@ -69,16 +100,59 @@ struct Stream { // This is basically to force a buffer on the client, which can help // if the client expects us to be able to fill up the buffer much // faster than realtime (ie., it expects a static file). - size_t prebuffering_bytes; + uint64_t prebuffering_bytes; // How many bytes this stream have received. Can very well be larger // than , since the buffer wraps. - size_t bytes_received; + uint64_t bytes_received = 0; + + // A list of points in the stream that is suitable to start new clients at + // (after having sent the header). Empty if no such point exists yet. + std::deque suitable_starting_points; + + // A list of HLS fragment boundaries currently in the backlog; the first fragment + // is between point 0 and 1, the second is between 1 and 2, and so on. + // This roughly mirrors suitable_starting_points, but we generally make much + // larger fragments (we try to get as close as possible without exceeding + // seconds by too much). + // + // We keep this list even if we don't have HLS, given that we have pts data + // from the input stream. + // + // NOTE: The last fragment is an in-progress fragment, which can still be + // extended and thus should not be output. So the last fragment output is + // from points N-3..N-2. + struct FragmentStart { + uint64_t byte_position; + double pts; // Unused if begins_header is true. + + // Whether the fragment started at this position is a stream header or not. + // Note that headers are stored _after_ the fragments they are headers for, + // so that they rotate out of the backlog last (and also because they are + // conveniently written then). The most current header is _not_ stored + // in the backlog; it is stored in stream_header. Only when replaced + // is it committed to the backlog and gets an entry here. + bool begins_header; + }; + std::deque fragments; + uint64_t first_fragment_index = 0, discontinuity_counter = 0; + + // HLS target duration, in seconds. + unsigned hls_frag_duration = 6; + + // Don't advertise new HLS fragments beginning before this point after the + // start of the backlog, so that we're reasonably sure that we can actually + // serve them even if the client can't completely keep up. + size_t hls_backlog_margin = 0; + + // HLS playlists for this stream, in the form of a HTTP response, with + // headers and all. These are created on-demand, re-used by clients as + // needed, and cleared when they are no longer valid (e.g., when new fragments + // are added). + std::shared_ptr hls_playlist_http10; + std::shared_ptr hls_playlist_http11_close; + std::shared_ptr hls_playlist_http11_persistent; - // The last point in the stream that is suitable to start new clients at - // (after having sent the header). -1 if no such point exists yet. - ssize_t last_suitable_starting_point; - // Clients that are in SENDING_DATA, but that we don't listen on, // because we currently don't have any data for them. // See put_client_to_sleep() and wake_up_all_clients(). @@ -89,13 +163,14 @@ struct Stream { std::vector to_process; // Maximum pacing rate for the stream. - uint32_t pacing_rate; + uint32_t pacing_rate = ~0U; // Queued data, if any. Protected by . // The data pointers in the iovec are owned by us. struct DataElement { iovec data; - StreamStartSuitability suitable_for_stream_start; + uint16_t metacube_flags; + RationalPTS pts; }; std::vector queued_data; @@ -106,19 +181,35 @@ struct Stream { // Add more data to , adding Metacube headers if needed. // Does not take ownership of . - void add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start); + void add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts); // Add queued data to the stream, if any. // You should hold the owning Server's . void process_queued_data(); + // Generate a HLS playlist based on the current state, including HTTP headers. + std::shared_ptr generate_hls_playlist(bool http_11, bool close_after_response); + + void clear_hls_playlist_cache(); + private: Stream(const Stream& other); // Adds data directly to the stream file descriptor, without adding headers or // going through . - // You should hold the owning Server's . + // You should hold the owning Server's , and probably call + // remove_obsolete_starting_points() afterwards. void add_data_raw(const std::vector &data); + + // Remove points from that are no longer + // in the backlog. + // You should hold the owning Server's . + void remove_obsolete_starting_points(); + + // Extend the in-progress fragment to the given position, or finish it and start + // a new one if that would make it too long. Returns true if a new fragment + // was created (ie., the HLS playlists need to be regenerated). + bool add_fragment_boundary(size_t byte_position, const RationalPTS &pts); }; #endif // !defined(_STREAM_H)