From 1c6b126fe95eb0465383ba225da764757eba05c0 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Wed, 14 Aug 2013 23:09:52 +0200 Subject: [PATCH] Support the new METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START Metacube flag. This is created by new versions of the VLC Metacube patches; it is designed for the needs of WebM, where browsers refuse to accept a stream that does not start with a keyframe. Thus, we can have blocks that are completely valid (and we wouldn't want to start joining blocks, as the GOPs can become very large), but that we do not want to start a stream with. For this, we introduce a new state called WAITING_FOR_KEYFRAME that all new clients (except those fetching from the start of the backlog) go through; they stay there until the next keyframe-marked block (anything that's not marked METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START, so old Metacube streams are still valid) comes along, at which point they go into the regular SENDING_DATA. The new version of the VLC Metacube patch is not included yet. --- client.h | 7 ++++--- httpinput.cpp | 10 ++++++++-- metacube.h | 1 + server.cpp | 30 +++++++++++++++++++++++------- server.h | 2 +- serverpool.cpp | 4 ++-- serverpool.h | 2 +- state.proto | 1 + stream.cpp | 37 ++++++++++++++++++++++++++++++++++--- stream.h | 15 ++++++++++++++- udpinput.cpp | 2 +- 11 files changed, 90 insertions(+), 21 deletions(-) diff --git a/client.h b/client.h index cd909e3..490190b 100644 --- a/client.h +++ b/client.h @@ -41,7 +41,7 @@ struct Client { std::string remote_addr; time_t connect_time; - enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR }; + enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME }; State state; // The HTTP request, as sent by the client. If we are in READING_REQUEST, @@ -64,9 +64,10 @@ struct Client { // Number of bytes we are into the stream (ie., the end of last send). // -1 means we want to send from the end of the backlog (the normal case), + // although only at a keyframe. // -2 means we want to send from the _beginning_ of the backlog. - // Once we go into SENDING_DATA, these negative values will be translated - // to real numbers. + // Once we go into WAITING_FOR_KEYFRAME or SENDING_DATA, these negative + // values will be translated to real numbers. size_t stream_pos; // Number of bytes we've sent of data. Only relevant for SENDING_DATA. diff --git a/httpinput.cpp b/httpinput.cpp index e22e0a5..0d94913 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -475,9 +475,15 @@ void HTTPInput::process_data(char *ptr, size_t bytes) for (size_t i = 0; i < stream_indices.size(); ++i) { servers->set_header(stream_indices[i], http_header, header); } - } else { + } else { + StreamStartSuitability suitable_for_stream_start; + if (flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) { + suitable_for_stream_start = NOT_SUITABLE_FOR_STREAM_START; + } else { + suitable_for_stream_start = SUITABLE_FOR_STREAM_START; + } for (size_t i = 0; i < stream_indices.size(); ++i) { - servers->add_data(stream_indices[i], inner_data, size); + servers->add_data(stream_indices[i], inner_data, size, suitable_for_stream_start); } } diff --git a/metacube.h b/metacube.h index b40a42e..062325b 100644 --- a/metacube.h +++ b/metacube.h @@ -7,6 +7,7 @@ #define METACUBE_SYNC "\\o/_metacube_\\o/" /* 16 bytes long. */ #define METACUBE_FLAGS_HEADER 0x1 +#define METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START 0x2 struct metacube_block_header { char sync[16]; /* METACUBE_SYNC */ diff --git a/server.cpp b/server.cpp index 7acf501..1637d94 100644 --- a/server.cpp +++ b/server.cpp @@ -194,8 +194,9 @@ void Server::add_client_from_serialized(const ClientProto &client) exit(1); } - if (client_ptr->state == Client::SENDING_DATA && - client_ptr->stream_pos == client_ptr->stream->bytes_received) { + if (client_ptr->state == Client::WAITING_FOR_KEYFRAME || + (client_ptr->state == Client::SENDING_DATA && + client_ptr->stream_pos == client_ptr->stream->bytes_received)) { client_ptr->stream->put_client_to_sleep(client_ptr); } else { process_client(client_ptr); @@ -269,11 +270,11 @@ void Server::set_mark_pool(int stream_index, MarkPool *mark_pool) streams[stream_index]->mark_pool = mark_pool; } -void Server::add_data_deferred(int stream_index, const char *data, size_t bytes) +void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start) { MutexLock lock(&queued_data_mutex); assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); - streams[stream_index]->add_data_deferred(data, bytes); + streams[stream_index]->add_data_deferred(data, bytes, suitable_for_stream_start); } // See the .h file for postconditions after this function. @@ -380,22 +381,37 @@ sending_header_or_error_again: return; } - // Start sending from the end. In other words, we won't send any of the backlog, - // but we'll start sending immediately as we get data. + // Start sending from the first keyframe we get. In other + // words, we won't send any of the backlog, but we'll start + // sending immediately as we get the next keyframe block. // This is postcondition #3. - client->state = Client::SENDING_DATA; if (client->stream_pos == size_t(-2)) { client->stream_pos = std::min( client->stream->bytes_received - client->stream->backlog_size, 0); + client->state = Client::SENDING_DATA; } else { // client->stream_pos should be -1, but it might not be, // if we have clients from an older version. client->stream_pos = client->stream->bytes_received; + client->state = Client::WAITING_FOR_KEYFRAME; } client->stream->put_client_to_sleep(client); return; } + case Client::WAITING_FOR_KEYFRAME: { + Stream *stream = client->stream; + if (ssize_t(client->stream_pos) > stream->last_suitable_starting_point) { + // We haven't received a keyframe since this stream started waiting, + // so keep on waiting for one. + // This is postcondition #3. + stream->put_client_to_sleep(client); + return; + } + client->stream_pos = stream->last_suitable_starting_point; + client->state = Client::SENDING_DATA; + // Fall through. + } case Client::SENDING_DATA: { skip_lost_data(client); Stream *stream = client->stream; diff --git a/server.h b/server.h index 963aca9..f0eb622 100644 --- a/server.h +++ b/server.h @@ -47,7 +47,7 @@ public: // and the order between them are undefined. // XXX: header should ideally be ordered with respect to data. void add_client_deferred(int sock); - void add_data_deferred(int stream_index, const char *data, size_t bytes); + void add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start); // These should not be called while running, since that would violate // threading assumptions (ie., that epoll is only called from one thread diff --git a/serverpool.cpp b/serverpool.cpp index be47aa9..936bd21 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -147,7 +147,7 @@ void ServerPool::set_header(int stream_index, const string &http_header, const s } } -void ServerPool::add_data(int stream_index, const char *data, size_t bytes) +void ServerPool::add_data(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start) { assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size())); @@ -159,7 +159,7 @@ void ServerPool::add_data(int stream_index, const char *data, size_t bytes) // HTTP stream. for (int i = 0; i < num_servers; ++i) { - servers[i].add_data_deferred(stream_index, data, bytes); + servers[i].add_data_deferred(stream_index, data, bytes, suitable_for_stream_start); } } diff --git a/serverpool.h b/serverpool.h index 9a8e171..13211f9 100644 --- a/serverpool.h +++ b/serverpool.h @@ -41,7 +41,7 @@ public: void set_header(int stream_index, const std::string &http_header, const std::string &stream_header); - void add_data(int stream_index, const char *data, size_t bytes); + void add_data(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start); // Connects the given stream to the given mark pool for all the servers. void set_mark_pool(int stream_index, MarkPool *mark_pool); diff --git a/state.proto b/state.proto index 1776280..93c56e3 100644 --- a/state.proto +++ b/state.proto @@ -21,6 +21,7 @@ message StreamProto { repeated int32 data_fds = 8; optional int64 backlog_size = 5 [default=1048576]; optional int64 bytes_received = 3; + optional int64 last_suitable_starting_point = 9; optional string url = 4; // Older versions stored the HTTP and video headers together in this field. diff --git a/stream.cpp b/stream.cpp index e8d65db..6563b5b 100644 --- a/stream.cpp +++ b/stream.cpp @@ -22,7 +22,9 @@ Stream::Stream(const string &url, size_t backlog_size, Encoding encoding) data_fd(make_tempfile("")), backlog_size(backlog_size), bytes_received(0), - mark_pool(NULL) + last_suitable_starting_point(-1), + mark_pool(NULL), + queued_data_last_starting_point(-1) { if (data_fd == -1) { exit(1); @@ -44,7 +46,8 @@ Stream::Stream(const StreamProto &serialized, int data_fd) data_fd(data_fd), backlog_size(serialized.backlog_size()), bytes_received(serialized.bytes_received()), - mark_pool(NULL) + mark_pool(NULL), + queued_data_last_starting_point(-1) { if (data_fd == -1) { exit(1); @@ -62,6 +65,13 @@ Stream::Stream(const StreamProto &serialized, int data_fd) stream_header = header.substr(split, string::npos); } } + + // Older versions did not set last_suitable_starting_point. + if (serialized.has_last_suitable_starting_point()) { + last_suitable_starting_point = serialized.last_suitable_starting_point(); + } else { + last_suitable_starting_point = bytes_received; + } } StreamProto Stream::serialize() @@ -72,6 +82,7 @@ StreamProto Stream::serialize() serialized.add_data_fds(data_fd); serialized.set_backlog_size(backlog_size); serialized.set_bytes_received(bytes_received); + serialized.set_last_suitable_starting_point(last_suitable_starting_point); serialized.set_url(url); data_fd = -1; return serialized; @@ -198,14 +209,23 @@ void Stream::add_data_raw(const vector &orig_data) } } -void Stream::add_data_deferred(const char *data, size_t bytes) +void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start) { + assert(suitable_for_stream_start == SUITABLE_FOR_STREAM_START || + suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START); + if (suitable_for_stream_start == SUITABLE_FOR_STREAM_START) { + queued_data_last_starting_point = queued_data.size(); + } + if (encoding == Stream::STREAM_ENCODING_METACUBE) { // Add a Metacube block header before the data. metacube_block_header hdr; memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync)); hdr.size = htonl(bytes); hdr.flags = htonl(0); + if (suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START) { + hdr.flags |= htonl(METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START); + } iovec iov; iov.iov_base = new char[bytes + sizeof(hdr)]; @@ -234,12 +254,23 @@ void Stream::process_queued_data() return; } + // Update the last suitable starting point for the stream, + // if the queued data contains such a starting point. + assert(queued_data_last_starting_point < ssize_t(queued_data.size())); + if (queued_data_last_starting_point >= 0) { + last_suitable_starting_point = bytes_received; + for (int i = 0; i < queued_data_last_starting_point; ++i) { + last_suitable_starting_point += queued_data[i].iov_len; + } + } + add_data_raw(queued_data); for (size_t i = 0; i < queued_data.size(); ++i) { char *data = reinterpret_cast(queued_data[i].iov_base); delete[] data; } queued_data.clear(); + queued_data_last_starting_point = -1; // We have more data, so wake up all clients. if (to_process.empty()) { diff --git a/stream.h b/stream.h index d762869..f2ba5d4 100644 --- a/stream.h +++ b/stream.h @@ -15,6 +15,11 @@ class MarkPool; class StreamProto; struct Client; +enum StreamStartSuitability { + NOT_SUITABLE_FOR_STREAM_START, + SUITABLE_FOR_STREAM_START, +}; + struct Stream { // Must be in sync with StreamConfig::Encoding. enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE }; @@ -57,6 +62,10 @@ struct Stream { // How many bytes this stream have received. Can very well be larger // than , since the buffer wraps. size_t bytes_received; + + // The last point in the stream that is suitable to start new clients at + // (after having sent the header). -1 if no such point exists yet. + ssize_t last_suitable_starting_point; // Clients that are in SENDING_DATA, but that we don't listen on, // because we currently don't have any data for them. @@ -74,6 +83,10 @@ struct Stream { // The data pointers in the iovec are owned by us. std::vector queued_data; + // Index of the last element in queued_data that is suitable to start streaming at. + // -1 if none. + int queued_data_last_starting_point; + // Put client to sleep, since there is no more data for it; we will on // longer listen on POLLOUT until we get more data. Also, it will be put // in the list of clients to wake up when we do. @@ -82,7 +95,7 @@ struct Stream { // Add more data to , adding Metacube headers if needed. // Does not take ownership of . // You should hold the owning Server's . - void add_data_deferred(const char *data, size_t bytes); + void add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start); // Add queued data to the stream, if any. // You should hold the owning Server's _and_ . diff --git a/udpinput.cpp b/udpinput.cpp index 3e4b9b7..048866a 100644 --- a/udpinput.cpp +++ b/udpinput.cpp @@ -106,7 +106,7 @@ void UDPInput::do_work() } for (size_t i = 0; i < stream_indices.size(); ++i) { - servers->add_data(stream_indices[i], packet_buf, ret); + servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START); } } } -- 2.39.2