std::string remote_addr;
time_t connect_time;
- enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME };
+ enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR, WAITING_FOR_KEYFRAME, PREBUFFERING };
State state;
// The HTTP request, as sent by the client. If we are in READING_REQUEST,
// -1 means we want to send from the end of the backlog (the normal case),
// although only at a keyframe.
// -2 means we want to send from the _beginning_ of the backlog.
- // Once we go into WAITING_FOR_KEYFRAME or SENDING_DATA, these negative
- // values will be translated to real numbers.
+ // Once we go into WAITING_FOR_KEYFRAME, PREBUFFERING or SENDING_DATA,
+ // these negative values will be translated to real numbers.
size_t stream_pos;
// Number of bytes we've sent of data. Only relevant for SENDING_DATA.
stream.backlog_size = atoi(backlog_it->second.c_str());
}
+ map<string, string>::const_iterator prebuffer_it = line.parameters.find("force_prebuffer");
+ if (prebuffer_it == line.parameters.end()) {
+ stream.prebuffering_bytes = 0;
+ } else {
+ stream.prebuffering_bytes = atoi(prebuffer_it->second.c_str());
+ }
+
// Parse encoding.
map<string, string>::const_iterator encoding_parm_it = line.parameters.find("encoding");
if (encoding_parm_it == line.parameters.end() ||
std::string url; // As seen by the client.
std::string src; // Can be empty.
size_t backlog_size;
+ size_t prebuffering_bytes;
uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit).
enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
};
#
# now the streams!
#
-stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv
+stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv force_prebuffer=1500000
stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
stream /udp.ts src=udp://@:1234 backlog_size=1048576
stream /udp-multicast.ts src=udp://@233.252.0.2:1234 pacing_rate_kbit=2000
has_metacube_header = false;
continue;
}
- if (size > 262144) {
+ if (size > 1048576) {
log(WARNING, "[%s] Metacube block of %d bytes (flags=%x); corrupted header?",
url.c_str(), size, flags);
}
if (deserialized_urls.count(stream_config.url) == 0) {
stream_index = servers->add_stream(stream_config.url,
stream_config.backlog_size,
+ stream_config.prebuffering_bytes,
Stream::Encoding(stream_config.encoding));
} else {
stream_index = servers->lookup_stream_by_url(stream_config.url);
}
if (client_ptr->state == Client::WAITING_FOR_KEYFRAME ||
+ client_ptr->state == Client::PREBUFFERING ||
(client_ptr->state == Client::SENDING_DATA &&
client_ptr->stream_pos == client_ptr->stream->bytes_received)) {
client_ptr->stream->put_client_to_sleep(client_ptr);
return url_it->second;
}
-int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
+int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
url_map.insert(make_pair(url, streams.size()));
- streams.push_back(new Stream(url, backlog_size, encoding));
+ streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding));
return streams.size() - 1;
}
return;
}
client->stream_pos = stream->last_suitable_starting_point;
+ client->state = Client::PREBUFFERING;
+ // Fall through.
+ }
+ case Client::PREBUFFERING: {
+ Stream *stream = client->stream;
+ size_t bytes_to_send = stream->bytes_received - client->stream_pos;
+ assert(bytes_to_send <= stream->backlog_size);
+ if (bytes_to_send < stream->prebuffering_bytes) {
+ // We don't have enough bytes buffered to start this client yet.
+ stream->put_client_to_sleep(client);
+ return;
+ }
client->state = Client::SENDING_DATA;
// Fall through.
}
// at the same time).
CubemapStateProto serialize();
void add_client_from_serialized(const ClientProto &client);
- int add_stream(const std::string &url, size_t bytes_received, Stream::Encoding encoding);
+ int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding);
int add_stream_from_serialized(const StreamProto &stream, int data_fd);
int lookup_stream_by_url(const std::string &url) const;
void set_backlog_size(int stream_index, size_t new_size);
return servers[0].lookup_stream_by_url(url);
}
-int ServerPool::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
+int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
{
// Adding more HTTP streams after UDP streams would cause the UDP stream
// indices to move around, which is obviously not good.
assert(udp_streams.empty());
for (int i = 0; i < num_servers; ++i) {
- int stream_index = servers[i].add_stream(url, backlog_size, encoding);
+ int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding);
assert(stream_index == num_http_streams);
}
return num_http_streams++;
void add_client_from_serialized(const ClientProto &client);
// Adds the given stream to all the servers. Returns the stream index.
- int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding);
+ int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding);
int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
void delete_stream(const std::string &url);
int add_udpstream(const sockaddr_in6 &dst, int pacing_rate, int ttl, int multicast_iface_index);
optional bytes stream_header = 7;
repeated int32 data_fds = 8;
optional int64 backlog_size = 5 [default=10485760];
+ optional int64 prebuffering_bytes = 10 [default=0];
optional int64 bytes_received = 3;
optional int64 last_suitable_starting_point = 9;
optional string url = 4;
using namespace std;
-Stream::Stream(const string &url, size_t backlog_size, Encoding encoding)
+Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding)
: url(url),
encoding(encoding),
data_fd(make_tempfile("")),
backlog_size(backlog_size),
+ prebuffering_bytes(prebuffering_bytes),
bytes_received(0),
last_suitable_starting_point(-1),
pacing_rate(~0U),
encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later.
data_fd(data_fd),
backlog_size(serialized.backlog_size()),
+ prebuffering_bytes(serialized.prebuffering_bytes()),
bytes_received(serialized.bytes_received()),
pacing_rate(~0U),
queued_data_last_starting_point(-1)
serialized.set_stream_header(stream_header);
serialized.add_data_fds(data_fd);
serialized.set_backlog_size(backlog_size);
+ serialized.set_prebuffering_bytes(prebuffering_bytes);
serialized.set_bytes_received(bytes_received);
serialized.set_last_suitable_starting_point(last_suitable_starting_point);
serialized.set_url(url);
// Must be in sync with StreamConfig::Encoding.
enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
- Stream(const std::string &stream_id, size_t backlog_size, Encoding encoding);
+ Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding);
~Stream();
// Serialization/deserialization.
// How many bytes <data_fd> can hold (the buffer size).
size_t backlog_size;
+ // How many bytes we need to have in the backlog before we start
+ // sending (in practice, we will then send all of them at once,
+ // and then start sending at the normal rate thereafter).
+ // This is basically to force a buffer on the client, which can help
+ // if the client expects us to be able to fill up the buffer much
+ // faster than realtime (ie., it expects a static file).
+ size_t prebuffering_bytes;
+
// How many bytes this stream have received. Can very well be larger
// than <backlog_size>, since the buffer wraps.
size_t bytes_received;