Support SO_MAX_PACING_RATE.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 28 Sep 2013 22:37:43 +0000 (00:37 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 28 Sep 2013 22:37:43 +0000 (00:37 +0200)
SO_MAX_PACING_RATE is the newfangled socket option from Eric Dumazet,
used with the new fq packet scheduler in Linux. It allows you to set
a max rate for the socket (presumably a stricter upper bound than the
RTT-based estimate from the kernel), delivering pacing without having
to resort to the relatively complex mark setup. It seems to enter
the Linux kernel in 3.13 at the earliest; not unlikely even later.

In time, fwmark will be deprecated, but the implementation of TCP pacing in
Linux is still a bit shaky (especially with not-always-filling applications
like streaming), so fwmark will stay the primary solution for now.

15 files changed:
README
client.cpp
config.cpp
config.h
cubemap.1
cubemap.config.sample
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h
stream.cpp
stream.h
udpstream.cpp
udpstream.h

diff --git a/README b/README
index b965d03..4d5d680 100644 (file)
--- a/README
+++ b/README
@@ -13,6 +13,9 @@ A short list of features:
    (unless you delete the stream they are watching, of course).
    Cubemap also survives the encoder dying and reconnecting.
  - Per-stream fwmark support, for TCP pacing through tc (separate config needed).
+ - Support for setting max pacing rate through the fq packet scheduler
+   (obsoletes the previous point, but depends on experimental kernel patches
+   that will hit Linux in 3.13 at the earliest)
  - Reflects anything VLC can reflect over HTTP, even the muxes VLC
    has problems reflecting itself (in particular, FLV).
  - IPv4 support. Yes, Cubemap even supports (some) legacy protocols.
index f6361f3..eea41a5 100644 (file)
@@ -9,6 +9,10 @@
 #include "state.pb.h"
 #include "stream.h"
 
+#ifndef SO_MAX_PACING_RATE
+#define SO_MAX_PACING_RATE 47
+#endif
+
 using namespace std;
 
 Client::Client(int sock)
@@ -80,6 +84,11 @@ Client::Client(const ClientProto &serialized, Stream *stream)
                }
                fwmark = 0;
        }
+       if (setsockopt(sock, SOL_SOCKET, SO_MAX_PACING_RATE, &stream->pacing_rate, sizeof(stream->pacing_rate)) == -1) {
+               if (stream->pacing_rate != ~0U) {
+                       log_perror("setsockopt(SO_MAX_PACING_RATE)");
+               }
+       }
 }
 
 ClientProto Client::serialize() const
index cb16cbc..f5182aa 100644 (file)
@@ -234,6 +234,14 @@ bool parse_stream(const ConfigLine &line, Config *config)
                stream.mark_pool = allocate_mark_pool(from, to, config);
        }
 
+       // Parse the pacing rate, converting from kilobits to bytes as needed.
+       map<string, string>::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit");
+       if (pacing_rate_it == line.parameters.end()) {
+               stream.pacing_rate = ~0U;
+       } else {
+               stream.pacing_rate = atoi(pacing_rate_it->second.c_str()) * 1024 / 8;
+       }
+
        config->streams.push_back(stream);
        return true;
 }
@@ -316,9 +324,18 @@ bool parse_udpstream(const ConfigLine &line, Config *config)
                udpstream.mark_pool = allocate_mark_pool(from, to, config);
        }
 
+       // Parse the pacing rate, converting from kilobits to bytes as needed.
+       map<string, string>::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit");
+       if (pacing_rate_it == line.parameters.end()) {
+               udpstream.pacing_rate = ~0U;
+       } else {
+               udpstream.pacing_rate = atoi(pacing_rate_it->second.c_str()) * 1024 / 8;
+       }
+
        config->udpstreams.push_back(udpstream);
        return true;
 }
