]> git.sesse.net Git - cubemap/commitdiff
Merge branch 'master' of /srv/git.sesse.net/www/cubemap
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 10 Jul 2015 21:46:37 +0000 (23:46 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 10 Jul 2015 21:46:37 +0000 (23:46 +0200)
13 files changed:
client.h
config.cpp
config.h
cubemap.config.sample
httpinput.cpp
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h
state.proto
stream.cpp
stream.h

index 3767112b759b2c0c85bcf804b2ca77106ef68f78..d23b860767e16ecc372ab5726c6df8f2044ae7e3 100644 (file)
--- a/client.h
+++ b/client.h
@@ -37,7 +37,7 @@ struct Client {
        std::string remote_addr;
        time_t connect_time;
 
-       enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME };
+       enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME, PREBUFFERING };
        State state;
 
        // The HTTP request, as sent by the client. If we are in READING_REQUEST,
@@ -62,8 +62,8 @@ struct Client {
        // -1 means we want to send from the end of the backlog (the normal case),
        // although only at a keyframe.
        // -2 means we want to send from the _beginning_ of the backlog.
-       // Once we go into WAITING_FOR_KEYFRAME or SENDING_DATA, these negative
-       // values will be translated to real numbers.
+       // Once we go into WAITING_FOR_KEYFRAME, PREBUFFERING or SENDING_DATA,
+       // these negative values will be translated to real numbers.
        size_t stream_pos;
 
        // Number of bytes we've sent of data. Only relevant for SENDING_DATA.
index 091796151f811a241a929adde38b90afad8336bf..aa0f670a4b78a0607db4bf8af1d1232a14f21426 100644 (file)
@@ -235,6 +235,13 @@ bool parse_stream(const ConfigLine &line, Config *config)
                stream.backlog_size = atoi(backlog_it->second.c_str());
        }
 
+       map<string, string>::const_iterator prebuffer_it = line.parameters.find("force_prebuffer");
+       if (prebuffer_it == line.parameters.end()) {
+               stream.prebuffering_bytes = 0;
+       } else {
+               stream.prebuffering_bytes = atoi(prebuffer_it->second.c_str());
+       }
+
        // Parse encoding.
        map<string, string>::const_iterator encoding_parm_it = line.parameters.find("encoding");
        if (encoding_parm_it == line.parameters.end() ||
index 3014b48d4128d4f4b1150a28a23ddc953b33d3ac..e2dcb43050674a9d3d6d54ab5b98cb310739a444 100644 (file)
--- a/config.h
+++ b/config.h
@@ -13,6 +13,7 @@ struct StreamConfig {
        std::string url;  // As seen by the client.
        std::string src;  // Can be empty.
        size_t backlog_size;
+       size_t prebuffering_bytes;
        uint32_t pacing_rate;  // In bytes per second. Default is ~0U (no limit).
        enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
 };
index 479755b3c4cc616663a8e561b6f8836c0fe126cc..0953b7b288797060deab631540b72a512ba83341 100644 (file)
@@ -33,7 +33,7 @@ error_log type=console
 #
 # now the streams!
 #
-stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv
+stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv force_prebuffer=1500000
 stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
 stream /udp.ts src=udp://@:1234 backlog_size=1048576
 stream /udp-multicast.ts src=udp://@233.252.0.2:1234 pacing_rate_kbit=2000
index 74466c7acbb671f33f4f0fa3a76c77091e324038..043c07c6fd2085d7b199470da796318e19ee850b 100644 (file)
@@ -549,7 +549,7 @@ void HTTPInput::process_data(char *ptr, size_t bytes)
                        has_metacube_header = false;
                        continue;
                }
-               if (size > 262144) {
+               if (size > 1048576) {
                        log(WARNING, "[%s] Metacube block of %d bytes (flags=%x); corrupted header?",
                                url.c_str(), size, flags);
                }
index 889b737fcbe3c08d3983e00677b37cc5aff8aa7d..0218623c0ed6ea670182e54d17094a94f4e0f996 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -166,6 +166,7 @@ void create_streams(const Config &config,
                if (deserialized_urls.count(stream_config.url) == 0) {
                        stream_index = servers->add_stream(stream_config.url,
                                                           stream_config.backlog_size,
+                                                          stream_config.prebuffering_bytes,
                                                           Stream::Encoding(stream_config.encoding));
                } else {
                        stream_index = servers->lookup_stream_by_url(stream_config.url);
index a2f28e62a1d5ca370194fcd58b4f72bc97473e30..6fa4991b209f389da7f940aa73ff9462f472b8cd 100644 (file)
@@ -198,6 +198,7 @@ void Server::add_client_from_serialized(const ClientProto &client)
        }
 
        if (client_ptr->state == Client::WAITING_FOR_KEYFRAME ||
+           client_ptr->state == Client::PREBUFFERING ||
            (client_ptr->state == Client::SENDING_DATA &&
             client_ptr->stream_pos == client_ptr->stream->bytes_received)) {
                client_ptr->stream->put_client_to_sleep(client_ptr);
@@ -215,11 +216,11 @@ int Server::lookup_stream_by_url(const std::string &url) const
        return url_it->second;
 }
 
-int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
+int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
 {
        MutexLock lock(&mutex);
        url_map.insert(make_pair(url, streams.size()));
-       streams.push_back(new Stream(url, backlog_size, encoding));
+       streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding));
        return streams.size() - 1;
 }
 
@@ -399,6 +400,18 @@ sending_header_or_error_again:
                        return;
                }
                client->stream_pos = stream->last_suitable_starting_point;
