From 6889a665614e926437484a556124a5ff60363568 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Fri, 10 Jul 2015 23:29:42 +0200 Subject: [PATCH 1/1] Add support for forced prebuffering. The motivation is jwPlayer, which for HTTP files expects to be able to do no prebuffering and just download full speed nevertheless (as it assumes they are static files, not streams) -- when it cannot, it shows an ugly icon on top of the stream all the time. So we add an option for forced prebuffering (three seconds seems to be about fine) which means we wait sending until we have a pretty big backlog. Ideally, we would be able to actually send old data instead of just waiting (which would mean that the client doesn't need the extra wait at the beginning), but it's complicated with having to remember old keyframe positions, changed stream headers etc. --- client.h | 6 +++--- config.cpp | 7 +++++++ config.h | 1 + cubemap.config.sample | 2 +- main.cpp | 1 + server.cpp | 17 +++++++++++++++-- server.h | 2 +- serverpool.cpp | 4 ++-- serverpool.h | 2 +- state.proto | 1 + stream.cpp | 5 ++++- stream.h | 10 +++++++++- 12 files changed, 46 insertions(+), 12 deletions(-) diff --git a/client.h b/client.h index 3767112..d23b860 100644 --- a/client.h +++ b/client.h @@ -37,7 +37,7 @@ struct Client { std::string remote_addr; time_t connect_time; - enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME }; + enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME, PREBUFFERING }; State state; // The HTTP request, as sent by the client. If we are in READING_REQUEST, @@ -62,8 +62,8 @@ struct Client { // -1 means we want to send from the end of the backlog (the normal case), // although only at a keyframe. // -2 means we want to send from the _beginning_ of the backlog. - // Once we go into WAITING_FOR_KEYFRAME or SENDING_DATA, these negative - // values will be translated to real numbers. + // Once we go into WAITING_FOR_KEYFRAME, PREBUFFERING or SENDING_DATA, + // these negative values will be translated to real numbers. size_t stream_pos; // Number of bytes we've sent of data. Only relevant for SENDING_DATA. diff --git a/config.cpp b/config.cpp index 0917961..aa0f670 100644 --- a/config.cpp +++ b/config.cpp @@ -235,6 +235,13 @@ bool parse_stream(const ConfigLine &line, Config *config) stream.backlog_size = atoi(backlog_it->second.c_str()); } + map::const_iterator prebuffer_it = line.parameters.find("force_prebuffer"); + if (prebuffer_it == line.parameters.end()) { + stream.prebuffering_bytes = 0; + } else { + stream.prebuffering_bytes = atoi(prebuffer_it->second.c_str()); + } + // Parse encoding. map::const_iterator encoding_parm_it = line.parameters.find("encoding"); if (encoding_parm_it == line.parameters.end() || diff --git a/config.h b/config.h index 3014b48..e2dcb43 100644 --- a/config.h +++ b/config.h @@ -13,6 +13,7 @@ struct StreamConfig { std::string url; // As seen by the client. std::string src; // Can be empty. size_t backlog_size; + size_t prebuffering_bytes; uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit). enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding; }; diff --git a/cubemap.config.sample b/cubemap.config.sample index 479755b..0953b7b 100644 --- a/cubemap.config.sample +++ b/cubemap.config.sample @@ -33,7 +33,7 @@ error_log type=console # # now the streams! # -stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv +stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv force_prebuffer=1500000 stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube stream /udp.ts src=udp://@:1234 backlog_size=1048576 stream /udp-multicast.ts src=udp://@233.252.0.2:1234 pacing_rate_kbit=2000 diff --git a/main.cpp b/main.cpp index 889b737..0218623 100644 --- a/main.cpp +++ b/main.cpp @@ -166,6 +166,7 @@ void create_streams(const Config &config, if (deserialized_urls.count(stream_config.url) == 0) { stream_index = servers->add_stream(stream_config.url, stream_config.backlog_size, + stream_config.prebuffering_bytes, Stream::Encoding(stream_config.encoding)); } else { stream_index = servers->lookup_stream_by_url(stream_config.url); diff --git a/server.cpp b/server.cpp index a2f28e6..6fa4991 100644 --- a/server.cpp +++ b/server.cpp @@ -198,6 +198,7 @@ void Server::add_client_from_serialized(const ClientProto &client) } if (client_ptr->state == Client::WAITING_FOR_KEYFRAME || + client_ptr->state == Client::PREBUFFERING || (client_ptr->state == Client::SENDING_DATA && client_ptr->stream_pos == client_ptr->stream->bytes_received)) { client_ptr->stream->put_client_to_sleep(client_ptr); @@ -215,11 +216,11 @@ int Server::lookup_stream_by_url(const std::string &url) const return url_it->second; } -int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding) +int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding) { MutexLock lock(&mutex); url_map.insert(make_pair(url, streams.size())); - streams.push_back(new Stream(url, backlog_size, encoding)); + streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding)); return streams.size() - 1; } @@ -399,6 +400,18 @@ sending_header_or_error_again: return; } client->stream_pos = stream->last_suitable_starting_point; + client->state = Client::PREBUFFERING; + // Fall through. + } + case Client::PREBUFFERING: { + Stream *stream = client->stream; + size_t bytes_to_send = stream->bytes_received - client->stream_pos; + assert(bytes_to_send <= stream->backlog_size); + if (bytes_to_send < stream->prebuffering_bytes) { + // We don't have enough bytes buffered to start this client yet. + stream->put_client_to_sleep(client); + return; + } client->state = Client::SENDING_DATA; // Fall through. } diff --git a/server.h b/server.h index 6b94c0e..49f9d51 100644 --- a/server.h +++ b/server.h @@ -53,7 +53,7 @@ public: // at the same time). CubemapStateProto serialize(); void add_client_from_serialized(const ClientProto &client); - int add_stream(const std::string &url, size_t bytes_received, Stream::Encoding encoding); + int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding); int add_stream_from_serialized(const StreamProto &stream, int data_fd); int lookup_stream_by_url(const std::string &url) const; void set_backlog_size(int stream_index, size_t new_size); diff --git a/serverpool.cpp b/serverpool.cpp index 4684d8c..310a0a7 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -73,14 +73,14 @@ int ServerPool::lookup_stream_by_url(const std::string &url) const return servers[0].lookup_stream_by_url(url); } -int ServerPool::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding) +int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding) { // Adding more HTTP streams after UDP streams would cause the UDP stream // indices to move around, which is obviously not good. assert(udp_streams.empty()); for (int i = 0; i < num_servers; ++i) { - int stream_index = servers[i].add_stream(url, backlog_size, encoding); + int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding); assert(stream_index == num_http_streams); } return num_http_streams++; diff --git a/serverpool.h b/serverpool.h index 8fa46e2..4b56544 100644 --- a/serverpool.h +++ b/serverpool.h @@ -29,7 +29,7 @@ public: void add_client_from_serialized(const ClientProto &client); // Adds the given stream to all the servers. Returns the stream index. - int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding); + int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding); int add_stream_from_serialized(const StreamProto &stream, const std::vector &data_fds); void delete_stream(const std::string &url); int add_udpstream(const sockaddr_in6 &dst, int pacing_rate, int ttl, int multicast_iface_index); diff --git a/state.proto b/state.proto index aba3581..e612831 100644 --- a/state.proto +++ b/state.proto @@ -20,6 +20,7 @@ message StreamProto { optional bytes stream_header = 7; repeated int32 data_fds = 8; optional int64 backlog_size = 5 [default=10485760]; + optional int64 prebuffering_bytes = 10 [default=0]; optional int64 bytes_received = 3; optional int64 last_suitable_starting_point = 9; optional string url = 4; diff --git a/stream.cpp b/stream.cpp index 594d77c..df13193 100644 --- a/stream.cpp +++ b/stream.cpp @@ -19,11 +19,12 @@ using namespace std; -Stream::Stream(const string &url, size_t backlog_size, Encoding encoding) +Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding) : url(url), encoding(encoding), data_fd(make_tempfile("")), backlog_size(backlog_size), + prebuffering_bytes(prebuffering_bytes), bytes_received(0), last_suitable_starting_point(-1), pacing_rate(~0U), @@ -50,6 +51,7 @@ Stream::Stream(const StreamProto &serialized, int data_fd) encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later. data_fd(data_fd), backlog_size(serialized.backlog_size()), + prebuffering_bytes(serialized.prebuffering_bytes()), bytes_received(serialized.bytes_received()), pacing_rate(~0U), queued_data_last_starting_point(-1) @@ -71,6 +73,7 @@ StreamProto Stream::serialize() serialized.set_stream_header(stream_header); serialized.add_data_fds(data_fd); serialized.set_backlog_size(backlog_size); + serialized.set_prebuffering_bytes(prebuffering_bytes); serialized.set_bytes_received(bytes_received); serialized.set_last_suitable_starting_point(last_suitable_starting_point); serialized.set_url(url); diff --git a/stream.h b/stream.h index ef475e6..4947c3f 100644 --- a/stream.h +++ b/stream.h @@ -23,7 +23,7 @@ struct Stream { // Must be in sync with StreamConfig::Encoding. enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE }; - Stream(const std::string &stream_id, size_t backlog_size, Encoding encoding); + Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding); ~Stream(); // Serialization/deserialization. @@ -63,6 +63,14 @@ struct Stream { // How many bytes can hold (the buffer size). size_t backlog_size; + // How many bytes we need to have in the backlog before we start + // sending (in practice, we will then send all of them at once, + // and then start sending at the normal rate thereafter). + // This is basically to force a buffer on the client, which can help + // if the client expects us to be able to fill up the buffer much + // faster than realtime (ie., it expects a static file). + size_t prebuffering_bytes; + // How many bytes this stream have received. Can very well be larger // than , since the buffer wraps. size_t bytes_received; -- 2.39.2