+
 bool parse_error_log(const ConfigLine &line, Config *config)
 {
        if (line.arguments.size() != 0) {
index e7959c5..b84b957 100644 (file)
--- a/config.h
+++ b/config.h
@@ -18,6 +18,7 @@ struct StreamConfig {
        std::string src;  // Can be empty.
        size_t backlog_size;
        int mark_pool;  // -1 for none.
+       uint32_t pacing_rate;  // In bytes per second. Default is ~0U (no limit).
        enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
 };
 
@@ -25,6 +26,7 @@ struct UDPStreamConfig {
        sockaddr_in6 dst;
        std::string src;  // Can be empty.
        int mark_pool;  // -1 for none.
+       uint32_t pacing_rate;  // In bytes per second. Default is ~0U (no limit).
 };
 
 struct AcceptorConfig {
index 9760a89..e7c76e3 100644 (file)
--- a/cubemap.1
+++ b/cubemap.1
@@ -29,6 +29,10 @@ Cubemap also survives the encoder dying and reconnecting.
 .IP \[bu]
 Per-stream fwmark support, for TCP pacing through tc (separate config needed).
 .IP \[bu]
+Support for setting max pacing rate through the fq packet scheduler
+(obsoletes the previous point, but depends on experimental kernel patches
+that will hit Linux in 3.13 at the earliest)
+.IP \[bu]
 Reflects anything VLC can reflect over HTTP, even the muxes VLC
 has problems reflecting itself (in particular, FLV).
 .IP \[bu]
index c6ab182..bce0659 100644 (file)
@@ -33,6 +33,6 @@ error_log type=console
 #
 stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000
 stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
-stream /udp.ts src=udp://@:1234 backlog_size=1048576
+stream /udp.ts src=udp://@:1234 backlog_size=1048576 pacing_rate_kbit=2000
 udpstream [2001:67c:29f4::50]:5000 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube
 udpstream 193.35.52.50:5001 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube
index c4c5bd4..86ba28a 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -184,6 +184,8 @@ void create_streams(const Config &config,
                        servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]);
                }
 
+               servers->set_pacing_rate(stream_index, stream_config.pacing_rate);
+
                string src = stream_config.src;
                if (!src.empty()) {
                        multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
@@ -212,7 +214,7 @@ void create_streams(const Config &config,
                if (udpstream_config.mark_pool != -1) {
                        mark_pool = mark_pools[udpstream_config.mark_pool];
                }
-               int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool);
+               int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool, udpstream_config.pacing_rate);
 
                string src = udpstream_config.src;
                if (!src.empty()) {
index 0ed44fe..288224c 100644 (file)
 #include "stream.h"
 #include "util.h"
 
+#ifndef SO_MAX_PACING_RATE
+#define SO_MAX_PACING_RATE 47
+#endif
+
 using namespace std;
 
 extern AccessLogThread *access_log;
@@ -271,6 +275,14 @@ void Server::set_mark_pool(int stream_index, MarkPool *mark_pool)
        streams[stream_index]->mark_pool = mark_pool;
 }
 
+void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
+{
+       MutexLock lock(&mutex);
+       assert(clients.empty());
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->pacing_rate = pacing_rate;
+}
+
 void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
 {
        assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
@@ -525,6 +537,11 @@ int Server::parse_request(Client *client)
                        log_perror("setsockopt(SO_MARK)");
                }
        }
+       if (setsockopt(client->sock, SOL_SOCKET, SO_MAX_PACING_RATE, &client->stream->pacing_rate, sizeof(client->stream->pacing_rate)) == -1) {
+               if (client->stream->pacing_rate != 0) {
+                       log_perror("setsockopt(SO_MAX_PACING_RATE)");
+               }
+       }
        client->request.clear();
 
        return 200;  // OK!
index 4b3aab8..84f822d 100644 (file)
--- a/server.h
+++ b/server.h
@@ -43,6 +43,10 @@ public:
        // NOTE: This should be set before any clients are connected!
        void set_mark_pool(int stream_index, MarkPool *mark_pool);
 
+       // Set that the given stream should use the given max pacing rate from now on.
+       // NOTE: This should be set before any clients are connected!
+       void set_pacing_rate(int stream_index, uint32_t pacing_rate);
+
        // These will be deferred until the next time an iteration in do_work() happens,
        // and the order between them are undefined.
        // XXX: header should ideally be ordered with respect to data.
index 936bd21..1c99d40 100644 (file)
@@ -121,9 +121,9 @@ int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vect
        return num_http_streams++;
 }
        