+               client->state = Client::PREBUFFERING;
+               // Fall through.
+       }
+       case Client::PREBUFFERING: {
+               Stream *stream = client->stream;
+               size_t bytes_to_send = stream->bytes_received - client->stream_pos;
+               assert(bytes_to_send <= stream->backlog_size);
+               if (bytes_to_send < stream->prebuffering_bytes) {
+                       // We don't have enough bytes buffered to start this client yet.
+                       stream->put_client_to_sleep(client);
+                       return;
+               }
                client->state = Client::SENDING_DATA;
                // Fall through.
        }
index 6b94c0e6d9d9d2dbf98d0c025a77f814585b8d64..49f9d51badedd68c02ae2e7436b6807f1eccb060 100644 (file)
--- a/server.h
+++ b/server.h
@@ -53,7 +53,7 @@ public:
        // at the same time).
        CubemapStateProto serialize();
        void add_client_from_serialized(const ClientProto &client);
-       int add_stream(const std::string &url, size_t bytes_received, Stream::Encoding encoding);
+       int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding);
        int add_stream_from_serialized(const StreamProto &stream, int data_fd);
        int lookup_stream_by_url(const std::string &url) const;
        void set_backlog_size(int stream_index, size_t new_size);
index 4684d8c63267d4af225f55eeab61b401829f00b1..310a0a7f8fe894ffbfded8fd36bb348ed9b1babb 100644 (file)
@@ -73,14 +73,14 @@ int ServerPool::lookup_stream_by_url(const std::string &url) const
        return servers[0].lookup_stream_by_url(url);
 }
 
-int ServerPool::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
+int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
 {
        // Adding more HTTP streams after UDP streams would cause the UDP stream
        // indices to move around, which is obviously not good.
        assert(udp_streams.empty());
 
        for (int i = 0; i < num_servers; ++i) {
-               int stream_index = servers[i].add_stream(url, backlog_size, encoding);
+               int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding);
                assert(stream_index == num_http_streams);
        }
        return num_http_streams++;
index 8fa46e2493be3ce0796593d4ec31a628f0d01127..4b56544241afba4883fe899cae308d50707c9b63 100644 (file)
@@ -29,7 +29,7 @@ public:
        void add_client_from_serialized(const ClientProto &client);
 
        // Adds the given stream to all the servers. Returns the stream index.
-       int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding);
+       int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding);
        int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
        void delete_stream(const std::string &url);
        int add_udpstream(const sockaddr_in6 &dst, int pacing_rate, int ttl, int multicast_iface_index);
index aba35812d30870088de6a7ef3bda0494934b9ac9..e612831e1179df8298d0703adfa5479a35ad2903 100644 (file)
@@ -20,6 +20,7 @@ message StreamProto {
        optional bytes stream_header = 7;
        repeated int32 data_fds = 8;
        optional int64 backlog_size = 5 [default=10485760];
+       optional int64 prebuffering_bytes = 10 [default=0];
        optional int64 bytes_received = 3;
        optional int64 last_suitable_starting_point = 9;
        optional string url = 4;
index 594d77cba33afe387502e07fbf021b1d180104ea..df131932435fd9a095edc50ec50c3c099a66a5a7 100644 (file)
 
 using namespace std;
 
-Stream::Stream(const string &url, size_t backlog_size, Encoding encoding)
+Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding)
        : url(url),
          encoding(encoding),
          data_fd(make_tempfile("")),
           backlog_size(backlog_size),
+         prebuffering_bytes(prebuffering_bytes),
          bytes_received(0),
          last_suitable_starting_point(-1),
          pacing_rate(~0U),
@@ -50,6 +51,7 @@ Stream::Stream(const StreamProto &serialized, int data_fd)
          encoding(Stream::STREAM_ENCODING_RAW),  // Will be changed later.
          data_fd(data_fd),
          backlog_size(serialized.backlog_size()),
+         prebuffering_bytes(serialized.prebuffering_bytes()),
          bytes_received(serialized.bytes_received()),
          pacing_rate(~0U),
          queued_data_last_starting_point(-1)
@@ -71,6 +73,7 @@ StreamProto Stream::serialize()
        serialized.set_stream_header(stream_header);
        serialized.add_data_fds(data_fd);
        serialized.set_backlog_size(backlog_size);
+       serialized.set_prebuffering_bytes(prebuffering_bytes);
        serialized.set_bytes_received(bytes_received);
        serialized.set_last_suitable_starting_point(last_suitable_starting_point);
        serialized.set_url(url);
index ef475e6f1b2dff32674494a8999f85c6da5a9e35..4947c3fbd7fdf11a43b8456a78f4a0c80812c58f 100644 (file)
--- a/stream.h
+++ b/stream.h
@@ -23,7 +23,7 @@ struct Stream {
        // Must be in sync with StreamConfig::Encoding.
        enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
 
-       Stream(const std::string &stream_id, size_t backlog_size, Encoding encoding);
+       Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding);
        ~Stream();
 
        // Serialization/deserialization.
@@ -63,6 +63,14 @@ struct Stream {
        // How many bytes <data_fd> can hold (the buffer size).
        size_t backlog_size;
 
+       // How many bytes we need to have in the backlog before we start
+       // sending (in practice, we will then send all of them at once,
+       // and then start sending at the normal rate thereafter).
+       // This is basically to force a buffer on the client, which can help
+       // if the client expects us to be able to fill up the buffer much
+       // faster than realtime (ie., it expects a static file).
+       size_t prebuffering_bytes;
+
        // How many bytes this stream have received. Can very well be larger
        // than <backlog_size>, since the buffer wraps.
        size_t bytes_received;