X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=stream.h;h=931da2aa236e941ad72833a3af0d12cdbfac2ec2;hp=7fabd193d1249016af37a821a5d67b3fce33ef91;hb=980ac162414c9fce62af4fdb9cfc282865b31572;hpb=40ed7df894c8645c132a5bea2bfb12a9be2b82ef diff --git a/stream.h b/stream.h index 7fabd19..931da2a 100644 --- a/stream.h +++ b/stream.h @@ -6,28 +6,71 @@ #include #include +#include +#include +#include +#include +#include #include #include -class MarkPool; +#include "metacube2.h" + class StreamProto; struct Client; +// metacube2_pts_packet except the type and byteswapping. +struct RationalPTS { + int64_t pts = 0; + uint64_t timebase_num = 0, timebase_den = 0; // 0/0 for unknown PTS. +}; + struct Stream { - Stream(const std::string &stream_id, size_t backlog_size); + // Must be in sync with StreamConfig::Encoding. + enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE }; + + Stream(const std::string &url, + size_t backlog_size, + uint64_t prebuffering_bytes, + Encoding encoding, + Encoding src_encoding, + unsigned hls_frag_duration, + size_t hls_backlog_margin, + const std::string &allow_origin); ~Stream(); // Serialization/deserialization. - Stream(const StreamProto &serialized); + Stream(const StreamProto &serialized, int data_fd); StreamProto serialize(); // Changes the backlog size, restructuring the data as needed. void set_backlog_size(size_t new_size); - std::string stream_id; + // You should hold the owning Server's , since it calls add_data_raw(). + void set_header(const std::string &new_http_header, const std::string &new_stream_header); + + // Mutex protecting and . + // Note that if you want to hold both this and the owning server's + // you will need to take before this one. + mutable std::mutex queued_data_mutex; + + std::string url; - // The HTTP response header, plus the video stream header (if any). - std::string header; + // The HTTP response header, without the trailing double newline. + std::string http_header; + + // The video stream header (if any). + std::string stream_header; + + // What encoding we apply to the outgoing data (usually raw, but can also + // be Metacube, for reflecting to another Cubemap instance). + Encoding encoding; + + // What encoding we expect the incoming data to be in (usually Metacube). + Encoding src_encoding; + + // Contents of CORS header (Access-Control-Allow-Origin), if any. + std::string allow_origin; // The stream data itself, stored in a circular buffer. // @@ -42,10 +85,65 @@ struct Stream { // How many bytes can hold (the buffer size). size_t backlog_size; + // How many bytes we need to have in the backlog before we start + // sending (in practice, we will then send all of them at once, + // and then start sending at the normal rate thereafter). + // This is basically to force a buffer on the client, which can help + // if the client expects us to be able to fill up the buffer much + // faster than realtime (ie., it expects a static file). + uint64_t prebuffering_bytes; + // How many bytes this stream have received. Can very well be larger // than , since the buffer wraps. - size_t bytes_received; - + uint64_t bytes_received = 0; + + // A list of points in the stream that is suitable to start new clients at + // (after having sent the header). Empty if no such point exists yet. + std::deque suitable_starting_points; + + // A list of HLS fragment boundaries currently in the backlog; the first fragment + // is between point 0 and 1, the second is between 1 and 2, and so on. + // This roughly mirrors suitable_starting_points, but we generally make much + // larger fragments (we try to get as close as possible without exceeding + // seconds by too much). + // + // We keep this list even if we don't have HLS, given that we have pts data + // from the input stream. + // + // NOTE: The last fragment is an in-progress fragment, which can still be + // extended and thus should not be output. So the last fragment output is + // from points N-3..N-2. + struct FragmentStart { + uint64_t byte_position; + double pts; // Unused if begins_header is true. + + // Whether the fragment started at this position is a stream header or not. + // Note that headers are stored _after_ the fragments they are headers for, + // so that they rotate out of the backlog last (and also because they are + // conveniently written then). The most current header is _not_ stored + // in the backlog; it is stored in stream_header. Only when replaced + // is it committed to the backlog and gets an entry here. + bool begins_header; + }; + std::deque fragments; + uint64_t first_fragment_index = 0, discontinuity_counter = 0; + + // HLS target duration, in seconds. + unsigned hls_frag_duration = 6; + + // Don't advertise new HLS fragments beginning before this point after the + // start of the backlog, so that we're reasonably sure that we can actually + // serve them even if the client can't completely keep up. + size_t hls_backlog_margin = 0; + + // HLS playlists for this stream, in the form of a HTTP response, with + // headers and all. These are created on-demand, re-used by clients as + // needed, and cleared when they are no longer valid (e.g., when new fragments + // are added). + std::shared_ptr hls_playlist_http10; + std::shared_ptr hls_playlist_http11_close; + std::shared_ptr hls_playlist_http11_persistent; + // Clients that are in SENDING_DATA, but that we don't listen on, // because we currently don't have any data for them. // See put_client_to_sleep() and wake_up_all_clients(). @@ -55,23 +153,54 @@ struct Stream { // ). std::vector to_process; - // What pool to fetch marks from, or NULL. - MarkPool *mark_pool; + // Maximum pacing rate for the stream. + uint32_t pacing_rate = ~0U; + + // Queued data, if any. Protected by . + // The data pointers in the iovec are owned by us. + struct DataElement { + iovec data; + uint16_t metacube_flags; + RationalPTS pts; + }; + std::vector queued_data; // Put client to sleep, since there is no more data for it; we will on // longer listen on POLLOUT until we get more data. Also, it will be put // in the list of clients to wake up when we do. void put_client_to_sleep(Client *client); - // Add more input data to the stream. You should probably call wake_up_all_clients() - // after that. - void add_data(const char *data, ssize_t bytes); + // Add more data to , adding Metacube headers if needed. + // Does not take ownership of . + void add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts); + + // Add queued data to the stream, if any. + // You should hold the owning Server's . + void process_queued_data(); - // We have more data, so mark all clients that are sleeping as ready to go. - void wake_up_all_clients(); + // Generate a HLS playlist based on the current state, including HTTP headers. + std::shared_ptr generate_hls_playlist(bool http_11, bool close_after_response); + + void clear_hls_playlist_cache(); private: Stream(const Stream& other); + + // Adds data directly to the stream file descriptor, without adding headers or + // going through . + // You should hold the owning Server's , and probably call + // remove_obsolete_starting_points() afterwards. + void add_data_raw(const std::vector &data); + + // Remove points from that are no longer + // in the backlog. + // You should hold the owning Server's . + void remove_obsolete_starting_points(); + + // Extend the in-progress fragment to the given position, or finish it and start + // a new one if that would make it too long. Returns true if a new fragment + // was created (ie., the HLS playlists need to be regenerated). + bool add_fragment_boundary(size_t byte_position, const RationalPTS &pts); }; #endif // !defined(_STREAM_H)