-int ServerPool::add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool)
+int ServerPool::add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool, int pacing_rate)
 {
-       udp_streams.push_back(new UDPStream(dst, mark_pool));
+       udp_streams.push_back(new UDPStream(dst, mark_pool, pacing_rate));
        return num_http_streams + udp_streams.size() - 1;
 }
 
@@ -191,6 +191,13 @@ void ServerPool::set_mark_pool(int stream_index, MarkPool *mark_pool)
 {
        for (int i = 0; i < num_servers; ++i) {
                servers[i].set_mark_pool(stream_index, mark_pool);
+       }
+}
+
+void ServerPool::set_pacing_rate(int stream_index, uint32_t pacing_rate)
+{
+       for (int i = 0; i < num_servers; ++i) {
+               servers[i].set_pacing_rate(stream_index, pacing_rate);
        }       
 }
 
index b67fa48..3db9dcc 100644 (file)
@@ -33,7 +33,7 @@ public:
        int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding);
        int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
        void delete_stream(const std::string &url);
-       int add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool);
+       int add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool, int pacing_rate);
 
        // Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure.
        int lookup_stream_by_url(const std::string &url) const;
@@ -47,6 +47,9 @@ public:
        // Connects the given stream to the given mark pool for all the servers.
        void set_mark_pool(int stream_index, MarkPool *mark_pool);
 
+       // Sets the max pacing rate for all the servers.
+       void set_pacing_rate(int stream_index, uint32_t pacing_rate);
+
        // Changes the given stream's backlog size on all the servers.
        void set_backlog_size(int stream_index, size_t new_size);
 
index b7a92c2..6bb86c4 100644 (file)
@@ -26,6 +26,7 @@ Stream::Stream(const string &url, size_t backlog_size, Encoding encoding)
          bytes_received(0),
          last_suitable_starting_point(-1),
          mark_pool(NULL),
+         pacing_rate(~0U),
          queued_data_last_starting_point(-1)
 {
        if (data_fd == -1) {
@@ -51,6 +52,7 @@ Stream::Stream(const StreamProto &serialized, int data_fd)
          backlog_size(serialized.backlog_size()),
          bytes_received(serialized.bytes_received()),
          mark_pool(NULL),
+         pacing_rate(~0U),
          queued_data_last_starting_point(-1)
 {
        if (data_fd == -1) {
index 2cdba11..d29b924 100644 (file)
--- a/stream.h
+++ b/stream.h
@@ -84,6 +84,9 @@ struct Stream {
        // What pool to fetch marks from, or NULL.
        MarkPool *mark_pool;
 
+       // Maximum pacing rate for the stream.
+       int pacing_rate;
+
        // Queued data, if any. Protected by <queued_data_mutex>.
        // The data pointers in the iovec are owned by us.
        std::vector<iovec> queued_data;
index 5ed16df..11e1527 100644 (file)
@@ -6,10 +6,15 @@
 #include "udpstream.h"
 #include "util.h"
 
-UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool)
+#ifndef SO_MAX_PACING_RATE
+#define SO_MAX_PACING_RATE 47
+#endif
+
+UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool, uint32_t pacing_rate)
        : dst(dst),
          mark_pool(mark_pool),
-         fwmark(0)
+         fwmark(0),
+         pacing_rate(pacing_rate)
 {
        sock = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
        if (sock == -1) {
@@ -26,6 +31,11 @@ UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool)
                        }
                }
        }
+       if (setsockopt(sock, SOL_SOCKET, SO_MAX_PACING_RATE, &pacing_rate, sizeof(pacing_rate)) == -1) {
+               if (pacing_rate != 0) {
+                       log_perror("setsockopt(SO_MAX_PACING_RATE)");
+               }
+       }
 }
 
 UDPStream::~UDPStream()
index e30c948..4d1e1f7 100644 (file)
@@ -17,7 +17,7 @@ class MarkPool;
 class UDPStream {
 public:
        // <mark_pool> can be NULL. Does not take ownership of the mark pool.
-       UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool);
+       UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool, uint32_t pacing_rate);
        ~UDPStream();
 
        void send(const char *data, size_t bytes);
@@ -29,6 +29,7 @@ private:
        int sock;
        MarkPool *mark_pool;
        int fwmark;
+       uint32_t pacing_rate;
 };
 
 #endif  // !defined(_UDPSTREAM_H)