From: Steinar H. Gunderson Date: Sat, 28 Sep 2013 22:37:43 +0000 (+0200) Subject: Support SO_MAX_PACING_RATE. X-Git-Tag: 1.0.2~13 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=ce0d42a14da5686366d0c73fa9b152e90619ff91 Support SO_MAX_PACING_RATE. SO_MAX_PACING_RATE is the newfangled socket option from Eric Dumazet, used with the new fq packet scheduler in Linux. It allows you to set a max rate for the socket (presumably a stricter upper bound than the RTT-based estimate from the kernel), delivering pacing without having to resort to the relatively complex mark setup. It seems to enter the Linux kernel in 3.13 at the earliest; not unlikely even later. In time, fwmark will be deprecated, but the implementation of TCP pacing in Linux is still a bit shaky (especially with not-always-filling applications like streaming), so fwmark will stay the primary solution for now. --- diff --git a/README b/README index b965d03..4d5d680 100644 --- a/README +++ b/README @@ -13,6 +13,9 @@ A short list of features: (unless you delete the stream they are watching, of course). Cubemap also survives the encoder dying and reconnecting. - Per-stream fwmark support, for TCP pacing through tc (separate config needed). + - Support for setting max pacing rate through the fq packet scheduler + (obsoletes the previous point, but depends on experimental kernel patches + that will hit Linux in 3.13 at the earliest) - Reflects anything VLC can reflect over HTTP, even the muxes VLC has problems reflecting itself (in particular, FLV). - IPv4 support. Yes, Cubemap even supports (some) legacy protocols. diff --git a/client.cpp b/client.cpp index f6361f3..eea41a5 100644 --- a/client.cpp +++ b/client.cpp @@ -9,6 +9,10 @@ #include "state.pb.h" #include "stream.h" +#ifndef SO_MAX_PACING_RATE +#define SO_MAX_PACING_RATE 47 +#endif + using namespace std; Client::Client(int sock) @@ -80,6 +84,11 @@ Client::Client(const ClientProto &serialized, Stream *stream) } fwmark = 0; } + if (setsockopt(sock, SOL_SOCKET, SO_MAX_PACING_RATE, &stream->pacing_rate, sizeof(stream->pacing_rate)) == -1) { + if (stream->pacing_rate != ~0U) { + log_perror("setsockopt(SO_MAX_PACING_RATE)"); + } + } } ClientProto Client::serialize() const diff --git a/config.cpp b/config.cpp index cb16cbc..f5182aa 100644 --- a/config.cpp +++ b/config.cpp @@ -234,6 +234,14 @@ bool parse_stream(const ConfigLine &line, Config *config) stream.mark_pool = allocate_mark_pool(from, to, config); } + // Parse the pacing rate, converting from kilobits to bytes as needed. + map::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit"); + if (pacing_rate_it == line.parameters.end()) { + stream.pacing_rate = ~0U; + } else { + stream.pacing_rate = atoi(pacing_rate_it->second.c_str()) * 1024 / 8; + } + config->streams.push_back(stream); return true; } @@ -316,9 +324,18 @@ bool parse_udpstream(const ConfigLine &line, Config *config) udpstream.mark_pool = allocate_mark_pool(from, to, config); } + // Parse the pacing rate, converting from kilobits to bytes as needed. + map::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit"); + if (pacing_rate_it == line.parameters.end()) { + udpstream.pacing_rate = ~0U; + } else { + udpstream.pacing_rate = atoi(pacing_rate_it->second.c_str()) * 1024 / 8; + } + config->udpstreams.push_back(udpstream); return true; } + bool parse_error_log(const ConfigLine &line, Config *config) { if (line.arguments.size() != 0) { diff --git a/config.h b/config.h index e7959c5..b84b957 100644 --- a/config.h +++ b/config.h @@ -18,6 +18,7 @@ struct StreamConfig { std::string src; // Can be empty. size_t backlog_size; int mark_pool; // -1 for none. + uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit). enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding; }; @@ -25,6 +26,7 @@ struct UDPStreamConfig { sockaddr_in6 dst; std::string src; // Can be empty. int mark_pool; // -1 for none. + uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit). }; struct AcceptorConfig { diff --git a/cubemap.1 b/cubemap.1 index 9760a89..e7c76e3 100644 --- a/cubemap.1 +++ b/cubemap.1 @@ -29,6 +29,10 @@ Cubemap also survives the encoder dying and reconnecting. .IP \[bu] Per-stream fwmark support, for TCP pacing through tc (separate config needed). .IP \[bu] +Support for setting max pacing rate through the fq packet scheduler +(obsoletes the previous point, but depends on experimental kernel patches +that will hit Linux in 3.13 at the earliest) +.IP \[bu] Reflects anything VLC can reflect over HTTP, even the muxes VLC has problems reflecting itself (in particular, FLV). .IP \[bu] diff --git a/cubemap.config.sample b/cubemap.config.sample index c6ab182..bce0659 100644 --- a/cubemap.config.sample +++ b/cubemap.config.sample @@ -33,6 +33,6 @@ error_log type=console # stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000 stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube -stream /udp.ts src=udp://@:1234 backlog_size=1048576 +stream /udp.ts src=udp://@:1234 backlog_size=1048576 pacing_rate_kbit=2000 udpstream [2001:67c:29f4::50]:5000 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube udpstream 193.35.52.50:5001 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube diff --git a/main.cpp b/main.cpp index c4c5bd4..86ba28a 100644 --- a/main.cpp +++ b/main.cpp @@ -184,6 +184,8 @@ void create_streams(const Config &config, servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]); } + servers->set_pacing_rate(stream_index, stream_config.pacing_rate); + string src = stream_config.src; if (!src.empty()) { multimap::iterator input_it = inputs->find(src); @@ -212,7 +214,7 @@ void create_streams(const Config &config, if (udpstream_config.mark_pool != -1) { mark_pool = mark_pools[udpstream_config.mark_pool]; } - int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool); + int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool, udpstream_config.pacing_rate); string src = udpstream_config.src; if (!src.empty()) { diff --git a/server.cpp b/server.cpp index 0ed44fe..288224c 100644 --- a/server.cpp +++ b/server.cpp @@ -28,6 +28,10 @@ #include "stream.h" #include "util.h" +#ifndef SO_MAX_PACING_RATE +#define SO_MAX_PACING_RATE 47 +#endif + using namespace std; extern AccessLogThread *access_log; @@ -271,6 +275,14 @@ void Server::set_mark_pool(int stream_index, MarkPool *mark_pool) streams[stream_index]->mark_pool = mark_pool; } +void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate) +{ + MutexLock lock(&mutex); + assert(clients.empty()); + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->pacing_rate = pacing_rate; +} + void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start) { assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); @@ -525,6 +537,11 @@ int Server::parse_request(Client *client) log_perror("setsockopt(SO_MARK)"); } } + if (setsockopt(client->sock, SOL_SOCKET, SO_MAX_PACING_RATE, &client->stream->pacing_rate, sizeof(client->stream->pacing_rate)) == -1) { + if (client->stream->pacing_rate != 0) { + log_perror("setsockopt(SO_MAX_PACING_RATE)"); + } + } client->request.clear(); return 200; // OK! diff --git a/server.h b/server.h index 4b3aab8..84f822d 100644 --- a/server.h +++ b/server.h @@ -43,6 +43,10 @@ public: // NOTE: This should be set before any clients are connected! void set_mark_pool(int stream_index, MarkPool *mark_pool); + // Set that the given stream should use the given max pacing rate from now on. + // NOTE: This should be set before any clients are connected! + void set_pacing_rate(int stream_index, uint32_t pacing_rate); + // These will be deferred until the next time an iteration in do_work() happens, // and the order between them are undefined. // XXX: header should ideally be ordered with respect to data. diff --git a/serverpool.cpp b/serverpool.cpp index 936bd21..1c99d40 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -121,9 +121,9 @@ int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vect return num_http_streams++; } -int ServerPool::add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool) +int ServerPool::add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool, int pacing_rate) { - udp_streams.push_back(new UDPStream(dst, mark_pool)); + udp_streams.push_back(new UDPStream(dst, mark_pool, pacing_rate)); return num_http_streams + udp_streams.size() - 1; } @@ -191,6 +191,13 @@ void ServerPool::set_mark_pool(int stream_index, MarkPool *mark_pool) { for (int i = 0; i < num_servers; ++i) { servers[i].set_mark_pool(stream_index, mark_pool); + } +} + +void ServerPool::set_pacing_rate(int stream_index, uint32_t pacing_rate) +{ + for (int i = 0; i < num_servers; ++i) { + servers[i].set_pacing_rate(stream_index, pacing_rate); } } diff --git a/serverpool.h b/serverpool.h index b67fa48..3db9dcc 100644 --- a/serverpool.h +++ b/serverpool.h @@ -33,7 +33,7 @@ public: int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding); int add_stream_from_serialized(const StreamProto &stream, const std::vector &data_fds); void delete_stream(const std::string &url); - int add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool); + int add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool, int pacing_rate); // Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure. int lookup_stream_by_url(const std::string &url) const; @@ -47,6 +47,9 @@ public: // Connects the given stream to the given mark pool for all the servers. void set_mark_pool(int stream_index, MarkPool *mark_pool); + // Sets the max pacing rate for all the servers. + void set_pacing_rate(int stream_index, uint32_t pacing_rate); + // Changes the given stream's backlog size on all the servers. void set_backlog_size(int stream_index, size_t new_size); diff --git a/stream.cpp b/stream.cpp index b7a92c2..6bb86c4 100644 --- a/stream.cpp +++ b/stream.cpp @@ -26,6 +26,7 @@ Stream::Stream(const string &url, size_t backlog_size, Encoding encoding) bytes_received(0), last_suitable_starting_point(-1), mark_pool(NULL), + pacing_rate(~0U), queued_data_last_starting_point(-1) { if (data_fd == -1) { @@ -51,6 +52,7 @@ Stream::Stream(const StreamProto &serialized, int data_fd) backlog_size(serialized.backlog_size()), bytes_received(serialized.bytes_received()), mark_pool(NULL), + pacing_rate(~0U), queued_data_last_starting_point(-1) { if (data_fd == -1) { diff --git a/stream.h b/stream.h index 2cdba11..d29b924 100644 --- a/stream.h +++ b/stream.h @@ -84,6 +84,9 @@ struct Stream { // What pool to fetch marks from, or NULL. MarkPool *mark_pool; + // Maximum pacing rate for the stream. + int pacing_rate; + // Queued data, if any. Protected by . // The data pointers in the iovec are owned by us. std::vector queued_data; diff --git a/udpstream.cpp b/udpstream.cpp index 5ed16df..11e1527 100644 --- a/udpstream.cpp +++ b/udpstream.cpp @@ -6,10 +6,15 @@ #include "udpstream.h" #include "util.h" -UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool) +#ifndef SO_MAX_PACING_RATE +#define SO_MAX_PACING_RATE 47 +#endif + +UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool, uint32_t pacing_rate) : dst(dst), mark_pool(mark_pool), - fwmark(0) + fwmark(0), + pacing_rate(pacing_rate) { sock = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); if (sock == -1) { @@ -26,6 +31,11 @@ UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool) } } } + if (setsockopt(sock, SOL_SOCKET, SO_MAX_PACING_RATE, &pacing_rate, sizeof(pacing_rate)) == -1) { + if (pacing_rate != 0) { + log_perror("setsockopt(SO_MAX_PACING_RATE)"); + } + } } UDPStream::~UDPStream() diff --git a/udpstream.h b/udpstream.h index e30c948..4d1e1f7 100644 --- a/udpstream.h +++ b/udpstream.h @@ -17,7 +17,7 @@ class MarkPool; class UDPStream { public: // can be NULL. Does not take ownership of the mark pool. - UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool); + UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool, uint32_t pacing_rate); ~UDPStream(); void send(const char *data, size_t bytes); @@ -29,6 +29,7 @@ private: int sock; MarkPool *mark_pool; int fwmark; + uint32_t pacing_rate; }; #endif // !defined(_UDPSTREAM_H)