From: Steinar H. Gunderson Date: Fri, 10 Jul 2015 21:46:37 +0000 (+0200) Subject: Merge branch 'master' of /srv/git.sesse.net/www/cubemap X-Git-Tag: 1.2.0~23 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=0e96bbf9ee0fbebd5fe3fba4d186c0e0d73c9a32;hp=ef4cf9957b668845eb1064f583a0021009cbeeac Merge branch 'master' of /srv/git.sesse.net/www/cubemap --- diff --git a/client.h b/client.h index 3767112..d23b860 100644 --- a/client.h +++ b/client.h @@ -37,7 +37,7 @@ struct Client { std::string remote_addr; time_t connect_time; - enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME }; + enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME, PREBUFFERING }; State state; // The HTTP request, as sent by the client. If we are in READING_REQUEST, @@ -62,8 +62,8 @@ struct Client { // -1 means we want to send from the end of the backlog (the normal case), // although only at a keyframe. // -2 means we want to send from the _beginning_ of the backlog. - // Once we go into WAITING_FOR_KEYFRAME or SENDING_DATA, these negative - // values will be translated to real numbers. + // Once we go into WAITING_FOR_KEYFRAME, PREBUFFERING or SENDING_DATA, + // these negative values will be translated to real numbers. size_t stream_pos; // Number of bytes we've sent of data. Only relevant for SENDING_DATA. diff --git a/config.cpp b/config.cpp index 0917961..aa0f670 100644 --- a/config.cpp +++ b/config.cpp @@ -235,6 +235,13 @@ bool parse_stream(const ConfigLine &line, Config *config) stream.backlog_size = atoi(backlog_it->second.c_str()); } + map::const_iterator prebuffer_it = line.parameters.find("force_prebuffer"); + if (prebuffer_it == line.parameters.end()) { + stream.prebuffering_bytes = 0; + } else { + stream.prebuffering_bytes = atoi(prebuffer_it->second.c_str()); + } + // Parse encoding. map::const_iterator encoding_parm_it = line.parameters.find("encoding"); if (encoding_parm_it == line.parameters.end() || diff --git a/config.h b/config.h index 3014b48..e2dcb43 100644 --- a/config.h +++ b/config.h @@ -13,6 +13,7 @@ struct StreamConfig { std::string url; // As seen by the client. std::string src; // Can be empty. size_t backlog_size; + size_t prebuffering_bytes; uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit). enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding; }; diff --git a/cubemap.config.sample b/cubemap.config.sample index 479755b..0953b7b 100644 --- a/cubemap.config.sample +++ b/cubemap.config.sample @@ -33,7 +33,7 @@ error_log type=console # # now the streams! # -stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv +stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv force_prebuffer=1500000 stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube stream /udp.ts src=udp://@:1234 backlog_size=1048576 stream /udp-multicast.ts src=udp://@233.252.0.2:1234 pacing_rate_kbit=2000 diff --git a/httpinput.cpp b/httpinput.cpp index 74466c7..043c07c 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -549,7 +549,7 @@ void HTTPInput::process_data(char *ptr, size_t bytes) has_metacube_header = false; continue; } - if (size > 262144) { + if (size > 1048576) { log(WARNING, "[%s] Metacube block of %d bytes (flags=%x); corrupted header?", url.c_str(), size, flags); } diff --git a/main.cpp b/main.cpp index 889b737..0218623 100644 --- a/main.cpp +++ b/main.cpp @@ -166,6 +166,7 @@ void create_streams(const Config &config, if (deserialized_urls.count(stream_config.url) == 0) { stream_index = servers->add_stream(stream_config.url, stream_config.backlog_size, + stream_config.prebuffering_bytes, Stream::Encoding(stream_config.encoding)); } else { stream_index = servers->lookup_stream_by_url(stream_config.url); diff --git a/server.cpp b/server.cpp index a2f28e6..6fa4991 100644 --- a/server.cpp +++ b/server.cpp @@ -198,6 +198,7 @@ void Server::add_client_from_serialized(const ClientProto &client) } if (client_ptr->state == Client::WAITING_FOR_KEYFRAME || + client_ptr->state == Client::PREBUFFERING || (client_ptr->state == Client::SENDING_DATA && client_ptr->stream_pos == client_ptr->stream->bytes_received)) { client_ptr->stream->put_client_to_sleep(client_ptr); @@ -215,11 +216,11 @@ int Server::lookup_stream_by_url(const std::string &url) const return url_it->second; } -int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding) +int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding) { MutexLock lock(&mutex); url_map.insert(make_pair(url, streams.size())); - streams.push_back(new Stream(url, backlog_size, encoding)); + streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding)); return streams.size() - 1; } @@ -399,6 +400,18 @@ sending_header_or_error_again: return; } client->stream_pos = stream->last_suitable_starting_point; + client->state = Client::PREBUFFERING; + // Fall through. + } + case Client::PREBUFFERING: { + Stream *stream = client->stream; + size_t bytes_to_send = stream->bytes_received - client->stream_pos; + assert(bytes_to_send <= stream->backlog_size); + if (bytes_to_send < stream->prebuffering_bytes) { + // We don't have enough bytes buffered to start this client yet. + stream->put_client_to_sleep(client); + return; + } client->state = Client::SENDING_DATA; // Fall through. } diff --git a/server.h b/server.h index 6b94c0e..49f9d51 100644 --- a/server.h +++ b/server.h @@ -53,7 +53,7 @@ public: // at the same time). CubemapStateProto serialize(); void add_client_from_serialized(const ClientProto &client); - int add_stream(const std::string &url, size_t bytes_received, Stream::Encoding encoding); + int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding); int add_stream_from_serialized(const StreamProto &stream, int data_fd); int lookup_stream_by_url(const std::string &url) const; void set_backlog_size(int stream_index, size_t new_size); diff --git a/serverpool.cpp b/serverpool.cpp index 4684d8c..310a0a7 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -73,14 +73,14 @@ int ServerPool::lookup_stream_by_url(const std::string &url) const return servers[0].lookup_stream_by_url(url); } -int ServerPool::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding) +int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding) { // Adding more HTTP streams after UDP streams would cause the UDP stream // indices to move around, which is obviously not good. assert(udp_streams.empty()); for (int i = 0; i < num_servers; ++i) { - int stream_index = servers[i].add_stream(url, backlog_size, encoding); + int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding); assert(stream_index == num_http_streams); } return num_http_streams++; diff --git a/serverpool.h b/serverpool.h index 8fa46e2..4b56544 100644 --- a/serverpool.h +++ b/serverpool.h @@ -29,7 +29,7 @@ public: void add_client_from_serialized(const ClientProto &client); // Adds the given stream to all the servers. Returns the stream index. - int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding); + int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding); int add_stream_from_serialized(const StreamProto &stream, const std::vector &data_fds); void delete_stream(const std::string &url); int add_udpstream(const sockaddr_in6 &dst, int pacing_rate, int ttl, int multicast_iface_index); diff --git a/state.proto b/state.proto index aba3581..e612831 100644 --- a/state.proto +++ b/state.proto @@ -20,6 +20,7 @@ message StreamProto { optional bytes stream_header = 7; repeated int32 data_fds = 8; optional int64 backlog_size = 5 [default=10485760]; + optional int64 prebuffering_bytes = 10 [default=0]; optional int64 bytes_received = 3; optional int64 last_suitable_starting_point = 9; optional string url = 4; diff --git a/stream.cpp b/stream.cpp index 594d77c..df13193 100644 --- a/stream.cpp +++ b/stream.cpp @@ -19,11 +19,12 @@ using namespace std; -Stream::Stream(const string &url, size_t backlog_size, Encoding encoding) +Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding) : url(url), encoding(encoding), data_fd(make_tempfile("")), backlog_size(backlog_size), + prebuffering_bytes(prebuffering_bytes), bytes_received(0), last_suitable_starting_point(-1), pacing_rate(~0U), @@ -50,6 +51,7 @@ Stream::Stream(const StreamProto &serialized, int data_fd) encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later. data_fd(data_fd), backlog_size(serialized.backlog_size()), + prebuffering_bytes(serialized.prebuffering_bytes()), bytes_received(serialized.bytes_received()), pacing_rate(~0U), queued_data_last_starting_point(-1) @@ -71,6 +73,7 @@ StreamProto Stream::serialize() serialized.set_stream_header(stream_header); serialized.add_data_fds(data_fd); serialized.set_backlog_size(backlog_size); + serialized.set_prebuffering_bytes(prebuffering_bytes); serialized.set_bytes_received(bytes_received); serialized.set_last_suitable_starting_point(last_suitable_starting_point); serialized.set_url(url); diff --git a/stream.h b/stream.h index ef475e6..4947c3f 100644 --- a/stream.h +++ b/stream.h @@ -23,7 +23,7 @@ struct Stream { // Must be in sync with StreamConfig::Encoding. enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE }; - Stream(const std::string &stream_id, size_t backlog_size, Encoding encoding); + Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding); ~Stream(); // Serialization/deserialization. @@ -63,6 +63,14 @@ struct Stream { // How many bytes can hold (the buffer size). size_t backlog_size; + // How many bytes we need to have in the backlog before we start + // sending (in practice, we will then send all of them at once, + // and then start sending at the normal rate thereafter). + // This is basically to force a buffer on the client, which can help + // if the client expects us to be able to fill up the buffer much + // faster than realtime (ie., it expects a static file). + size_t prebuffering_bytes; + // How many bytes this stream have received. Can very well be larger // than , since the buffer wraps. size_t bytes_received;