std::string remote_addr;
time_t connect_time;
- enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR };
+ enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME };
State state;
// The HTTP request, as sent by the client. If we are in READING_REQUEST,
// Number of bytes we are into the stream (ie., the end of last send).
// -1 means we want to send from the end of the backlog (the normal case),
+ // although only starting at a keyframe.
// -2 means we want to send from the _beginning_ of the backlog.
- // Once we go into SENDING_DATA, these negative values will be translated
- // to real numbers.
+ // Once we go into WAITING_FOR_KEYFRAME or SENDING_DATA, these negative
+ // values will be translated to real numbers.
size_t stream_pos;
// Number of bytes we've sent of data. Only relevant for SENDING_DATA.
for (size_t i = 0; i < stream_indices.size(); ++i) {
servers->set_header(stream_indices[i], http_header, header);
}
- } else {
+ } else {
+ StreamStartSuitability suitable_for_stream_start;
+ if (flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) {
+ suitable_for_stream_start = NOT_SUITABLE_FOR_STREAM_START;
+ } else {
+ suitable_for_stream_start = SUITABLE_FOR_STREAM_START;
+ }
for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->add_data(stream_indices[i], inner_data, size);
+ servers->add_data(stream_indices[i], inner_data, size, suitable_for_stream_start);
}
}
#define METACUBE_SYNC "\\o/_metacube_\\o/" /* 16 bytes long. */
#define METACUBE_FLAGS_HEADER 0x1
+#define METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START 0x2
struct metacube_block_header {
char sync[16]; /* METACUBE_SYNC */
exit(1);
}
- if (client_ptr->state == Client::SENDING_DATA &&
- client_ptr->stream_pos == client_ptr->stream->bytes_received) {
+ if (client_ptr->state == Client::WAITING_FOR_KEYFRAME ||
+ (client_ptr->state == Client::SENDING_DATA &&
+ client_ptr->stream_pos == client_ptr->stream->bytes_received)) {
client_ptr->stream->put_client_to_sleep(client_ptr);
} else {
process_client(client_ptr);
streams[stream_index]->mark_pool = mark_pool;
}
-void Server::add_data_deferred(int stream_index, const char *data, size_t bytes)
+void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
{
MutexLock lock(&queued_data_mutex);
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
- streams[stream_index]->add_data_deferred(data, bytes);
+ streams[stream_index]->add_data_deferred(data, bytes, suitable_for_stream_start);
}
// See the .h file for postconditions after this function.
return;
}
- // Start sending from the end. In other words, we won't send any of the backlog,
- // but we'll start sending immediately as we get data.
+ // Start sending from the first keyframe we get. In other
+ // words, we won't send any of the backlog, but we'll start
+ // sending immediately as we get the next keyframe block.
// This is postcondition #3.
- client->state = Client::SENDING_DATA;
if (client->stream_pos == size_t(-2)) {
client->stream_pos = std::min<size_t>(
client->stream->bytes_received - client->stream->backlog_size,
0);
+ client->state = Client::SENDING_DATA;
} else {
// client->stream_pos should be -1, but it might not be,
// if we have clients from an older version.
client->stream_pos = client->stream->bytes_received;
+ client->state = Client::WAITING_FOR_KEYFRAME;
}
client->stream->put_client_to_sleep(client);
return;
}
+ case Client::WAITING_FOR_KEYFRAME: {
+ Stream *stream = client->stream;
+ if (ssize_t(client->stream_pos) > stream->last_suitable_starting_point) {
+ // We haven't received a keyframe since this client started waiting,
+ // so keep on waiting for one.
+ // This is postcondition #3.
+ stream->put_client_to_sleep(client);
+ return;
+ }
+ client->stream_pos = stream->last_suitable_starting_point;
+ client->state = Client::SENDING_DATA;
+ // Fall through.
+ }
case Client::SENDING_DATA: {
skip_lost_data(client);
Stream *stream = client->stream;
// and the order between them are undefined.
// XXX: header should ideally be ordered with respect to data.
void add_client_deferred(int sock);
- void add_data_deferred(int stream_index, const char *data, size_t bytes);
+ void add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start);
// These should not be called while running, since that would violate
// threading assumptions (ie., that epoll is only called from one thread
}
}
-void ServerPool::add_data(int stream_index, const char *data, size_t bytes)
+void ServerPool::add_data(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
{
assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size()));
// HTTP stream.
for (int i = 0; i < num_servers; ++i) {
- servers[i].add_data_deferred(stream_index, data, bytes);
+ servers[i].add_data_deferred(stream_index, data, bytes, suitable_for_stream_start);
}
}
void set_header(int stream_index,
const std::string &http_header,
const std::string &stream_header);
- void add_data(int stream_index, const char *data, size_t bytes);
+ void add_data(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start);
// Connects the given stream to the given mark pool for all the servers.
void set_mark_pool(int stream_index, MarkPool *mark_pool);
repeated int32 data_fds = 8;
optional int64 backlog_size = 5 [default=1048576];
optional int64 bytes_received = 3;
+ optional int64 last_suitable_starting_point = 9;
optional string url = 4;
// Older versions stored the HTTP and video headers together in this field.
data_fd(make_tempfile("")),
backlog_size(backlog_size),
bytes_received(0),
- mark_pool(NULL)
+ last_suitable_starting_point(-1),
+ mark_pool(NULL),
+ queued_data_last_starting_point(-1)
{
if (data_fd == -1) {
exit(1);
data_fd(data_fd),
backlog_size(serialized.backlog_size()),
bytes_received(serialized.bytes_received()),
- mark_pool(NULL)
+ mark_pool(NULL),
+ queued_data_last_starting_point(-1)
{
if (data_fd == -1) {
exit(1);
stream_header = header.substr(split, string::npos);
}
}
+
+ // Older versions did not set last_suitable_starting_point.
+ if (serialized.has_last_suitable_starting_point()) {
+ last_suitable_starting_point = serialized.last_suitable_starting_point();
+ } else {
+ last_suitable_starting_point = bytes_received;
+ }
}
StreamProto Stream::serialize()
serialized.add_data_fds(data_fd);
serialized.set_backlog_size(backlog_size);
serialized.set_bytes_received(bytes_received);
+ serialized.set_last_suitable_starting_point(last_suitable_starting_point);
serialized.set_url(url);
data_fd = -1;
return serialized;
}
}
-void Stream::add_data_deferred(const char *data, size_t bytes)
+void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
{
+ assert(suitable_for_stream_start == SUITABLE_FOR_STREAM_START ||
+ suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START);
+ if (suitable_for_stream_start == SUITABLE_FOR_STREAM_START) {
+ queued_data_last_starting_point = queued_data.size();
+ }
+
if (encoding == Stream::STREAM_ENCODING_METACUBE) {
// Add a Metacube block header before the data.
metacube_block_header hdr;
memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
hdr.size = htonl(bytes);
hdr.flags = htonl(0);
+ if (suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START) {
+ hdr.flags |= htonl(METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START);
+ }
iovec iov;
iov.iov_base = new char[bytes + sizeof(hdr)];
return;
}
+ // Update the last suitable starting point for the stream,
+ // if the queued data contains such a starting point.
+ assert(queued_data_last_starting_point < ssize_t(queued_data.size()));
+ if (queued_data_last_starting_point >= 0) {
+ last_suitable_starting_point = bytes_received;
+ for (int i = 0; i < queued_data_last_starting_point; ++i) {
+ last_suitable_starting_point += queued_data[i].iov_len;
+ }
+ }
+
add_data_raw(queued_data);
for (size_t i = 0; i < queued_data.size(); ++i) {
char *data = reinterpret_cast<char *>(queued_data[i].iov_base);
delete[] data;
}
queued_data.clear();
+ queued_data_last_starting_point = -1;
// We have more data, so wake up all clients.
if (to_process.empty()) {
class StreamProto;
struct Client;
+enum StreamStartSuitability {
+ NOT_SUITABLE_FOR_STREAM_START,
+ SUITABLE_FOR_STREAM_START,
+};
+
struct Stream {
// Must be in sync with StreamConfig::Encoding.
enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
// How many bytes this stream have received. Can very well be larger
// than <backlog_size>, since the buffer wraps.
size_t bytes_received;
+
+ // The last point in the stream that is suitable to start new clients at
+ // (after having sent the header). -1 if no such point exists yet.
+ ssize_t last_suitable_starting_point;
// Clients that are in SENDING_DATA, but that we don't listen on,
// because we currently don't have any data for them.
// The data pointers in the iovec are owned by us.
std::vector<iovec> queued_data;
+ // Index of the last element in queued_data that is suitable to start streaming at.
+ // -1 if none.
+ int queued_data_last_starting_point;
+
// Put client to sleep, since there is no more data for it; we will on
// longer listen on POLLOUT until we get more data. Also, it will be put
// in the list of clients to wake up when we do.
// Add more data to <queued_data>, adding Metacube headers if needed.
// Does not take ownership of <data>.
// You should hold the owning Server's <queued_data_mutex>.
- void add_data_deferred(const char *data, size_t bytes);
+ void add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start);
// Add queued data to the stream, if any.
// You should hold the owning Server's <mutex> _and_ <queued_data_mutex>.
}
for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->add_data(stream_indices[i], packet_buf, ret);
+ servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START);
}
}
}