CXXFLAGS=-Wall -O2 -g -pthread $(shell getconf LFS_CFLAGS)
LDLIBS=-lprotobuf -pthread -lrt
-OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o metacube2.o sa_compare.o state.pb.o
+OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o acceptor.o stats.o accesslog.o thread.o util.o log.o metacube2.o sa_compare.o state.pb.o
all: cubemap
and sending a SIGHUP; all clients will continue as if nothing had happened
(unless you delete the stream they are watching, of course).
Cubemap also survives the encoder dying and reconnecting.
- - Per-stream fwmark support, for TCP pacing through tc (separate config needed).
- Support for setting max pacing rate through the fq packet scheduler
(obsoletes the previous point, but depends on Linux 3.13 or newer).
- Reflects anything VLC can reflect over HTTP, even the muxes VLC
#include "client.h"
#include "log.h"
-#include "markpool.h"
#include "state.pb.h"
#include "stream.h"
Client::Client(int sock)
: sock(sock),
- fwmark(0),
connect_time(time(NULL)),
state(Client::READING_REQUEST),
stream(NULL),
bytes_lost(serialized.bytes_lost()),
num_loss_events(serialized.num_loss_events())
{
- if (stream != NULL && stream->mark_pool != NULL) {
- fwmark = stream->mark_pool->get_mark();
- } else {
- fwmark = 0; // No mark.
- }
- if (setsockopt(sock, SOL_SOCKET, SO_MARK, &fwmark, sizeof(fwmark)) == -1) {
- if (fwmark != 0) {
- log_perror("setsockopt(SO_MARK)");
- }
- fwmark = 0;
- }
if (stream != NULL) {
if (setsockopt(sock, SOL_SOCKET, SO_MAX_PACING_RATE, &stream->pacing_rate, sizeof(stream->pacing_rate)) == -1) {
if (stream->pacing_rate != ~0U) {
stats.url = url;
}
stats.sock = sock;
- stats.fwmark = fwmark;
stats.remote_addr = remote_addr;
stats.connect_time = connect_time;
stats.bytes_sent = bytes_sent;
struct ClientStats {
std::string url;
int sock;
- int fwmark;
std::string remote_addr;
time_t connect_time;
size_t bytes_sent;
// The file descriptor associated with this socket.
int sock;
- // The fwmark associated with this socket (or 0).
- int fwmark;
-
// Some information only used for logging.
std::string remote_addr;
time_t connect_time;
return true;
}
-int allocate_mark_pool(int from, int to, Config *config)
-{
- int pool_index = -1;
-
- // Reuse mark pools if an identical one exists.
- // Otherwise, check if we're overlapping some other mark pool.
- for (size_t i = 0; i < config->mark_pools.size(); ++i) {
- const MarkPoolConfig &pool = config->mark_pools[i];
- if (from == pool.from && to == pool.to) {
- pool_index = i;
- } else if ((from >= pool.from && from < pool.to) ||
- (to >= pool.from && to < pool.to)) {
- log(WARNING, "Mark pool %d-%d partially overlaps with %d-%d, you may get duplicate marks."
- "Mark pools must either be completely disjunct, or completely overlapping.",
- from, to, pool.from, pool.to);
- }
- }
-
- if (pool_index != -1) {
- return pool_index;
- }
-
- // No match to existing pools.
- MarkPoolConfig pool;
- pool.from = from;
- pool.to = to;
- config->mark_pools.push_back(pool);
-
- return config->mark_pools.size() - 1;
-}
-
-bool parse_mark_pool(const string &mark_str, int *from, int *to)
-{
- size_t split = mark_str.find_first_of('-');
- if (split == string::npos) {
- log(ERROR, "Invalid mark specification '%s' (expected 'X-Y').",
- mark_str.c_str());
- return false;
- }
-
- string from_str(mark_str.begin(), mark_str.begin() + split);
- string to_str(mark_str.begin() + split + 1, mark_str.end());
- *from = atoi(from_str.c_str());
- *to = atoi(to_str.c_str());
-
- if (*from <= 0 || *from >= 65536 || *to <= 0 || *to >= 65536) {
- log(ERROR, "Mark pool range %d-%d is outside legal range [1,65536>.",
- *from, *to);
- return false;
- }
-
- return true;
-}
-
bool parse_stream(const ConfigLine &line, Config *config)
{
if (line.arguments.size() != 1) {
return false;
}
- // Parse marks, if so desired.
- map<string, string>::const_iterator mark_parm_it = line.parameters.find("mark");
- if (mark_parm_it == line.parameters.end()) {
- stream.mark_pool = -1;
- } else {
- int from, to;
- if (!parse_mark_pool(mark_parm_it->second, &from, &to)) {
- return false;
- }
- stream.mark_pool = allocate_mark_pool(from, to, config);
- }
-
// Parse the pacing rate, converting from kilobits to bytes as needed.
map<string, string>::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit");
if (pacing_rate_it == line.parameters.end()) {
// TODO: Verify that the URL is parseable?
}
- // Parse marks, if so desired.
- map<string, string>::const_iterator mark_parm_it = line.parameters.find("mark");
- if (mark_parm_it == line.parameters.end()) {
- udpstream.mark_pool = -1;
- } else {
- int from, to;
- if (!parse_mark_pool(mark_parm_it->second, &from, &to)) {
- return false;
- }
- udpstream.mark_pool = allocate_mark_pool(from, to, config);
- }
-
// Parse the pacing rate, converting from kilobits to bytes as needed.
map<string, string>::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit");
if (pacing_rate_it == line.parameters.end()) {
#include <string>
#include <vector>
-struct MarkPoolConfig {
- int from, to;
-};
-
struct StreamConfig {
std::string url; // As seen by the client.
std::string src; // Can be empty.
size_t backlog_size;
- int mark_pool; // -1 for none.
uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit).
enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
};
struct UDPStreamConfig {
sockaddr_in6 dst;
std::string src; // Can be empty.
- int mark_pool; // -1 for none.
uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit).
};
struct Config {
bool daemonize;
int num_servers;
- std::vector<MarkPoolConfig> mark_pools;
std::vector<StreamConfig> streams;
std::vector<UDPStreamConfig> udpstreams;
std::vector<AcceptorConfig> acceptors;
#include "input.h"
#include "input_stats.h"
#include "log.h"
-#include "markpool.h"
#include "sa_compare.h"
#include "serverpool.h"
#include "state.pb.h"
AccessLogThread *access_log = NULL;
ServerPool *servers = NULL;
-vector<MarkPool *> mark_pools;
volatile bool hupped = false;
volatile bool stopped = false;
const set<string> &deserialized_urls,
multimap<string, InputWithRefcount> *inputs)
{
- for (unsigned i = 0; i < config.mark_pools.size(); ++i) {
- const MarkPoolConfig &mp_config = config.mark_pools[i];
- mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to));
- }
-
// HTTP streams.
set<string> expecting_urls = deserialized_urls;
for (unsigned i = 0; i < config.streams.size(); ++i) {
Stream::Encoding(stream_config.encoding));
}
- if (stream_config.mark_pool != -1) {
- servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]);
- }
-
servers->set_pacing_rate(stream_index, stream_config.pacing_rate);
string src = stream_config.src;
// UDP streams.
for (unsigned i = 0; i < config.udpstreams.size(); ++i) {
const UDPStreamConfig &udpstream_config = config.udpstreams[i];
- MarkPool *mark_pool = NULL;
- if (udpstream_config.mark_pool != -1) {
- mark_pool = mark_pools[udpstream_config.mark_pool];
- }
- int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool, udpstream_config.pacing_rate);
+ int stream_index = servers->add_udpstream(udpstream_config.dst, udpstream_config.pacing_rate);
string src = udpstream_config.src;
if (!src.empty()) {
vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
// Put back the existing clients. It doesn't matter which server we
- // allocate them to, so just do round-robin. However, we need to add
- // them after the mark pools have been set up.
+ // allocate them to, so just do round-robin.
for (int i = 0; i < loaded_state.clients_size(); ++i) {
if (deleted_urls.count(loaded_state.clients(i).url()) != 0) {
safe_close(loaded_state.clients(i).sock());
}
delete servers;
- for (unsigned i = 0; i < mark_pools.size(); ++i) {
- delete mark_pools[i];
- }
- mark_pools.clear();
-
access_log->stop();
delete access_log;
shut_down_logging();
+++ /dev/null
-#include "log.h"
-#include "markpool.h"
-#include "mutexlock.h"
-#include <stdio.h>
-#include <assert.h>
-#include <pthread.h>
-#include <queue>
-
-MarkPool::MarkPool(int start, int end)
- : start(start), end(end)
-{
- assert(start > 0 && start < 65536);
- assert(end > 0 && end < 65536);
-
- for (int i = start; i < end; ++i) {
- free_marks.push(i);
- }
-
- pthread_mutex_init(&mutex, NULL);
-}
-
-int MarkPool::get_mark()
-{
- MutexLock lock(&mutex);
- if (free_marks.empty()) {
- log(WARNING, "Out of free marks in mark pool %d-%d, session will not be marked. "
- "To fix, increase the pool size and HUP the server.",
- start, end);
- return 0;
- }
- int mark = free_marks.front();
- free_marks.pop();
- return mark;
-}
-
-void MarkPool::release_mark(int mark)
-{
- if (mark == 0) {
- return;
- }
-
- MutexLock lock(&mutex);
- free_marks.push(mark);
-}
+++ /dev/null
-#ifndef _MARKPOOL_H
-#define _MARKPOOL_H
-
-// A class that hands out fwmarks from a given range in a thread-safe fashion.
-// If the range is empty, it returns 0.
-
-#include <pthread.h>
-#include <queue>
-
-class MarkPool {
-public:
- // Limits are [start, end>. Numbers are 16-bit, so above 65535 do not make sense.
- MarkPool(int start, int end);
-
- int get_mark();
- void release_mark(int mark);
-
-private:
- int start, end;
-
- pthread_mutex_t mutex;
- std::queue<int> free_marks;
-};
-
-#endif // !defined(_MARKPOOL_H)
#include "accesslog.h"
#include "log.h"
-#include "markpool.h"
#include "metacube2.h"
#include "mutexlock.h"
#include "parse.h"
streams[stream_index]->stream_header = stream_header;
}
-void Server::set_mark_pool(int stream_index, MarkPool *mark_pool)
-{
- MutexLock lock(&mutex);
- assert(clients.empty());
- assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
- streams[stream_index]->mark_pool = mark_pool;
-}
-
void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
{
MutexLock lock(&mutex);
client->url = request_tokens[1];
client->stream = stream;
- if (client->stream->mark_pool != NULL) {
- client->fwmark = client->stream->mark_pool->get_mark();
- } else {
- client->fwmark = 0; // No mark.
- }
- if (setsockopt(client->sock, SOL_SOCKET, SO_MARK, &client->fwmark, sizeof(client->fwmark)) == -1) {
- if (client->fwmark != 0) {
- log_perror("setsockopt(SO_MARK)");
- }
- }
if (setsockopt(client->sock, SOL_SOCKET, SO_MAX_PACING_RATE, &client->stream->pacing_rate, sizeof(client->stream->pacing_rate)) == -1) {
if (client->stream->pacing_rate != ~0U) {
log_perror("setsockopt(SO_MAX_PACING_RATE)");
if (client->stream != NULL) {
delete_from(&client->stream->sleeping_clients, client);
delete_from(&client->stream->to_process, client);
- if (client->stream->mark_pool != NULL) {
- int fwmark = client->fwmark;
- client->stream->mark_pool->release_mark(fwmark);
- }
}
// Log to access_log.
#define MAX_CLIENT_REQUEST 16384
class CubemapStateProto;
-class MarkPool;
class StreamProto;
class Server : public Thread {
const std::string &http_header,
const std::string &stream_header);
- // Set that the given stream should use the given mark pool from now on.
- // NOTE: This should be set before any clients are connected!
- void set_mark_pool(int stream_index, MarkPool *mark_pool);
-
// Set that the given stream should use the given max pacing rate from now on.
// NOTE: This should be set before any clients are connected!
void set_pacing_rate(int stream_index, uint32_t pacing_rate);
return num_http_streams++;
}
-int ServerPool::add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool, int pacing_rate)
+int ServerPool::add_udpstream(const sockaddr_in6 &dst, int pacing_rate)
{
- udp_streams.push_back(new UDPStream(dst, mark_pool, pacing_rate));
+ udp_streams.push_back(new UDPStream(dst, pacing_rate));
return num_http_streams + udp_streams.size() - 1;
}
return ret;
}
-void ServerPool::set_mark_pool(int stream_index, MarkPool *mark_pool)
-{
- for (int i = 0; i < num_servers; ++i) {
- servers[i].set_mark_pool(stream_index, mark_pool);
- }
-}
-
void ServerPool::set_pacing_rate(int stream_index, uint32_t pacing_rate)
{
for (int i = 0; i < num_servers; ++i) {
#include "stream.h"
#include "udpstream.h"
-class MarkPool;
class Server;
class UDPStream;
struct ClientStats;
int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding);
int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
void delete_stream(const std::string &url);
- int add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool, int pacing_rate);
+ int add_udpstream(const sockaddr_in6 &dst, int pacing_rate);
// Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure.
int lookup_stream_by_url(const std::string &url) const;
const std::string &stream_header);
void add_data(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start);
- // Connects the given stream to the given mark pool for all the servers.
- void set_mark_pool(int stream_index, MarkPool *mark_pool);
-
// Sets the max pacing rate for all the servers.
void set_pacing_rate(int stream_index, uint32_t pacing_rate);
fprintf(fp, "%s %d %d %s %d %llu %llu %llu\n",
client_stats[i].remote_addr.c_str(),
client_stats[i].sock,
- client_stats[i].fwmark,
+ 0, // Used to be fwmark.
client_stats[i].url.c_str(),
int(now - client_stats[i].connect_time),
(long long unsigned)(client_stats[i].bytes_sent),
backlog_size(backlog_size),
bytes_received(0),
last_suitable_starting_point(-1),
- mark_pool(NULL),
pacing_rate(~0U),
queued_data_last_starting_point(-1)
{
data_fd(data_fd),
backlog_size(serialized.backlog_size()),
bytes_received(serialized.bytes_received()),
- mark_pool(NULL),
pacing_rate(~0U),
queued_data_last_starting_point(-1)
{
#include <string>
#include <vector>
-class MarkPool;
class StreamProto;
struct Client;
// <sleeping_clients>).
std::vector<Client *> to_process;
- // What pool to fetch marks from, or NULL.
- MarkPool *mark_pool;
-
// Maximum pacing rate for the stream.
uint32_t pacing_rate;
#include <sys/types.h>
#include "log.h"
-#include "markpool.h"
#include "udpstream.h"
#include "util.h"
#define SO_MAX_PACING_RATE 47
#endif
-UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool, uint32_t pacing_rate)
+UDPStream::UDPStream(const sockaddr_in6 &dst, uint32_t pacing_rate)
: dst(dst),
- mark_pool(mark_pool),
- fwmark(0),
pacing_rate(pacing_rate)
{
sock = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
return;
}
- if (mark_pool != NULL) {
- fwmark = mark_pool->get_mark();
- if (setsockopt(sock, SOL_SOCKET, SO_MARK, &fwmark, sizeof(fwmark)) == -1) {
- if (fwmark != 0) {
- log_perror("setsockopt(SO_MARK)");
- }
- }
- }
if (setsockopt(sock, SOL_SOCKET, SO_MAX_PACING_RATE, &pacing_rate, sizeof(pacing_rate)) == -1) {
if (pacing_rate != ~0U) {
log_perror("setsockopt(SO_MAX_PACING_RATE)");
if (sock != -1) {
safe_close(sock);
}
- if (mark_pool != NULL) {
- mark_pool->release_mark(fwmark);
- }
}
void UDPStream::send(const char *data, size_t bytes)
class UDPStream {
public:
- // <mark_pool> can be NULL. Does not take ownership of the mark pool.
- UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool, uint32_t pacing_rate);
+ UDPStream(const sockaddr_in6 &dst, uint32_t pacing_rate);
~UDPStream();
void send(const char *data, size_t bytes);
sockaddr_in6 dst;
int sock;
- MarkPool *mark_pool;
- int fwmark;
uint32_t pacing_rate;
};