CXXFLAGS=-Wall -O2 -g -pthread
LDLIBS=-lprotobuf -pthread
-OBJS=main.o client.o server.o stream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o
+OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o
all: cubemap
return true;
}
+bool parse_udpstream(const ConfigLine &line, Config *config)
+{
+ if (line.arguments.size() != 1) {
+ log(ERROR, "'udpstream' takes exactly one argument");
+ return false;
+ }
+
+ UDPStreamConfig udpstream;
+
+ string hostport = line.arguments[0];
+
+ // See if the argument if on the type [ipv6addr]:port.
+ if (!hostport.empty() && hostport[0] == '[') {
+ size_t split = hostport.find("]:");
+ if (split == string::npos) {
+ log(ERROR, "udpstream destination '%s' is malformed; must be either [ipv6addr]:port or ipv4addr:port");
+ return false;
+ }
+
+ string host(hostport.begin() + 1, hostport.begin() + split);
+ string port = hostport.substr(split + 2);
+
+ udpstream.dst.sin6_family = AF_INET6;
+ if (inet_pton(AF_INET6, host.c_str(), &udpstream.dst.sin6_addr) != 1) {
+ log(ERROR, "udpstream destination host '%s' is not a valid IPv6 address");
+ return false;
+ }
+
+ udpstream.dst.sin6_port = htons(atoi(port.c_str())); // TODO: Verify validity.
+ } else {
+ // OK, then it must be ipv4addr:port.
+ size_t split = hostport.find(":");
+ if (split == string::npos) {
+ log(ERROR, "udpstream destination '%s' is malformed; must be either [ipv6addr]:port or ipv4addr:port");
+ return false;
+ }
+
+ string host(hostport.begin(), hostport.begin() + split);
+ string port = hostport.substr(split + 1);
+
+ // Parse to an IPv4 address, then construct a mapped-v4 address from that.
+ in_addr addr4;
+
+ if (inet_pton(AF_INET, host.c_str(), &addr4) != 1) {
+ log(ERROR, "udpstream destination host '%s' is not a valid IPv4 address");
+ return false;
+ }
+
+ udpstream.dst.sin6_family = AF_INET6;
+ udpstream.dst.sin6_addr.s6_addr32[0] = 0;
+ udpstream.dst.sin6_addr.s6_addr32[1] = 0;
+ udpstream.dst.sin6_addr.s6_addr32[2] = htonl(0xffff);
+ udpstream.dst.sin6_addr.s6_addr32[3] = addr4.s_addr;
+ udpstream.dst.sin6_port = htons(atoi(port.c_str())); // TODO: Verify validity.
+ }
+
+ map<string, string>::const_iterator src_it = line.parameters.find("src");
+ if (src_it == line.parameters.end()) {
+ // This is pretty meaningless, but OK, consistency is good.
+ log(WARNING, "udpstream to %s has no src= attribute, clients will not get any data.",
+ hostport.c_str());
+ } else {
+ udpstream.src = src_it->second;
+ // TODO: Verify that the URL is parseable?
+ }
+
+ // Parse marks, if so desired.
+ map<string, string>::const_iterator mark_parm_it = line.parameters.find("mark");
+ if (mark_parm_it == line.parameters.end()) {
+ udpstream.mark_pool = -1;
+ } else {
+ int from, to;
+ if (!parse_mark_pool(mark_parm_it->second, &from, &to)) {
+ return false;
+ }
+ udpstream.mark_pool = allocate_mark_pool(from, to, config);
+ }
+
+ config->udpstreams.push_back(udpstream);
+ return true;
+}
bool parse_error_log(const ConfigLine &line, Config *config)
{
if (line.arguments.size() != 0) {
if (!parse_stream(line, config)) {
return false;
}
+ } else if (line.keyword == "udpstream") {
+ if (!parse_udpstream(line, config)) {
+ return false;
+ }
} else if (line.keyword == "error_log") {
if (!parse_error_log(line, config)) {
return false;
// Various routines that deal with parsing the configuration file.
+#include <arpa/inet.h>
#include <stddef.h>
#include <string>
#include <vector>
enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
};
+struct UDPStreamConfig {
+ sockaddr_in6 dst;
+ std::string src; // Can be empty.
+ int mark_pool; // -1 for none.
+};
+
struct AcceptorConfig {
int port;
};
int num_servers;
std::vector<MarkPoolConfig> mark_pools;
std::vector<StreamConfig> streams;
+ std::vector<UDPStreamConfig> udpstreams;
std::vector<AcceptorConfig> acceptors;
std::vector<LogConfig> log_destinations;
stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000
stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
stream /udp.ts src=udp://@:1234 backlog_size=1048576
+udpstream [2001:67c:29f4::50]:5000 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube
+udpstream 193.35.52.50:5001 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube
return acceptors;
}
+void create_config_input(const string &src, multimap<string, InputWithRefcount> *inputs)
+{
+ if (src.empty()) {
+ return;
+ }
+ if (inputs->count(src) != 0) {
+ return;
+ }
+
+ InputWithRefcount iwr;
+ iwr.input = create_input(src);
+ if (iwr.input == NULL) {
+ log(ERROR, "did not understand URL '%s', clients will not get any data.",
+ src.c_str());
+ return;
+ }
+ iwr.refcount = 0;
+ inputs->insert(make_pair(src, iwr));
+}
+
// Find all streams in the configuration file, and create inputs for them.
void create_config_inputs(const Config &config, multimap<string, InputWithRefcount> *inputs)
{
for (unsigned i = 0; i < config.streams.size(); ++i) {
const StreamConfig &stream_config = config.streams[i];
- if (stream_config.src.empty()) {
- continue;
- }
-
- string src = stream_config.src;
- if (inputs->count(src) != 0) {
- continue;
- }
-
- InputWithRefcount iwr;
- iwr.input = create_input(src);
- if (iwr.input == NULL) {
- log(ERROR, "did not understand URL '%s', clients will not get any data.",
- src.c_str());
- continue;
- }
- iwr.refcount = 0;
- inputs->insert(make_pair(src, iwr));
+ create_config_input(stream_config.src, inputs);
+ }
+ for (unsigned i = 0; i < config.udpstreams.size(); ++i) {
+ const UDPStreamConfig &udpstream_config = config.udpstreams[i];
+ create_config_input(udpstream_config.src, inputs);
}
}
mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to));
}
+ // HTTP streams.
set<string> expecting_urls = deserialized_urls;
for (unsigned i = 0; i < config.streams.size(); ++i) {
const StreamConfig &stream_config = config.streams[i];
}
}
- // Warn about any servers we've lost.
+ // Warn about any HTTP servers we've lost.
// TODO: Make an option (delete=yes?) to actually shut down streams.
for (set<string>::const_iterator stream_it = expecting_urls.begin();
stream_it != expecting_urls.end();
"It will not be deleted, but clients will not get any new inputs.",
url.c_str());
}
+
+ // UDP streams.
+ for (unsigned i = 0; i < config.udpstreams.size(); ++i) {
+ const UDPStreamConfig &udpstream_config = config.udpstreams[i];
+ MarkPool *mark_pool = NULL;
+ if (udpstream_config.mark_pool != -1) {
+ mark_pool = mark_pools[udpstream_config.mark_pool];
+ }
+ int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool);
+
+ string src = udpstream_config.src;
+ if (!src.empty()) {
+ multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
+ assert(input_it != inputs->end());
+ input_it->second.input->add_destination(stream_index);
+ ++input_it->second.refcount;
+ }
+ }
}
void open_logs(const vector<LogConfig> &log_destinations)
ServerPool::ServerPool(int size)
: servers(new Server[size]),
num_servers(size),
- clients_added(0)
+ clients_added(0),
+ num_http_streams(0)
{
}
ServerPool::~ServerPool()
{
delete[] servers;
+
+ for (size_t i = 0; i < udp_streams.size(); ++i) {
+ delete udp_streams[i];
+ }
}
CubemapStateProto ServerPool::serialize()
int ServerPool::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
{
- int stream_index = -1;
+ // Adding more HTTP streams after UDP streams would cause the UDP stream
+ // indices to move around, which is obviously not good.
+ assert(udp_streams.empty());
+
for (int i = 0; i < num_servers; ++i) {
- int stream_index2 = servers[i].add_stream(url, backlog_size, encoding);
- if (i == 0) {
- stream_index = stream_index2;
- } else {
- // Verify that all servers have this under the same stream index.
- assert(stream_index == stream_index2);
- }
+ int stream_index = servers[i].add_stream(url, backlog_size, encoding);
+ assert(stream_index == num_http_streams);
}
- return stream_index;
+ return num_http_streams++;
}
int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
{
+ // Adding more HTTP streams after UDP streams would cause the UDP stream
+ // indices to move around, which is obviously not good.
+ assert(udp_streams.empty());
+
assert(!data_fds.empty());
string contents;
- int stream_index = -1;
for (int i = 0; i < num_servers; ++i) {
int data_fd;
if (i < int(data_fds.size())) {
data_fd = make_tempfile(contents);
}
- int stream_index2 = servers[i].add_stream_from_serialized(stream, data_fd);
- if (i == 0) {
- stream_index = stream_index2;
- } else {
- // Verify that all servers have this under the same stream index.
- assert(stream_index == stream_index2);
- }
+ int stream_index = servers[i].add_stream_from_serialized(stream, data_fd);
+ assert(stream_index == num_http_streams);
}
// Close and delete any leftovers, if the number of servers was reduced.
safe_close(data_fds[i]); // Implicitly deletes the file.
}
- return stream_index;
+ return num_http_streams++;
+}
+
+int ServerPool::add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool)
+{
+ udp_streams.push_back(new UDPStream(dst, mark_pool));
+ return num_http_streams + udp_streams.size() - 1;
}
void ServerPool::set_header(int stream_index, const string &http_header, const string &stream_header)
{
+ assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size()));
+
+ if (stream_index >= num_http_streams) {
+ // UDP stream. TODO: Log which stream this is.
+ if (!stream_header.empty()) {
+ log(WARNING, "Trying to send stream format with headers to a UDP destination. This is unlikely to work well.");
+ }
+
+ // Ignore the HTTP header.
+ return;
+ }
+
+ // HTTP stream.
for (int i = 0; i < num_servers; ++i) {
servers[i].set_header(stream_index, http_header, stream_header);
}
void ServerPool::add_data(int stream_index, const char *data, size_t bytes)
{
+ assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size()));
+
+ if (stream_index >= num_http_streams) {
+ // UDP stream.
+ udp_streams[stream_index - num_http_streams]->send(data, bytes);
+ return;
+ }
+
+ // HTTP stream.
for (int i = 0; i < num_servers; ++i) {
servers[i].add_data_deferred(stream_index, data, bytes);
}
#include "server.h"
#include "state.pb.h"
#include "stream.h"
+#include "udpstream.h"
class MarkPool;
class Server;
// Adds the given stream to all the servers. Returns the stream index.
int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding);
int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
+ int add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool);
// Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure.
int lookup_stream_by_url(const std::string &url) const;
Server *servers;
int num_servers, clients_added;
+ // Our indexing is currently rather primitive; every stream_index in
+ // [0, num_http_streams) maps to a HTTP stream (of which every Server
+ // has exactly one copy), and after that, it's mapping directly into
+ // <udp_streams>.
+ int num_http_streams;
+ std::vector<UDPStream *> udp_streams;
+
ServerPool(const ServerPool &);
};
--- /dev/null
+#include "log.h"
+#include "markpool.h"
+#include "udpstream.h"
+#include "util.h"
+
+UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool)
+ : dst(dst),
+ mark_pool(mark_pool),
+ fwmark(0)
+{
+ sock = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
+ if (sock == -1) {
+ // Oops. Ignore this output, then.
+ log_perror("socket");
+ return;
+ }
+
+ if (mark_pool != NULL) {
+ fwmark = mark_pool->get_mark();
+ if (setsockopt(sock, SOL_SOCKET, SO_MARK, &fwmark, sizeof(fwmark)) == -1) {
+ if (fwmark != 0) {
+ log_perror("setsockopt(SO_MARK)");
+ }
+ }
+ }
+}
+
+UDPStream::~UDPStream()
+{
+ if (sock != -1) {
+ safe_close(sock);
+ }
+ if (mark_pool != NULL) {
+ mark_pool->release_mark(fwmark);
+ }
+}
+
+void UDPStream::send(const char *data, size_t bytes)
+{
+ if (sock == -1) {
+ return;
+ }
+ ssize_t err = sendto(sock, data, bytes, 0, reinterpret_cast<sockaddr *>(&dst), sizeof(dst));
+ if (err == -1) {
+ log_perror("sendto");
+ }
+}
--- /dev/null
+#ifndef _UDPSTREAM_H
+#define _UDPSTREAM_H 1
+
+// A single UDP destination. This is done a lot less efficient than HTTP streaming
+// since we expect to have so few of them, which also means things are a lot simpler.
+// In particular, we run in the input's thread, so there is no backlog, which means
+// that there is no state (UDP itself is, of course, stateless).
+
+#include <arpa/inet.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <sys/types.h>
+
+class MarkPool;
+
+class UDPStream {
+public:
+ // <mark_pool> can be NULL. Does not take ownership of the mark pool.
+ UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool);
+ ~UDPStream();
+
+ void send(const char *data, size_t bytes);
+
+private:
+ UDPStream(const UDPStream& other);
+
+ sockaddr_in6 dst;
+ int sock;
+ MarkPool *mark_pool;
+ int fwmark;
+};
+
+#endif // !defined(_UDPSTREAM_H)