X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=stream.h;h=931da2aa236e941ad72833a3af0d12cdbfac2ec2;hp=f2ba5d43dccc561ef530cf2445ea302f3a555e05;hb=980ac162414c9fce62af4fdb9cfc282865b31572;hpb=1c6b126fe95eb0465383ba225da764757eba05c0 diff --git a/stream.h b/stream.h index f2ba5d4..931da2a 100644 --- a/stream.h +++ b/stream.h @@ -8,23 +8,35 @@ #include #include #include +#include +#include +#include #include #include -class MarkPool; +#include "metacube2.h" + class StreamProto; struct Client; -enum StreamStartSuitability { - NOT_SUITABLE_FOR_STREAM_START, - SUITABLE_FOR_STREAM_START, +// metacube2_pts_packet except the type and byteswapping. +struct RationalPTS { + int64_t pts = 0; + uint64_t timebase_num = 0, timebase_den = 0; // 0/0 for unknown PTS. }; struct Stream { // Must be in sync with StreamConfig::Encoding. enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE }; - Stream(const std::string &stream_id, size_t backlog_size, Encoding encoding); + Stream(const std::string &url, + size_t backlog_size, + uint64_t prebuffering_bytes, + Encoding encoding, + Encoding src_encoding, + unsigned hls_frag_duration, + size_t hls_backlog_margin, + const std::string &allow_origin); ~Stream(); // Serialization/deserialization. @@ -34,6 +46,14 @@ struct Stream { // Changes the backlog size, restructuring the data as needed. void set_backlog_size(size_t new_size); + // You should hold the owning Server's , since it calls add_data_raw(). + void set_header(const std::string &new_http_header, const std::string &new_stream_header); + + // Mutex protecting and . + // Note that if you want to hold both this and the owning server's + // you will need to take before this one. + mutable std::mutex queued_data_mutex; + std::string url; // The HTTP response header, without the trailing double newline. @@ -46,6 +66,12 @@ struct Stream { // be Metacube, for reflecting to another Cubemap instance). Encoding encoding; + // What encoding we expect the incoming data to be in (usually Metacube). + Encoding src_encoding; + + // Contents of CORS header (Access-Control-Allow-Origin), if any. + std::string allow_origin; + // The stream data itself, stored in a circular buffer. // // We store our data in a file, so that we can send the data to the @@ -59,14 +85,65 @@ struct Stream { // How many bytes can hold (the buffer size). size_t backlog_size; + // How many bytes we need to have in the backlog before we start + // sending (in practice, we will then send all of them at once, + // and then start sending at the normal rate thereafter). + // This is basically to force a buffer on the client, which can help + // if the client expects us to be able to fill up the buffer much + // faster than realtime (ie., it expects a static file). + uint64_t prebuffering_bytes; + // How many bytes this stream have received. Can very well be larger // than , since the buffer wraps. - size_t bytes_received; + uint64_t bytes_received = 0; + + // A list of points in the stream that is suitable to start new clients at + // (after having sent the header). Empty if no such point exists yet. + std::deque suitable_starting_points; + + // A list of HLS fragment boundaries currently in the backlog; the first fragment + // is between point 0 and 1, the second is between 1 and 2, and so on. + // This roughly mirrors suitable_starting_points, but we generally make much + // larger fragments (we try to get as close as possible without exceeding + // seconds by too much). + // + // We keep this list even if we don't have HLS, given that we have pts data + // from the input stream. + // + // NOTE: The last fragment is an in-progress fragment, which can still be + // extended and thus should not be output. So the last fragment output is + // from points N-3..N-2. + struct FragmentStart { + uint64_t byte_position; + double pts; // Unused if begins_header is true. + + // Whether the fragment started at this position is a stream header or not. + // Note that headers are stored _after_ the fragments they are headers for, + // so that they rotate out of the backlog last (and also because they are + // conveniently written then). The most current header is _not_ stored + // in the backlog; it is stored in stream_header. Only when replaced + // is it committed to the backlog and gets an entry here. + bool begins_header; + }; + std::deque fragments; + uint64_t first_fragment_index = 0, discontinuity_counter = 0; + + // HLS target duration, in seconds. + unsigned hls_frag_duration = 6; + + // Don't advertise new HLS fragments beginning before this point after the + // start of the backlog, so that we're reasonably sure that we can actually + // serve them even if the client can't completely keep up. + size_t hls_backlog_margin = 0; + + // HLS playlists for this stream, in the form of a HTTP response, with + // headers and all. These are created on-demand, re-used by clients as + // needed, and cleared when they are no longer valid (e.g., when new fragments + // are added). + std::shared_ptr hls_playlist_http10; + std::shared_ptr hls_playlist_http11_close; + std::shared_ptr hls_playlist_http11_persistent; - // The last point in the stream that is suitable to start new clients at - // (after having sent the header). -1 if no such point exists yet. - ssize_t last_suitable_starting_point; - // Clients that are in SENDING_DATA, but that we don't listen on, // because we currently don't have any data for them. // See put_client_to_sleep() and wake_up_all_clients(). @@ -76,16 +153,17 @@ struct Stream { // ). std::vector to_process; - // What pool to fetch marks from, or NULL. - MarkPool *mark_pool; + // Maximum pacing rate for the stream. + uint32_t pacing_rate = ~0U; - // Queued data, if any. Protected by the owning Server's . + // Queued data, if any. Protected by . // The data pointers in the iovec are owned by us. - std::vector queued_data; - - // Index of the last element in queued_data that is suitable to start streaming at. - // -1 if none. - int queued_data_last_starting_point; + struct DataElement { + iovec data; + uint16_t metacube_flags; + RationalPTS pts; + }; + std::vector queued_data; // Put client to sleep, since there is no more data for it; we will on // longer listen on POLLOUT until we get more data. Also, it will be put @@ -94,20 +172,35 @@ struct Stream { // Add more data to , adding Metacube headers if needed. // Does not take ownership of . - // You should hold the owning Server's . - void add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start); + void add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts); // Add queued data to the stream, if any. - // You should hold the owning Server's _and_ . + // You should hold the owning Server's . void process_queued_data(); + // Generate a HLS playlist based on the current state, including HTTP headers. + std::shared_ptr generate_hls_playlist(bool http_11, bool close_after_response); + + void clear_hls_playlist_cache(); + private: Stream(const Stream& other); // Adds data directly to the stream file descriptor, without adding headers or // going through . + // You should hold the owning Server's , and probably call + // remove_obsolete_starting_points() afterwards. + void add_data_raw(const std::vector &data); + + // Remove points from that are no longer + // in the backlog. // You should hold the owning Server's . - void add_data_raw(const std::vector &data); + void remove_obsolete_starting_points(); + + // Extend the in-progress fragment to the given position, or finish it and start + // a new one if that would make it too long. Returns true if a new fragment + // was created (ie., the HLS playlists need to be regenerated). + bool add_fragment_boundary(size_t byte_position, const RationalPTS &pts); }; #endif // !defined(_STREAM_H)