Support the new METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START Metacube flag.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 14 Aug 2013 21:09:52 +0000 (23:09 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 14 Aug 2013 21:09:52 +0000 (23:09 +0200)
This is created by new versions of the VLC Metacube patches;
it is designed for the needs of WebM, where browsers refuse to accept a
stream that does not start with a keyframe. Thus, we can have blocks that
are completely valid (and we wouldn't want to start joining blocks, as the
GOPs can become very large), but that we do not want to start a stream with.

For this, we introduce a new state called WAITING_FOR_KEYFRAME that all
new clients (except those fetching from the start of the backlog) go through;
they stay there until the next keyframe-marked block (anything that's not
marked METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START, so old Metacube streams
are still valid) comes along, at which point they go into the regular
SENDING_DATA.

The new version of the VLC Metacube patch is not included yet.

client.h
httpinput.cpp
metacube.h
server.cpp
server.h
serverpool.cpp
serverpool.h
state.proto
stream.cpp
stream.h
udpinput.cpp

index cd909e3..490190b 100644 (file)
--- a/client.h
+++ b/client.h
@@ -41,7 +41,7 @@ struct Client {
        std::string remote_addr;
        time_t connect_time;
 
-       enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR };
+       enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME };
        State state;
 
        // The HTTP request, as sent by the client. If we are in READING_REQUEST,
@@ -64,9 +64,10 @@ struct Client {
 
        // Number of bytes we are into the stream (ie., the end of last send).
        // -1 means we want to send from the end of the backlog (the normal case),
+       // although only at a keyframe.
        // -2 means we want to send from the _beginning_ of the backlog.
-       // Once we go into SENDING_DATA, these negative values will be translated
-       // to real numbers.
+       // Once we go into WAITING_FOR_KEYFRAME or SENDING_DATA, these negative
+       // values will be translated to real numbers.
        size_t stream_pos;
 
        // Number of bytes we've sent of data. Only relevant for SENDING_DATA.
index e22e0a5..0d94913 100644 (file)
@@ -475,9 +475,15 @@ void HTTPInput::process_data(char *ptr, size_t bytes)
                        for (size_t i = 0; i < stream_indices.size(); ++i) {
                                servers->set_header(stream_indices[i], http_header, header);
                        }
-               } else { 
+               } else {
+                       StreamStartSuitability suitable_for_stream_start;
+                       if (flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) {
+                               suitable_for_stream_start = NOT_SUITABLE_FOR_STREAM_START;
+                       } else {
+                               suitable_for_stream_start = SUITABLE_FOR_STREAM_START;
+                       }
                        for (size_t i = 0; i < stream_indices.size(); ++i) {
-                               servers->add_data(stream_indices[i], inner_data, size);
+                               servers->add_data(stream_indices[i], inner_data, size, suitable_for_stream_start);
                        }
                }
 
index b40a42e..062325b 100644 (file)
@@ -7,6 +7,7 @@
 
 #define METACUBE_SYNC "\\o/_metacube_\\o/"  /* 16 bytes long. */
 #define METACUBE_FLAGS_HEADER 0x1
+#define METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START 0x2
 
 struct metacube_block_header {
        char sync[16];   /* METACUBE_SYNC */
index 7acf501..1637d94 100644 (file)
@@ -194,8 +194,9 @@ void Server::add_client_from_serialized(const ClientProto &client)
                exit(1);
        }
 
-       if (client_ptr->state == Client::SENDING_DATA && 
-           client_ptr->stream_pos == client_ptr->stream->bytes_received) {
+       if (client_ptr->state == Client::WAITING_FOR_KEYFRAME ||
+           (client_ptr->state == Client::SENDING_DATA &&
+            client_ptr->stream_pos == client_ptr->stream->bytes_received)) {
                client_ptr->stream->put_client_to_sleep(client_ptr);
        } else {
                process_client(client_ptr);
@@ -269,11 +270,11 @@ void Server::set_mark_pool(int stream_index, MarkPool *mark_pool)
        streams[stream_index]->mark_pool = mark_pool;
 }
 
-void Server::add_data_deferred(int stream_index, const char *data, size_t bytes)
+void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
 {
        MutexLock lock(&queued_data_mutex);
        assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
-       streams[stream_index]->add_data_deferred(data, bytes);
+       streams[stream_index]->add_data_deferred(data, bytes, suitable_for_stream_start);
 }
 
 // See the .h file for postconditions after this function.     
@@ -380,22 +381,37 @@ sending_header_or_error_again:
                        return;
                }
 
