summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
102adb2)
The motivation is jwPlayer, which for HTTP files expects to be able
to do no prebuffering and just download full speed nevertheless
(as it assumes they are static files, not streams) -- when it cannot,
it shows an ugly icon on top of the stream all the time. So we add
an option for forced prebuffering (three seconds seems to be about
fine) which means we wait sending until we have a pretty big backlog.
Ideally, we would be able to actually send old data instead of just
waiting (which would mean that the client doesn't need the extra wait
at the beginning), but it's complicated with having to remember old
keyframe positions, changed stream headers etc.
12 files changed:
std::string remote_addr;
time_t connect_time;
std::string remote_addr;
time_t connect_time;
- enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME };
+ enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME, PREBUFFERING };
State state;
// The HTTP request, as sent by the client. If we are in READING_REQUEST,
State state;
// The HTTP request, as sent by the client. If we are in READING_REQUEST,
// -1 means we want to send from the end of the backlog (the normal case),
// although only at a keyframe.
// -2 means we want to send from the _beginning_ of the backlog.
// -1 means we want to send from the end of the backlog (the normal case),
// although only at a keyframe.
// -2 means we want to send from the _beginning_ of the backlog.
- // Once we go into WAITING_FOR_KEYFRAME or SENDING_DATA, these negative
- // values will be translated to real numbers.
+ // Once we go into WAITING_FOR_KEYFRAME, PREBUFFERING or SENDING_DATA,
+ // these negative values will be translated to real numbers.
size_t stream_pos;
// Number of bytes we've sent of data. Only relevant for SENDING_DATA.
size_t stream_pos;
// Number of bytes we've sent of data. Only relevant for SENDING_DATA.
stream.backlog_size = atoi(backlog_it->second.c_str());
}
stream.backlog_size = atoi(backlog_it->second.c_str());
}
+ map<string, string>::const_iterator prebuffer_it = line.parameters.find("force_prebuffer");
+ if (prebuffer_it == line.parameters.end()) {
+ stream.prebuffering_bytes = 0;
+ } else {
+ stream.prebuffering_bytes = atoi(prebuffer_it->second.c_str());
+ }
+
// Parse encoding.
map<string, string>::const_iterator encoding_parm_it = line.parameters.find("encoding");
if (encoding_parm_it == line.parameters.end() ||
// Parse encoding.
map<string, string>::const_iterator encoding_parm_it = line.parameters.find("encoding");
if (encoding_parm_it == line.parameters.end() ||
std::string url; // As seen by the client.
std::string src; // Can be empty.
size_t backlog_size;
std::string url; // As seen by the client.
std::string src; // Can be empty.
size_t backlog_size;
+ size_t prebuffering_bytes;
uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit).
enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
};
uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit).
enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
};
-stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv
+stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv force_prebuffer=1500000
stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
stream /udp.ts src=udp://@:1234 backlog_size=1048576
stream /udp-multicast.ts src=udp://@233.252.0.2:1234 pacing_rate_kbit=2000
stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
stream /udp.ts src=udp://@:1234 backlog_size=1048576
stream /udp-multicast.ts src=udp://@233.252.0.2:1234 pacing_rate_kbit=2000
if (deserialized_urls.count(stream_config.url) == 0) {
stream_index = servers->add_stream(stream_config.url,
stream_config.backlog_size,
if (deserialized_urls.count(stream_config.url) == 0) {
stream_index = servers->add_stream(stream_config.url,
stream_config.backlog_size,
+ stream_config.prebuffering_bytes,
Stream::Encoding(stream_config.encoding));
} else {
stream_index = servers->lookup_stream_by_url(stream_config.url);
Stream::Encoding(stream_config.encoding));
} else {
stream_index = servers->lookup_stream_by_url(stream_config.url);
}
if (client_ptr->state == Client::WAITING_FOR_KEYFRAME ||
}
if (client_ptr->state == Client::WAITING_FOR_KEYFRAME ||
+ client_ptr->state == Client::PREBUFFERING ||
(client_ptr->state == Client::SENDING_DATA &&
client_ptr->stream_pos == client_ptr->stream->bytes_received)) {
client_ptr->stream->put_client_to_sleep(client_ptr);
(client_ptr->state == Client::SENDING_DATA &&
client_ptr->stream_pos == client_ptr->stream->bytes_received)) {
client_ptr->stream->put_client_to_sleep(client_ptr);
-int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
+int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
url_map.insert(make_pair(url, streams.size()));
{
MutexLock lock(&mutex);
url_map.insert(make_pair(url, streams.size()));
- streams.push_back(new Stream(url, backlog_size, encoding));
+ streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding));
return streams.size() - 1;
}
return streams.size() - 1;
}
return;
}
client->stream_pos = stream->last_suitable_starting_point;
return;
}
client->stream_pos = stream->last_suitable_starting_point;
+ client->state = Client::PREBUFFERING;
+ // Fall through.
+ }
+ case Client::PREBUFFERING: {
+ Stream *stream = client->stream;
+ size_t bytes_to_send = stream->bytes_received - client->stream_pos;
+ assert(bytes_to_send <= stream->backlog_size);
+ if (bytes_to_send < stream->prebuffering_bytes) {
+ // We don't have enough bytes buffered to start this client yet.
+ stream->put_client_to_sleep(client);
+ return;
+ }
client->state = Client::SENDING_DATA;
// Fall through.
}
client->state = Client::SENDING_DATA;
// Fall through.
}
// at the same time).
CubemapStateProto serialize();
void add_client_from_serialized(const ClientProto &client);
// at the same time).
CubemapStateProto serialize();
void add_client_from_serialized(const ClientProto &client);
- int add_stream(const std::string &url, size_t bytes_received, Stream::Encoding encoding);
+ int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding);
int add_stream_from_serialized(const StreamProto &stream, int data_fd);
int lookup_stream_by_url(const std::string &url) const;
void set_backlog_size(int stream_index, size_t new_size);
int add_stream_from_serialized(const StreamProto &stream, int data_fd);
int lookup_stream_by_url(const std::string &url) const;
void set_backlog_size(int stream_index, size_t new_size);
return servers[0].lookup_stream_by_url(url);
}
return servers[0].lookup_stream_by_url(url);
}
-int ServerPool::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
+int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
{
// Adding more HTTP streams after UDP streams would cause the UDP stream
// indices to move around, which is obviously not good.
assert(udp_streams.empty());
for (int i = 0; i < num_servers; ++i) {
{
// Adding more HTTP streams after UDP streams would cause the UDP stream
// indices to move around, which is obviously not good.
assert(udp_streams.empty());
for (int i = 0; i < num_servers; ++i) {
- int stream_index = servers[i].add_stream(url, backlog_size, encoding);
+ int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding);
assert(stream_index == num_http_streams);
}
return num_http_streams++;
assert(stream_index == num_http_streams);
}
return num_http_streams++;
void add_client_from_serialized(const ClientProto &client);
// Adds the given stream to all the servers. Returns the stream index.
void add_client_from_serialized(const ClientProto &client);
// Adds the given stream to all the servers. Returns the stream index.
- int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding);
+ int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding);
int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
void delete_stream(const std::string &url);
int add_udpstream(const sockaddr_in6 &dst, int pacing_rate, int ttl, int multicast_iface_index);
int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
void delete_stream(const std::string &url);
int add_udpstream(const sockaddr_in6 &dst, int pacing_rate, int ttl, int multicast_iface_index);
optional bytes stream_header = 7;
repeated int32 data_fds = 8;
optional int64 backlog_size = 5 [default=10485760];
optional bytes stream_header = 7;
repeated int32 data_fds = 8;
optional int64 backlog_size = 5 [default=10485760];
+ optional int64 prebuffering_bytes = 10 [default=0];
optional int64 bytes_received = 3;
optional int64 last_suitable_starting_point = 9;
optional string url = 4;
optional int64 bytes_received = 3;
optional int64 last_suitable_starting_point = 9;
optional string url = 4;
-Stream::Stream(const string &url, size_t backlog_size, Encoding encoding)
+Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding)
: url(url),
encoding(encoding),
data_fd(make_tempfile("")),
backlog_size(backlog_size),
: url(url),
encoding(encoding),
data_fd(make_tempfile("")),
backlog_size(backlog_size),
+ prebuffering_bytes(prebuffering_bytes),
bytes_received(0),
last_suitable_starting_point(-1),
pacing_rate(~0U),
bytes_received(0),
last_suitable_starting_point(-1),
pacing_rate(~0U),
encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later.
data_fd(data_fd),
backlog_size(serialized.backlog_size()),
encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later.
data_fd(data_fd),
backlog_size(serialized.backlog_size()),
+ prebuffering_bytes(serialized.prebuffering_bytes()),
bytes_received(serialized.bytes_received()),
pacing_rate(~0U),
queued_data_last_starting_point(-1)
bytes_received(serialized.bytes_received()),
pacing_rate(~0U),
queued_data_last_starting_point(-1)
serialized.set_stream_header(stream_header);
serialized.add_data_fds(data_fd);
serialized.set_backlog_size(backlog_size);
serialized.set_stream_header(stream_header);
serialized.add_data_fds(data_fd);
serialized.set_backlog_size(backlog_size);
+ serialized.set_prebuffering_bytes(prebuffering_bytes);
serialized.set_bytes_received(bytes_received);
serialized.set_last_suitable_starting_point(last_suitable_starting_point);
serialized.set_url(url);
serialized.set_bytes_received(bytes_received);
serialized.set_last_suitable_starting_point(last_suitable_starting_point);
serialized.set_url(url);
// Must be in sync with StreamConfig::Encoding.
enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
// Must be in sync with StreamConfig::Encoding.
enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
- Stream(const std::string &stream_id, size_t backlog_size, Encoding encoding);
+ Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding);
~Stream();
// Serialization/deserialization.
~Stream();
// Serialization/deserialization.
// How many bytes <data_fd> can hold (the buffer size).
size_t backlog_size;
// How many bytes <data_fd> can hold (the buffer size).
size_t backlog_size;
+ // How many bytes we need to have in the backlog before we start
+ // sending (in practice, we will then send all of them at once,
+ // and then start sending at the normal rate thereafter).
+ // This is basically to force a buffer on the client, which can help
+ // if the client expects us to be able to fill up the buffer much
+ // faster than realtime (ie., it expects a static file).
+ size_t prebuffering_bytes;
+
// How many bytes this stream have received. Can very well be larger
// than <backlog_size>, since the buffer wraps.
size_t bytes_received;
// How many bytes this stream have received. Can very well be larger
// than <backlog_size>, since the buffer wraps.
size_t bytes_received;