From: Steinar H. Gunderson Date: Thu, 24 Apr 2014 21:56:41 +0000 (+0200) Subject: Remove support for mark pools. X-Git-Tag: 1.1.0~14 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=5cc8cd703a637e276c2595953878fd9561592bfa Remove support for mark pools. This has been fully obsoleted by the fq qdisc, which is easier to set up, more scalable and does not require root privileges. Removing it also removes a significant chunk of code, which is good. --- diff --git a/Makefile b/Makefile index becf0da..d426bc1 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ PROTOC=protoc CXXFLAGS=-Wall -O2 -g -pthread $(shell getconf LFS_CFLAGS) LDLIBS=-lprotobuf -pthread -lrt -OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o metacube2.o sa_compare.o state.pb.o +OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o acceptor.o stats.o accesslog.o thread.o util.o log.o metacube2.o sa_compare.o state.pb.o all: cubemap diff --git a/README b/README index 2f3981e..358df04 100644 --- a/README +++ b/README @@ -12,7 +12,6 @@ A short list of features: and sending a SIGHUP; all clients will continue as if nothing had happened (unless you delete the stream they are watching, of course). Cubemap also survives the encoder dying and reconnecting. - - Per-stream fwmark support, for TCP pacing through tc (separate config needed). - Support for setting max pacing rate through the fq packet scheduler (obsoletes the previous point, but depends on Linux 3.13 or newer). - Reflects anything VLC can reflect over HTTP, even the muxes VLC diff --git a/client.cpp b/client.cpp index d0ab8f8..b226d5e 100644 --- a/client.cpp +++ b/client.cpp @@ -5,7 +5,6 @@ #include "client.h" #include "log.h" -#include "markpool.h" #include "state.pb.h" #include "stream.h" @@ -17,7 +16,6 @@ using namespace std; Client::Client(int sock) : sock(sock), - fwmark(0), connect_time(time(NULL)), state(Client::READING_REQUEST), stream(NULL), @@ -73,17 +71,6 @@ Client::Client(const ClientProto &serialized, Stream *stream) bytes_lost(serialized.bytes_lost()), num_loss_events(serialized.num_loss_events()) { - if (stream != NULL && stream->mark_pool != NULL) { - fwmark = stream->mark_pool->get_mark(); - } else { - fwmark = 0; // No mark. - } - if (setsockopt(sock, SOL_SOCKET, SO_MARK, &fwmark, sizeof(fwmark)) == -1) { - if (fwmark != 0) { - log_perror("setsockopt(SO_MARK)"); - } - fwmark = 0; - } if (stream != NULL) { if (setsockopt(sock, SOL_SOCKET, SO_MAX_PACING_RATE, &stream->pacing_rate, sizeof(stream->pacing_rate)) == -1) { if (stream->pacing_rate != ~0U) { @@ -120,7 +107,6 @@ ClientStats Client::get_stats() const stats.url = url; } stats.sock = sock; - stats.fwmark = fwmark; stats.remote_addr = remote_addr; stats.connect_time = connect_time; stats.bytes_sent = bytes_sent; diff --git a/client.h b/client.h index 490190b..3767112 100644 --- a/client.h +++ b/client.h @@ -14,7 +14,6 @@ struct Stream; struct ClientStats { std::string url; int sock; - int fwmark; std::string remote_addr; time_t connect_time; size_t bytes_sent; @@ -34,9 +33,6 @@ struct Client { // The file descriptor associated with this socket. int sock; - // The fwmark associated with this socket (or 0). - int fwmark; - // Some information only used for logging. std::string remote_addr; time_t connect_time; diff --git a/config.cpp b/config.cpp index cce70e9..4a3dfb3 100644 --- a/config.cpp +++ b/config.cpp @@ -205,60 +205,6 @@ bool parse_listen(const ConfigLine &line, Config *config) return true; } -int allocate_mark_pool(int from, int to, Config *config) -{ - int pool_index = -1; - - // Reuse mark pools if an identical one exists. - // Otherwise, check if we're overlapping some other mark pool. - for (size_t i = 0; i < config->mark_pools.size(); ++i) { - const MarkPoolConfig &pool = config->mark_pools[i]; - if (from == pool.from && to == pool.to) { - pool_index = i; - } else if ((from >= pool.from && from < pool.to) || - (to >= pool.from && to < pool.to)) { - log(WARNING, "Mark pool %d-%d partially overlaps with %d-%d, you may get duplicate marks." - "Mark pools must either be completely disjunct, or completely overlapping.", - from, to, pool.from, pool.to); - } - } - - if (pool_index != -1) { - return pool_index; - } - - // No match to existing pools. - MarkPoolConfig pool; - pool.from = from; - pool.to = to; - config->mark_pools.push_back(pool); - - return config->mark_pools.size() - 1; -} - -bool parse_mark_pool(const string &mark_str, int *from, int *to) -{ - size_t split = mark_str.find_first_of('-'); - if (split == string::npos) { - log(ERROR, "Invalid mark specification '%s' (expected 'X-Y').", - mark_str.c_str()); - return false; - } - - string from_str(mark_str.begin(), mark_str.begin() + split); - string to_str(mark_str.begin() + split + 1, mark_str.end()); - *from = atoi(from_str.c_str()); - *to = atoi(to_str.c_str()); - - if (*from <= 0 || *from >= 65536 || *to <= 0 || *to >= 65536) { - log(ERROR, "Mark pool range %d-%d is outside legal range [1,65536>.", - *from, *to); - return false; - } - - return true; -} - bool parse_stream(const ConfigLine &line, Config *config) { if (line.arguments.size() != 1) { @@ -297,18 +243,6 @@ bool parse_stream(const ConfigLine &line, Config *config) return false; } - // Parse marks, if so desired. - map::const_iterator mark_parm_it = line.parameters.find("mark"); - if (mark_parm_it == line.parameters.end()) { - stream.mark_pool = -1; - } else { - int from, to; - if (!parse_mark_pool(mark_parm_it->second, &from, &to)) { - return false; - } - stream.mark_pool = allocate_mark_pool(from, to, config); - } - // Parse the pacing rate, converting from kilobits to bytes as needed. map::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit"); if (pacing_rate_it == line.parameters.end()) { @@ -345,18 +279,6 @@ bool parse_udpstream(const ConfigLine &line, Config *config) // TODO: Verify that the URL is parseable? } - // Parse marks, if so desired. - map::const_iterator mark_parm_it = line.parameters.find("mark"); - if (mark_parm_it == line.parameters.end()) { - udpstream.mark_pool = -1; - } else { - int from, to; - if (!parse_mark_pool(mark_parm_it->second, &from, &to)) { - return false; - } - udpstream.mark_pool = allocate_mark_pool(from, to, config); - } - // Parse the pacing rate, converting from kilobits to bytes as needed. map::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit"); if (pacing_rate_it == line.parameters.end()) { diff --git a/config.h b/config.h index f06b9c5..6053148 100644 --- a/config.h +++ b/config.h @@ -9,15 +9,10 @@ #include #include -struct MarkPoolConfig { - int from, to; -}; - struct StreamConfig { std::string url; // As seen by the client. std::string src; // Can be empty. size_t backlog_size; - int mark_pool; // -1 for none. uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit). enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding; }; @@ -25,7 +20,6 @@ struct StreamConfig { struct UDPStreamConfig { sockaddr_in6 dst; std::string src; // Can be empty. - int mark_pool; // -1 for none. uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit). }; @@ -41,7 +35,6 @@ struct LogConfig { struct Config { bool daemonize; int num_servers; - std::vector mark_pools; std::vector streams; std::vector udpstreams; std::vector acceptors; diff --git a/main.cpp b/main.cpp index 28acd39..5ba79f9 100644 --- a/main.cpp +++ b/main.cpp @@ -22,7 +22,6 @@ #include "input.h" #include "input_stats.h" #include "log.h" -#include "markpool.h" #include "sa_compare.h" #include "serverpool.h" #include "state.pb.h" @@ -35,7 +34,6 @@ using namespace std; AccessLogThread *access_log = NULL; ServerPool *servers = NULL; -vector mark_pools; volatile bool hupped = false; volatile bool stopped = false; @@ -151,11 +149,6 @@ void create_streams(const Config &config, const set &deserialized_urls, multimap *inputs) { - for (unsigned i = 0; i < config.mark_pools.size(); ++i) { - const MarkPoolConfig &mp_config = config.mark_pools[i]; - mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to)); - } - // HTTP streams. set expecting_urls = deserialized_urls; for (unsigned i = 0; i < config.streams.size(); ++i) { @@ -182,10 +175,6 @@ void create_streams(const Config &config, Stream::Encoding(stream_config.encoding)); } - if (stream_config.mark_pool != -1) { - servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]); - } - servers->set_pacing_rate(stream_index, stream_config.pacing_rate); string src = stream_config.src; @@ -212,11 +201,7 @@ void create_streams(const Config &config, // UDP streams. for (unsigned i = 0; i < config.udpstreams.size(); ++i) { const UDPStreamConfig &udpstream_config = config.udpstreams[i]; - MarkPool *mark_pool = NULL; - if (udpstream_config.mark_pool != -1) { - mark_pool = mark_pools[udpstream_config.mark_pool]; - } - int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool, udpstream_config.pacing_rate); + int stream_index = servers->add_udpstream(udpstream_config.dst, udpstream_config.pacing_rate); string src = udpstream_config.src; if (!src.empty()) { @@ -456,8 +441,7 @@ start: vector acceptors = create_acceptors(config, &deserialized_acceptors); // Put back the existing clients. It doesn't matter which server we - // allocate them to, so just do round-robin. However, we need to add - // them after the mark pools have been set up. + // allocate them to, so just do round-robin. for (int i = 0; i < loaded_state.clients_size(); ++i) { if (deleted_urls.count(loaded_state.clients(i).url()) != 0) { safe_close(loaded_state.clients(i).sock()); @@ -554,11 +538,6 @@ start: } delete servers; - for (unsigned i = 0; i < mark_pools.size(); ++i) { - delete mark_pools[i]; - } - mark_pools.clear(); - access_log->stop(); delete access_log; shut_down_logging(); diff --git a/markpool.cpp b/markpool.cpp deleted file mode 100644 index b3b6a1c..0000000 --- a/markpool.cpp +++ /dev/null @@ -1,44 +0,0 @@ -#include "log.h" -#include "markpool.h" -#include "mutexlock.h" -#include -#include -#include -#include - -MarkPool::MarkPool(int start, int end) - : start(start), end(end) -{ - assert(start > 0 && start < 65536); - assert(end > 0 && end < 65536); - - for (int i = start; i < end; ++i) { - free_marks.push(i); - } - - pthread_mutex_init(&mutex, NULL); -} - -int MarkPool::get_mark() -{ - MutexLock lock(&mutex); - if (free_marks.empty()) { - log(WARNING, "Out of free marks in mark pool %d-%d, session will not be marked. " - "To fix, increase the pool size and HUP the server.", - start, end); - return 0; - } - int mark = free_marks.front(); - free_marks.pop(); - return mark; -} - -void MarkPool::release_mark(int mark) -{ - if (mark == 0) { - return; - } - - MutexLock lock(&mutex); - free_marks.push(mark); -} diff --git a/markpool.h b/markpool.h deleted file mode 100644 index f90aeea..0000000 --- a/markpool.h +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef _MARKPOOL_H -#define _MARKPOOL_H - -// A class that hands out fwmarks from a given range in a thread-safe fashion. -// If the range is empty, it returns 0. - -#include -#include - -class MarkPool { -public: - // Limits are [start, end>. Numbers are 16-bit, so above 65535 do not make sense. - MarkPool(int start, int end); - - int get_mark(); - void release_mark(int mark); - -private: - int start, end; - - pthread_mutex_t mutex; - std::queue free_marks; -}; - -#endif // !defined(_MARKPOOL_H) diff --git a/server.cpp b/server.cpp index b381e79..a2f28e6 100644 --- a/server.cpp +++ b/server.cpp @@ -19,7 +19,6 @@ #include "accesslog.h" #include "log.h" -#include "markpool.h" #include "metacube2.h" #include "mutexlock.h" #include "parse.h" @@ -254,14 +253,6 @@ void Server::set_header(int stream_index, const string &http_header, const strin streams[stream_index]->stream_header = stream_header; } -void Server::set_mark_pool(int stream_index, MarkPool *mark_pool) -{ - MutexLock lock(&mutex); - assert(clients.empty()); - assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); - streams[stream_index]->mark_pool = mark_pool; -} - void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate) { MutexLock lock(&mutex); @@ -516,16 +507,6 @@ int Server::parse_request(Client *client) client->url = request_tokens[1]; client->stream = stream; - if (client->stream->mark_pool != NULL) { - client->fwmark = client->stream->mark_pool->get_mark(); - } else { - client->fwmark = 0; // No mark. - } - if (setsockopt(client->sock, SOL_SOCKET, SO_MARK, &client->fwmark, sizeof(client->fwmark)) == -1) { - if (client->fwmark != 0) { - log_perror("setsockopt(SO_MARK)"); - } - } if (setsockopt(client->sock, SOL_SOCKET, SO_MAX_PACING_RATE, &client->stream->pacing_rate, sizeof(client->stream->pacing_rate)) == -1) { if (client->stream->pacing_rate != ~0U) { log_perror("setsockopt(SO_MAX_PACING_RATE)"); @@ -612,10 +593,6 @@ void Server::close_client(Client *client) if (client->stream != NULL) { delete_from(&client->stream->sleeping_clients, client); delete_from(&client->stream->to_process, client); - if (client->stream->mark_pool != NULL) { - int fwmark = client->fwmark; - client->stream->mark_pool->release_mark(fwmark); - } } // Log to access_log. diff --git a/server.h b/server.h index 84f822d..6b94c0e 100644 --- a/server.h +++ b/server.h @@ -23,7 +23,6 @@ struct Stream; #define MAX_CLIENT_REQUEST 16384 class CubemapStateProto; -class MarkPool; class StreamProto; class Server : public Thread { @@ -39,10 +38,6 @@ public: const std::string &http_header, const std::string &stream_header); - // Set that the given stream should use the given mark pool from now on. - // NOTE: This should be set before any clients are connected! - void set_mark_pool(int stream_index, MarkPool *mark_pool); - // Set that the given stream should use the given max pacing rate from now on. // NOTE: This should be set before any clients are connected! void set_pacing_rate(int stream_index, uint32_t pacing_rate); diff --git a/serverpool.cpp b/serverpool.cpp index 1c99d40..70f8658 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -121,9 +121,9 @@ int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vect return num_http_streams++; } -int ServerPool::add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool, int pacing_rate) +int ServerPool::add_udpstream(const sockaddr_in6 &dst, int pacing_rate) { - udp_streams.push_back(new UDPStream(dst, mark_pool, pacing_rate)); + udp_streams.push_back(new UDPStream(dst, pacing_rate)); return num_http_streams + udp_streams.size() - 1; } @@ -187,13 +187,6 @@ vector ServerPool::get_client_stats() const return ret; } -void ServerPool::set_mark_pool(int stream_index, MarkPool *mark_pool) -{ - for (int i = 0; i < num_servers; ++i) { - servers[i].set_mark_pool(stream_index, mark_pool); - } -} - void ServerPool::set_pacing_rate(int stream_index, uint32_t pacing_rate) { for (int i = 0; i < num_servers; ++i) { diff --git a/serverpool.h b/serverpool.h index 3db9dcc..2c2c537 100644 --- a/serverpool.h +++ b/serverpool.h @@ -10,7 +10,6 @@ #include "stream.h" #include "udpstream.h" -class MarkPool; class Server; class UDPStream; struct ClientStats; @@ -33,7 +32,7 @@ public: int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding); int add_stream_from_serialized(const StreamProto &stream, const std::vector &data_fds); void delete_stream(const std::string &url); - int add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool, int pacing_rate); + int add_udpstream(const sockaddr_in6 &dst, int pacing_rate); // Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure. int lookup_stream_by_url(const std::string &url) const; @@ -44,9 +43,6 @@ public: const std::string &stream_header); void add_data(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start); - // Connects the given stream to the given mark pool for all the servers. - void set_mark_pool(int stream_index, MarkPool *mark_pool); - // Sets the max pacing rate for all the servers. void set_pacing_rate(int stream_index, uint32_t pacing_rate); diff --git a/stats.cpp b/stats.cpp index 9e3f72c..69bbd73 100644 --- a/stats.cpp +++ b/stats.cpp @@ -57,7 +57,7 @@ void StatsThread::do_work() fprintf(fp, "%s %d %d %s %d %llu %llu %llu\n", client_stats[i].remote_addr.c_str(), client_stats[i].sock, - client_stats[i].fwmark, + 0, // Used to be fwmark. client_stats[i].url.c_str(), int(now - client_stats[i].connect_time), (long long unsigned)(client_stats[i].bytes_sent), diff --git a/stream.cpp b/stream.cpp index 87a7db7..594d77c 100644 --- a/stream.cpp +++ b/stream.cpp @@ -26,7 +26,6 @@ Stream::Stream(const string &url, size_t backlog_size, Encoding encoding) backlog_size(backlog_size), bytes_received(0), last_suitable_starting_point(-1), - mark_pool(NULL), pacing_rate(~0U), queued_data_last_starting_point(-1) { @@ -52,7 +51,6 @@ Stream::Stream(const StreamProto &serialized, int data_fd) data_fd(data_fd), backlog_size(serialized.backlog_size()), bytes_received(serialized.bytes_received()), - mark_pool(NULL), pacing_rate(~0U), queued_data_last_starting_point(-1) { diff --git a/stream.h b/stream.h index 5e694aa..ef475e6 100644 --- a/stream.h +++ b/stream.h @@ -11,7 +11,6 @@ #include #include -class MarkPool; class StreamProto; struct Client; @@ -81,9 +80,6 @@ struct Stream { // ). std::vector to_process; - // What pool to fetch marks from, or NULL. - MarkPool *mark_pool; - // Maximum pacing rate for the stream. uint32_t pacing_rate; diff --git a/udpstream.cpp b/udpstream.cpp index 5f5d846..fc00356 100644 --- a/udpstream.cpp +++ b/udpstream.cpp @@ -2,7 +2,6 @@ #include #include "log.h" -#include "markpool.h" #include "udpstream.h" #include "util.h" @@ -10,10 +9,8 @@ #define SO_MAX_PACING_RATE 47 #endif -UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool, uint32_t pacing_rate) +UDPStream::UDPStream(const sockaddr_in6 &dst, uint32_t pacing_rate) : dst(dst), - mark_pool(mark_pool), - fwmark(0), pacing_rate(pacing_rate) { sock = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); @@ -23,14 +20,6 @@ UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool, uint32_t paci return; } - if (mark_pool != NULL) { - fwmark = mark_pool->get_mark(); - if (setsockopt(sock, SOL_SOCKET, SO_MARK, &fwmark, sizeof(fwmark)) == -1) { - if (fwmark != 0) { - log_perror("setsockopt(SO_MARK)"); - } - } - } if (setsockopt(sock, SOL_SOCKET, SO_MAX_PACING_RATE, &pacing_rate, sizeof(pacing_rate)) == -1) { if (pacing_rate != ~0U) { log_perror("setsockopt(SO_MAX_PACING_RATE)"); @@ -43,9 +32,6 @@ UDPStream::~UDPStream() if (sock != -1) { safe_close(sock); } - if (mark_pool != NULL) { - mark_pool->release_mark(fwmark); - } } void UDPStream::send(const char *data, size_t bytes) diff --git a/udpstream.h b/udpstream.h index 4d1e1f7..271764e 100644 --- a/udpstream.h +++ b/udpstream.h @@ -16,8 +16,7 @@ class MarkPool; class UDPStream { public: - // can be NULL. Does not take ownership of the mark pool. - UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool, uint32_t pacing_rate); + UDPStream(const sockaddr_in6 &dst, uint32_t pacing_rate); ~UDPStream(); void send(const char *data, size_t bytes); @@ -27,8 +26,6 @@ private: sockaddr_in6 dst; int sock; - MarkPool *mark_pool; - int fwmark; uint32_t pacing_rate; };