-               // Start sending from the end. In other words, we won't send any of the backlog,
-               // but we'll start sending immediately as we get data.
+               // Start sending from the first keyframe we get. In other
+               // words, we won't send any of the backlog, but we'll start
+               // sending immediately as we get the next keyframe block.
                // This is postcondition #3.
-               client->state = Client::SENDING_DATA;
                if (client->stream_pos == size_t(-2)) {
                        client->stream_pos = std::min<size_t>(
                            client->stream->bytes_received - client->stream->backlog_size,
                            0);
+                       client->state = Client::SENDING_DATA;
                } else {
                        // client->stream_pos should be -1, but it might not be,
                        // if we have clients from an older version.
                        client->stream_pos = client->stream->bytes_received;
+                       client->state = Client::WAITING_FOR_KEYFRAME;
                }
                client->stream->put_client_to_sleep(client);
                return;
        }
+       case Client::WAITING_FOR_KEYFRAME: {
+               Stream *stream = client->stream;
+               if (ssize_t(client->stream_pos) > stream->last_suitable_starting_point) {
+                       // We haven't received a keyframe since this stream started waiting,
+                       // so keep on waiting for one.
+                       // This is postcondition #3.
+                       stream->put_client_to_sleep(client);
+                       return;
+               }
+               client->stream_pos = stream->last_suitable_starting_point;
+               client->state = Client::SENDING_DATA;
+               // Fall through.
+       }
        case Client::SENDING_DATA: {
                skip_lost_data(client);
                Stream *stream = client->stream;
index 963aca9..f0eb622 100644 (file)
--- a/server.h
+++ b/server.h
@@ -47,7 +47,7 @@ public:
        // and the order between them are undefined.
        // XXX: header should ideally be ordered with respect to data.
        void add_client_deferred(int sock);
-       void add_data_deferred(int stream_index, const char *data, size_t bytes);
+       void add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start);
 
        // These should not be called while running, since that would violate
        // threading assumptions (ie., that epoll is only called from one thread
index be47aa9..936bd21 100644 (file)
@@ -147,7 +147,7 @@ void ServerPool::set_header(int stream_index, const string &http_header, const s
        }
 }
 
-void ServerPool::add_data(int stream_index, const char *data, size_t bytes)
+void ServerPool::add_data(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
 {
        assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size()));
 
@@ -159,7 +159,7 @@ void ServerPool::add_data(int stream_index, const char *data, size_t bytes)
 
        // HTTP stream.
        for (int i = 0; i < num_servers; ++i) {
-               servers[i].add_data_deferred(stream_index, data, bytes);
+               servers[i].add_data_deferred(stream_index, data, bytes, suitable_for_stream_start);
        }
 }
 
index 9a8e171..13211f9 100644 (file)
@@ -41,7 +41,7 @@ public:
        void set_header(int stream_index,
                        const std::string &http_header,
                        const std::string &stream_header);
-       void add_data(int stream_index, const char *data, size_t bytes);
+       void add_data(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start);
 
        // Connects the given stream to the given mark pool for all the servers.
        void set_mark_pool(int stream_index, MarkPool *mark_pool);
index 1776280..93c56e3 100644 (file)
@@ -21,6 +21,7 @@ message StreamProto {
        repeated int32 data_fds = 8;
        optional int64 backlog_size = 5 [default=1048576];
        optional int64 bytes_received = 3;
+       optional int64 last_suitable_starting_point = 9;
        optional string url = 4;
 
        // Older versions stored the HTTP and video headers together in this field.
index e8d65db..6563b5b 100644 (file)
@@ -22,7 +22,9 @@ Stream::Stream(const string &url, size_t backlog_size, Encoding encoding)
          data_fd(make_tempfile("")),
           backlog_size(backlog_size),
          bytes_received(0),
-         mark_pool(NULL)
+         last_suitable_starting_point(-1),
+         mark_pool(NULL),
+         queued_data_last_starting_point(-1)
 {
        if (data_fd == -1) {
                exit(1);
@@ -44,7 +46,8 @@ Stream::Stream(const StreamProto &serialized, int data_fd)
          data_fd(data_fd),
          backlog_size(serialized.backlog_size()),
          bytes_received(serialized.bytes_received()),
-         mark_pool(NULL)
+         mark_pool(NULL),
+         queued_data_last_starting_point(-1)
 {
        if (data_fd == -1) {
                exit(1);
@@ -62,6 +65,13 @@ Stream::Stream(const StreamProto &serialized, int data_fd)
                        stream_header = header.substr(split, string::npos);
                }
        }
+
+       // Older versions did not set last_suitable_starting_point.
+       if (serialized.has_last_suitable_starting_point()) {
+               last_suitable_starting_point = serialized.last_suitable_starting_point();
+       } else {
+               last_suitable_starting_point = bytes_received;
+       }
 }
 
 StreamProto Stream::serialize()
@@ -72,6 +82,7 @@ StreamProto Stream::serialize()
        serialized.add_data_fds(data_fd);
        serialized.set_backlog_size(backlog_size);
        serialized.set_bytes_received(bytes_received);
+       serialized.set_last_suitable_starting_point(last_suitable_starting_point);
        serialized.set_url(url);
        data_fd = -1;
        return serialized;
@@ -198,14 +209,23 @@ void Stream::add_data_raw(const vector<iovec> &orig_data)
        }
 }
 
