SO_MAX_PACING_RATE is the newfangled socket option from Eric Dumazet,
used with the new fq packet scheduler in Linux. It allows you to set
a max rate for the socket (presumably a stricter upper bound than the
RTT-based estimate from the kernel), delivering pacing without having
to resort to the relatively complex mark setup. It seems to enter
the Linux kernel in 3.13 at the earliest; not unlikely even later.
In time, fwmark will be deprecated, but the implementation of TCP pacing in
Linux is still a bit shaky (especially with not-always-filling applications
like streaming), so fwmark will stay the primary solution for now.
15 files changed:
(unless you delete the stream they are watching, of course).
Cubemap also survives the encoder dying and reconnecting.
- Per-stream fwmark support, for TCP pacing through tc (separate config needed).
(unless you delete the stream they are watching, of course).
Cubemap also survives the encoder dying and reconnecting.
- Per-stream fwmark support, for TCP pacing through tc (separate config needed).
+ - Support for setting max pacing rate through the fq packet scheduler
+ (obsoletes the previous point, but depends on experimental kernel patches
+ that will hit Linux in 3.13 at the earliest)
- Reflects anything VLC can reflect over HTTP, even the muxes VLC
has problems reflecting itself (in particular, FLV).
- IPv4 support. Yes, Cubemap even supports (some) legacy protocols.
- Reflects anything VLC can reflect over HTTP, even the muxes VLC
has problems reflecting itself (in particular, FLV).
- IPv4 support. Yes, Cubemap even supports (some) legacy protocols.
#include "state.pb.h"
#include "stream.h"
#include "state.pb.h"
#include "stream.h"
+#ifndef SO_MAX_PACING_RATE
+#define SO_MAX_PACING_RATE 47
+#endif
+
using namespace std;
Client::Client(int sock)
using namespace std;
Client::Client(int sock)
+ if (setsockopt(sock, SOL_SOCKET, SO_MAX_PACING_RATE, &stream->pacing_rate, sizeof(stream->pacing_rate)) == -1) {
+ if (stream->pacing_rate != ~0U) {
+ log_perror("setsockopt(SO_MAX_PACING_RATE)");
+ }
+ }
}
ClientProto Client::serialize() const
}
ClientProto Client::serialize() const
stream.mark_pool = allocate_mark_pool(from, to, config);
}
stream.mark_pool = allocate_mark_pool(from, to, config);
}
+ // Parse the pacing rate, converting from kilobits to bytes as needed.
+ map<string, string>::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit");
+ if (pacing_rate_it == line.parameters.end()) {
+ stream.pacing_rate = ~0U;
+ } else {
+ stream.pacing_rate = atoi(pacing_rate_it->second.c_str()) * 1024 / 8;
+ }
+
config->streams.push_back(stream);
return true;
}
config->streams.push_back(stream);
return true;
}
udpstream.mark_pool = allocate_mark_pool(from, to, config);
}
udpstream.mark_pool = allocate_mark_pool(from, to, config);
}
+ // Parse the pacing rate, converting from kilobits to bytes as needed.
+ map<string, string>::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit");
+ if (pacing_rate_it == line.parameters.end()) {
+ udpstream.pacing_rate = ~0U;
+ } else {
+ udpstream.pacing_rate = atoi(pacing_rate_it->second.c_str()) * 1024 / 8;
+ }
+
config->udpstreams.push_back(udpstream);
return true;
}
config->udpstreams.push_back(udpstream);
return true;
}
bool parse_error_log(const ConfigLine &line, Config *config)
{
if (line.arguments.size() != 0) {
bool parse_error_log(const ConfigLine &line, Config *config)
{
if (line.arguments.size() != 0) {
std::string src; // Can be empty.
size_t backlog_size;
int mark_pool; // -1 for none.
std::string src; // Can be empty.
size_t backlog_size;
int mark_pool; // -1 for none.
+ uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit).
enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
};
enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
};
sockaddr_in6 dst;
std::string src; // Can be empty.
int mark_pool; // -1 for none.
sockaddr_in6 dst;
std::string src; // Can be empty.
int mark_pool; // -1 for none.
+ uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit).
};
struct AcceptorConfig {
};
struct AcceptorConfig {
.IP \[bu]
Per-stream fwmark support, for TCP pacing through tc (separate config needed).
.IP \[bu]
.IP \[bu]
Per-stream fwmark support, for TCP pacing through tc (separate config needed).
.IP \[bu]
+Support for setting max pacing rate through the fq packet scheduler
+(obsoletes the previous point, but depends on experimental kernel patches
+that will hit Linux in 3.13 at the earliest)
+.IP \[bu]
Reflects anything VLC can reflect over HTTP, even the muxes VLC
has problems reflecting itself (in particular, FLV).
.IP \[bu]
Reflects anything VLC can reflect over HTTP, even the muxes VLC
has problems reflecting itself (in particular, FLV).
.IP \[bu]
#
stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000
stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
#
stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000
stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
-stream /udp.ts src=udp://@:1234 backlog_size=1048576
+stream /udp.ts src=udp://@:1234 backlog_size=1048576 pacing_rate_kbit=2000
udpstream [2001:67c:29f4::50]:5000 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube
udpstream 193.35.52.50:5001 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube
udpstream [2001:67c:29f4::50]:5000 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube
udpstream 193.35.52.50:5001 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube
servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]);
}
servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]);
}
+ servers->set_pacing_rate(stream_index, stream_config.pacing_rate);
+
string src = stream_config.src;
if (!src.empty()) {
multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
string src = stream_config.src;
if (!src.empty()) {
multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
if (udpstream_config.mark_pool != -1) {
mark_pool = mark_pools[udpstream_config.mark_pool];
}
if (udpstream_config.mark_pool != -1) {
mark_pool = mark_pools[udpstream_config.mark_pool];
}
- int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool);
+ int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool, udpstream_config.pacing_rate);
string src = udpstream_config.src;
if (!src.empty()) {
string src = udpstream_config.src;
if (!src.empty()) {
#include "stream.h"
#include "util.h"
#include "stream.h"
#include "util.h"
+#ifndef SO_MAX_PACING_RATE
+#define SO_MAX_PACING_RATE 47
+#endif
+
using namespace std;
extern AccessLogThread *access_log;
using namespace std;
extern AccessLogThread *access_log;
streams[stream_index]->mark_pool = mark_pool;
}
streams[stream_index]->mark_pool = mark_pool;
}
+void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
+{
+ MutexLock lock(&mutex);
+ assert(clients.empty());
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->pacing_rate = pacing_rate;
+}
+
void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
{
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
{
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
log_perror("setsockopt(SO_MARK)");
}
}
log_perror("setsockopt(SO_MARK)");
}
}
+ if (setsockopt(client->sock, SOL_SOCKET, SO_MAX_PACING_RATE, &client->stream->pacing_rate, sizeof(client->stream->pacing_rate)) == -1) {
+ if (client->stream->pacing_rate != 0) {
+ log_perror("setsockopt(SO_MAX_PACING_RATE)");
+ }
+ }
client->request.clear();
return 200; // OK!
client->request.clear();
return 200; // OK!
// NOTE: This should be set before any clients are connected!
void set_mark_pool(int stream_index, MarkPool *mark_pool);
// NOTE: This should be set before any clients are connected!
void set_mark_pool(int stream_index, MarkPool *mark_pool);
+ // Set that the given stream should use the given max pacing rate from now on.
+ // NOTE: This should be set before any clients are connected!
+ void set_pacing_rate(int stream_index, uint32_t pacing_rate);
+
// These will be deferred until the next time an iteration in do_work() happens,
// and the order between them are undefined.
// XXX: header should ideally be ordered with respect to data.
// These will be deferred until the next time an iteration in do_work() happens,
// and the order between them are undefined.
// XXX: header should ideally be ordered with respect to data.
return num_http_streams++;
}
return num_http_streams++;
}
-int ServerPool::add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool)
+int ServerPool::add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool, int pacing_rate)
- udp_streams.push_back(new UDPStream(dst, mark_pool));
+ udp_streams.push_back(new UDPStream(dst, mark_pool, pacing_rate));
return num_http_streams + udp_streams.size() - 1;
}
return num_http_streams + udp_streams.size() - 1;
}
{
for (int i = 0; i < num_servers; ++i) {
servers[i].set_mark_pool(stream_index, mark_pool);
{
for (int i = 0; i < num_servers; ++i) {
servers[i].set_mark_pool(stream_index, mark_pool);
+ }
+}
+
+void ServerPool::set_pacing_rate(int stream_index, uint32_t pacing_rate)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].set_pacing_rate(stream_index, pacing_rate);
int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding);
int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
void delete_stream(const std::string &url);
int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding);
int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
void delete_stream(const std::string &url);
- int add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool);
+ int add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool, int pacing_rate);
// Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure.
int lookup_stream_by_url(const std::string &url) const;
// Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure.
int lookup_stream_by_url(const std::string &url) const;
// Connects the given stream to the given mark pool for all the servers.
void set_mark_pool(int stream_index, MarkPool *mark_pool);
// Connects the given stream to the given mark pool for all the servers.
void set_mark_pool(int stream_index, MarkPool *mark_pool);
+ // Sets the max pacing rate for all the servers.
+ void set_pacing_rate(int stream_index, uint32_t pacing_rate);
+
// Changes the given stream's backlog size on all the servers.
void set_backlog_size(int stream_index, size_t new_size);
// Changes the given stream's backlog size on all the servers.
void set_backlog_size(int stream_index, size_t new_size);
bytes_received(0),
last_suitable_starting_point(-1),
mark_pool(NULL),
bytes_received(0),
last_suitable_starting_point(-1),
mark_pool(NULL),
queued_data_last_starting_point(-1)
{
if (data_fd == -1) {
queued_data_last_starting_point(-1)
{
if (data_fd == -1) {
backlog_size(serialized.backlog_size()),
bytes_received(serialized.bytes_received()),
mark_pool(NULL),
backlog_size(serialized.backlog_size()),
bytes_received(serialized.bytes_received()),
mark_pool(NULL),
queued_data_last_starting_point(-1)
{
if (data_fd == -1) {
queued_data_last_starting_point(-1)
{
if (data_fd == -1) {
// What pool to fetch marks from, or NULL.
MarkPool *mark_pool;
// What pool to fetch marks from, or NULL.
MarkPool *mark_pool;
+ // Maximum pacing rate for the stream.
+ int pacing_rate;
+
// Queued data, if any. Protected by <queued_data_mutex>.
// The data pointers in the iovec are owned by us.
std::vector<iovec> queued_data;
// Queued data, if any. Protected by <queued_data_mutex>.
// The data pointers in the iovec are owned by us.
std::vector<iovec> queued_data;
#include "udpstream.h"
#include "util.h"
#include "udpstream.h"
#include "util.h"
-UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool)
+#ifndef SO_MAX_PACING_RATE
+#define SO_MAX_PACING_RATE 47
+#endif
+
+UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool, uint32_t pacing_rate)
: dst(dst),
mark_pool(mark_pool),
: dst(dst),
mark_pool(mark_pool),
+ fwmark(0),
+ pacing_rate(pacing_rate)
{
sock = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
if (sock == -1) {
{
sock = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
if (sock == -1) {
+ if (setsockopt(sock, SOL_SOCKET, SO_MAX_PACING_RATE, &pacing_rate, sizeof(pacing_rate)) == -1) {
+ if (pacing_rate != 0) {
+ log_perror("setsockopt(SO_MAX_PACING_RATE)");
+ }
+ }
}
UDPStream::~UDPStream()
}
UDPStream::~UDPStream()
class UDPStream {
public:
// <mark_pool> can be NULL. Does not take ownership of the mark pool.
class UDPStream {
public:
// <mark_pool> can be NULL. Does not take ownership of the mark pool.
- UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool);
+ UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool, uint32_t pacing_rate);
~UDPStream();
void send(const char *data, size_t bytes);
~UDPStream();
void send(const char *data, size_t bytes);
int sock;
MarkPool *mark_pool;
int fwmark;
int sock;
MarkPool *mark_pool;
int fwmark;
};
#endif // !defined(_UDPSTREAM_H)
};
#endif // !defined(_UDPSTREAM_H)