X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=stream.h;h=931da2aa236e941ad72833a3af0d12cdbfac2ec2;hp=9995765b2db95042fbe1c6369e9d3c2114896400;hb=980ac162414c9fce62af4fdb9cfc282865b31572;hpb=513af50c3e3454aafc25e97a7d20c206006e4d4c diff --git a/stream.h b/stream.h index 9995765..931da2a 100644 --- a/stream.h +++ b/stream.h @@ -9,17 +9,34 @@ #include #include #include +#include +#include #include #include +#include "metacube2.h" + class StreamProto; struct Client; +// metacube2_pts_packet except the type and byteswapping. +struct RationalPTS { + int64_t pts = 0; + uint64_t timebase_num = 0, timebase_den = 0; // 0/0 for unknown PTS. +}; + struct Stream { // Must be in sync with StreamConfig::Encoding. enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE }; - Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding, Encoding src_encoding); + Stream(const std::string &url, + size_t backlog_size, + uint64_t prebuffering_bytes, + Encoding encoding, + Encoding src_encoding, + unsigned hls_frag_duration, + size_t hls_backlog_margin, + const std::string &allow_origin); ~Stream(); // Serialization/deserialization. @@ -29,10 +46,13 @@ struct Stream { // Changes the backlog size, restructuring the data as needed. void set_backlog_size(size_t new_size); + // You should hold the owning Server's , since it calls add_data_raw(). + void set_header(const std::string &new_http_header, const std::string &new_stream_header); + // Mutex protecting and . // Note that if you want to hold both this and the owning server's // you will need to take before this one. - mutable pthread_mutex_t queued_data_mutex; + mutable std::mutex queued_data_mutex; std::string url; @@ -49,6 +69,9 @@ struct Stream { // What encoding we expect the incoming data to be in (usually Metacube). Encoding src_encoding; + // Contents of CORS header (Access-Control-Allow-Origin), if any. + std::string allow_origin; + // The stream data itself, stored in a circular buffer. // // We store our data in a file, so that we can send the data to the @@ -68,16 +91,59 @@ struct Stream { // This is basically to force a buffer on the client, which can help // if the client expects us to be able to fill up the buffer much // faster than realtime (ie., it expects a static file). - size_t prebuffering_bytes; + uint64_t prebuffering_bytes; // How many bytes this stream have received. Can very well be larger // than , since the buffer wraps. - size_t bytes_received; + uint64_t bytes_received = 0; // A list of points in the stream that is suitable to start new clients at // (after having sent the header). Empty if no such point exists yet. - std::deque suitable_starting_points; - + std::deque suitable_starting_points; + + // A list of HLS fragment boundaries currently in the backlog; the first fragment + // is between point 0 and 1, the second is between 1 and 2, and so on. + // This roughly mirrors suitable_starting_points, but we generally make much + // larger fragments (we try to get as close as possible without exceeding + // seconds by too much). + // + // We keep this list even if we don't have HLS, given that we have pts data + // from the input stream. + // + // NOTE: The last fragment is an in-progress fragment, which can still be + // extended and thus should not be output. So the last fragment output is + // from points N-3..N-2. + struct FragmentStart { + uint64_t byte_position; + double pts; // Unused if begins_header is true. + + // Whether the fragment started at this position is a stream header or not. + // Note that headers are stored _after_ the fragments they are headers for, + // so that they rotate out of the backlog last (and also because they are + // conveniently written then). The most current header is _not_ stored + // in the backlog; it is stored in stream_header. Only when replaced + // is it committed to the backlog and gets an entry here. + bool begins_header; + }; + std::deque fragments; + uint64_t first_fragment_index = 0, discontinuity_counter = 0; + + // HLS target duration, in seconds. + unsigned hls_frag_duration = 6; + + // Don't advertise new HLS fragments beginning before this point after the + // start of the backlog, so that we're reasonably sure that we can actually + // serve them even if the client can't completely keep up. + size_t hls_backlog_margin = 0; + + // HLS playlists for this stream, in the form of a HTTP response, with + // headers and all. These are created on-demand, re-used by clients as + // needed, and cleared when they are no longer valid (e.g., when new fragments + // are added). + std::shared_ptr hls_playlist_http10; + std::shared_ptr hls_playlist_http11_close; + std::shared_ptr hls_playlist_http11_persistent; + // Clients that are in SENDING_DATA, but that we don't listen on, // because we currently don't have any data for them. // See put_client_to_sleep() and wake_up_all_clients(). @@ -88,13 +154,14 @@ struct Stream { std::vector to_process; // Maximum pacing rate for the stream. - uint32_t pacing_rate; + uint32_t pacing_rate = ~0U; // Queued data, if any. Protected by . // The data pointers in the iovec are owned by us. struct DataElement { iovec data; uint16_t metacube_flags; + RationalPTS pts; }; std::vector queued_data; @@ -105,12 +172,17 @@ struct Stream { // Add more data to , adding Metacube headers if needed. // Does not take ownership of . - void add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags); + void add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts); // Add queued data to the stream, if any. // You should hold the owning Server's . void process_queued_data(); + // Generate a HLS playlist based on the current state, including HTTP headers. + std::shared_ptr generate_hls_playlist(bool http_11, bool close_after_response); + + void clear_hls_playlist_cache(); + private: Stream(const Stream& other); @@ -124,6 +196,11 @@ private: // in the backlog. // You should hold the owning Server's . void remove_obsolete_starting_points(); + + // Extend the in-progress fragment to the given position, or finish it and start + // a new one if that would make it too long. Returns true if a new fragment + // was created (ie., the HLS playlists need to be regenerated). + bool add_fragment_boundary(size_t byte_position, const RationalPTS &pts); }; #endif // !defined(_STREAM_H)