-void Stream::add_data_deferred(const char *data, size_t bytes)
+void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
 {
+       assert(suitable_for_stream_start == SUITABLE_FOR_STREAM_START ||
+              suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START);
+       if (suitable_for_stream_start == SUITABLE_FOR_STREAM_START) {
+               queued_data_last_starting_point = queued_data.size();
+       }
+
        if (encoding == Stream::STREAM_ENCODING_METACUBE) {
                // Add a Metacube block header before the data.
                metacube_block_header hdr;
                memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
                hdr.size = htonl(bytes);
                hdr.flags = htonl(0);
+               if (suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START) {
+                       hdr.flags |= htonl(METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START);
+               }
 
                iovec iov;
                iov.iov_base = new char[bytes + sizeof(hdr)];
@@ -234,12 +254,23 @@ void Stream::process_queued_data()
                return;
        }
 
+       // Update the last suitable starting point for the stream,
+       // if the queued data contains such a starting point.
+       assert(queued_data_last_starting_point < ssize_t(queued_data.size()));
+       if (queued_data_last_starting_point >= 0) {
+               last_suitable_starting_point = bytes_received;
+               for (int i = 0; i < queued_data_last_starting_point; ++i) {
+                       last_suitable_starting_point += queued_data[i].iov_len;
+               }
+       }
+
        add_data_raw(queued_data);
        for (size_t i = 0; i < queued_data.size(); ++i) {
                char *data = reinterpret_cast<char *>(queued_data[i].iov_base);
                delete[] data;
        }
        queued_data.clear();
+       queued_data_last_starting_point = -1;
 
        // We have more data, so wake up all clients.
        if (to_process.empty()) {
index d762869..f2ba5d4 100644 (file)
--- a/stream.h
+++ b/stream.h
@@ -15,6 +15,11 @@ class MarkPool;
 class StreamProto;
 struct Client;
 
+enum StreamStartSuitability {
+       NOT_SUITABLE_FOR_STREAM_START,
+       SUITABLE_FOR_STREAM_START,
+};
+
 struct Stream {
        // Must be in sync with StreamConfig::Encoding.
        enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
@@ -57,6 +62,10 @@ struct Stream {
        // How many bytes this stream have received. Can very well be larger
        // than <backlog_size>, since the buffer wraps.
        size_t bytes_received;
+
+       // The last point in the stream that is suitable to start new clients at
+       // (after having sent the header). -1 if no such point exists yet.
+       ssize_t last_suitable_starting_point;
        
        // Clients that are in SENDING_DATA, but that we don't listen on,
        // because we currently don't have any data for them.
@@ -74,6 +83,10 @@ struct Stream {
        // The data pointers in the iovec are owned by us.
        std::vector<iovec> queued_data;
 
+       // Index of the last element in queued_data that is suitable to start streaming at.
+       // -1 if none.
+       int queued_data_last_starting_point;
+
        // Put client to sleep, since there is no more data for it; we will on
        // longer listen on POLLOUT until we get more data. Also, it will be put
        // in the list of clients to wake up when we do.
@@ -82,7 +95,7 @@ struct Stream {
        // Add more data to <queued_data>, adding Metacube headers if needed.
        // Does not take ownership of <data>.
        // You should hold the owning Server's <queued_data_mutex>.
-       void add_data_deferred(const char *data, size_t bytes);
+       void add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start);
 
        // Add queued data to the stream, if any.
        // You should hold the owning Server's <mutex> _and_ <queued_data_mutex>.
index 3e4b9b7..048866a 100644 (file)
@@ -106,7 +106,7 @@ void UDPInput::do_work()
                }
                
                for (size_t i = 0; i < stream_indices.size(); ++i) {
-                       servers->add_data(stream_indices[i], packet_buf, ret);
+                       servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START);
                }
        }
 }