From: Steinar H. Gunderson Date: Sun, 21 Apr 2013 11:55:23 +0000 (+0200) Subject: Add support for UDP outputs. X-Git-Tag: 1.0.0~48^2 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=adbeb2f8972672ed1059509662d006df47762228;hp=0ff463f4df8e085ba88f8ede735f16b140a2bf8e Add support for UDP outputs. --- diff --git a/Makefile b/Makefile index 20f4010..218285e 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ PROTOC=protoc CXXFLAGS=-Wall -O2 -g -pthread LDLIBS=-lprotobuf -pthread -OBJS=main.o client.o server.o stream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o +OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o all: cubemap diff --git a/config.cpp b/config.cpp index 4050338..d47bfec 100644 --- a/config.cpp +++ b/config.cpp @@ -235,6 +235,87 @@ bool parse_stream(const ConfigLine &line, Config *config) return true; } +bool parse_udpstream(const ConfigLine &line, Config *config) +{ + if (line.arguments.size() != 1) { + log(ERROR, "'udpstream' takes exactly one argument"); + return false; + } + + UDPStreamConfig udpstream; + + string hostport = line.arguments[0]; + + // See if the argument if on the type [ipv6addr]:port. + if (!hostport.empty() && hostport[0] == '[') { + size_t split = hostport.find("]:"); + if (split == string::npos) { + log(ERROR, "udpstream destination '%s' is malformed; must be either [ipv6addr]:port or ipv4addr:port"); + return false; + } + + string host(hostport.begin() + 1, hostport.begin() + split); + string port = hostport.substr(split + 2); + + udpstream.dst.sin6_family = AF_INET6; + if (inet_pton(AF_INET6, host.c_str(), &udpstream.dst.sin6_addr) != 1) { + log(ERROR, "udpstream destination host '%s' is not a valid IPv6 address"); + return false; + } + + udpstream.dst.sin6_port = htons(atoi(port.c_str())); // TODO: Verify validity. + } else { + // OK, then it must be ipv4addr:port. + size_t split = hostport.find(":"); + if (split == string::npos) { + log(ERROR, "udpstream destination '%s' is malformed; must be either [ipv6addr]:port or ipv4addr:port"); + return false; + } + + string host(hostport.begin(), hostport.begin() + split); + string port = hostport.substr(split + 1); + + // Parse to an IPv4 address, then construct a mapped-v4 address from that. + in_addr addr4; + + if (inet_pton(AF_INET, host.c_str(), &addr4) != 1) { + log(ERROR, "udpstream destination host '%s' is not a valid IPv4 address"); + return false; + } + + udpstream.dst.sin6_family = AF_INET6; + udpstream.dst.sin6_addr.s6_addr32[0] = 0; + udpstream.dst.sin6_addr.s6_addr32[1] = 0; + udpstream.dst.sin6_addr.s6_addr32[2] = htonl(0xffff); + udpstream.dst.sin6_addr.s6_addr32[3] = addr4.s_addr; + udpstream.dst.sin6_port = htons(atoi(port.c_str())); // TODO: Verify validity. + } + + map::const_iterator src_it = line.parameters.find("src"); + if (src_it == line.parameters.end()) { + // This is pretty meaningless, but OK, consistency is good. + log(WARNING, "udpstream to %s has no src= attribute, clients will not get any data.", + hostport.c_str()); + } else { + udpstream.src = src_it->second; + // TODO: Verify that the URL is parseable? + } + + // Parse marks, if so desired. + map::const_iterator mark_parm_it = line.parameters.find("mark"); + if (mark_parm_it == line.parameters.end()) { + udpstream.mark_pool = -1; + } else { + int from, to; + if (!parse_mark_pool(mark_parm_it->second, &from, &to)) { + return false; + } + udpstream.mark_pool = allocate_mark_pool(from, to, config); + } + + config->udpstreams.push_back(udpstream); + return true; +} bool parse_error_log(const ConfigLine &line, Config *config) { if (line.arguments.size() != 0) { @@ -317,6 +398,10 @@ bool parse_config(const string &filename, Config *config) if (!parse_stream(line, config)) { return false; } + } else if (line.keyword == "udpstream") { + if (!parse_udpstream(line, config)) { + return false; + } } else if (line.keyword == "error_log") { if (!parse_error_log(line, config)) { return false; diff --git a/config.h b/config.h index d79d3d7..8de6704 100644 --- a/config.h +++ b/config.h @@ -3,6 +3,7 @@ // Various routines that deal with parsing the configuration file. +#include #include #include #include @@ -19,6 +20,12 @@ struct StreamConfig { enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding; }; +struct UDPStreamConfig { + sockaddr_in6 dst; + std::string src; // Can be empty. + int mark_pool; // -1 for none. +}; + struct AcceptorConfig { int port; }; @@ -33,6 +40,7 @@ struct Config { int num_servers; std::vector mark_pools; std::vector streams; + std::vector udpstreams; std::vector acceptors; std::vector log_destinations; diff --git a/cubemap.config.sample b/cubemap.config.sample index f3e2e0c..9b104d8 100644 --- a/cubemap.config.sample +++ b/cubemap.config.sample @@ -28,3 +28,5 @@ error_log type=console stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000 stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube stream /udp.ts src=udp://@:1234 backlog_size=1048576 +udpstream [2001:67c:29f4::50]:5000 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube +udpstream 193.35.52.50:5001 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube diff --git a/main.cpp b/main.cpp index 5ec0cb4..8d60826 100644 --- a/main.cpp +++ b/main.cpp @@ -109,29 +109,36 @@ vector create_acceptors( return acceptors; } +void create_config_input(const string &src, multimap *inputs) +{ + if (src.empty()) { + return; + } + if (inputs->count(src) != 0) { + return; + } + + InputWithRefcount iwr; + iwr.input = create_input(src); + if (iwr.input == NULL) { + log(ERROR, "did not understand URL '%s', clients will not get any data.", + src.c_str()); + return; + } + iwr.refcount = 0; + inputs->insert(make_pair(src, iwr)); +} + // Find all streams in the configuration file, and create inputs for them. void create_config_inputs(const Config &config, multimap *inputs) { for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; - if (stream_config.src.empty()) { - continue; - } - - string src = stream_config.src; - if (inputs->count(src) != 0) { - continue; - } - - InputWithRefcount iwr; - iwr.input = create_input(src); - if (iwr.input == NULL) { - log(ERROR, "did not understand URL '%s', clients will not get any data.", - src.c_str()); - continue; - } - iwr.refcount = 0; - inputs->insert(make_pair(src, iwr)); + create_config_input(stream_config.src, inputs); + } + for (unsigned i = 0; i < config.udpstreams.size(); ++i) { + const UDPStreamConfig &udpstream_config = config.udpstreams[i]; + create_config_input(udpstream_config.src, inputs); } } @@ -144,6 +151,7 @@ void create_streams(const Config &config, mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to)); } + // HTTP streams. set expecting_urls = deserialized_urls; for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; @@ -174,7 +182,7 @@ void create_streams(const Config &config, } } - // Warn about any servers we've lost. + // Warn about any HTTP servers we've lost. // TODO: Make an option (delete=yes?) to actually shut down streams. for (set::const_iterator stream_it = expecting_urls.begin(); stream_it != expecting_urls.end(); @@ -184,6 +192,24 @@ void create_streams(const Config &config, "It will not be deleted, but clients will not get any new inputs.", url.c_str()); } + + // UDP streams. + for (unsigned i = 0; i < config.udpstreams.size(); ++i) { + const UDPStreamConfig &udpstream_config = config.udpstreams[i]; + MarkPool *mark_pool = NULL; + if (udpstream_config.mark_pool != -1) { + mark_pool = mark_pools[udpstream_config.mark_pool]; + } + int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool); + + string src = udpstream_config.src; + if (!src.empty()) { + multimap::iterator input_it = inputs->find(src); + assert(input_it != inputs->end()); + input_it->second.input->add_destination(stream_index); + ++input_it->second.refcount; + } + } } void open_logs(const vector &log_destinations) diff --git a/serverpool.cpp b/serverpool.cpp index fb8ca45..f8e64aa 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -16,13 +16,18 @@ using namespace std; ServerPool::ServerPool(int size) : servers(new Server[size]), num_servers(size), - clients_added(0) + clients_added(0), + num_http_streams(0) { } ServerPool::~ServerPool() { delete[] servers; + + for (size_t i = 0; i < udp_streams.size(); ++i) { + delete udp_streams[i]; + } } CubemapStateProto ServerPool::serialize() @@ -69,24 +74,25 @@ int ServerPool::lookup_stream_by_url(const std::string &url) const int ServerPool::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding) { - int stream_index = -1; + // Adding more HTTP streams after UDP streams would cause the UDP stream + // indices to move around, which is obviously not good. + assert(udp_streams.empty()); + for (int i = 0; i < num_servers; ++i) { - int stream_index2 = servers[i].add_stream(url, backlog_size, encoding); - if (i == 0) { - stream_index = stream_index2; - } else { - // Verify that all servers have this under the same stream index. - assert(stream_index == stream_index2); - } + int stream_index = servers[i].add_stream(url, backlog_size, encoding); + assert(stream_index == num_http_streams); } - return stream_index; + return num_http_streams++; } int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector &data_fds) { + // Adding more HTTP streams after UDP streams would cause the UDP stream + // indices to move around, which is obviously not good. + assert(udp_streams.empty()); + assert(!data_fds.empty()); string contents; - int stream_index = -1; for (int i = 0; i < num_servers; ++i) { int data_fd; if (i < int(data_fds.size())) { @@ -102,13 +108,8 @@ int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vect data_fd = make_tempfile(contents); } - int stream_index2 = servers[i].add_stream_from_serialized(stream, data_fd); - if (i == 0) { - stream_index = stream_index2; - } else { - // Verify that all servers have this under the same stream index. - assert(stream_index == stream_index2); - } + int stream_index = servers[i].add_stream_from_serialized(stream, data_fd); + assert(stream_index == num_http_streams); } // Close and delete any leftovers, if the number of servers was reduced. @@ -116,11 +117,30 @@ int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vect safe_close(data_fds[i]); // Implicitly deletes the file. } - return stream_index; + return num_http_streams++; +} + +int ServerPool::add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool) +{ + udp_streams.push_back(new UDPStream(dst, mark_pool)); + return num_http_streams + udp_streams.size() - 1; } void ServerPool::set_header(int stream_index, const string &http_header, const string &stream_header) { + assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size())); + + if (stream_index >= num_http_streams) { + // UDP stream. TODO: Log which stream this is. + if (!stream_header.empty()) { + log(WARNING, "Trying to send stream format with headers to a UDP destination. This is unlikely to work well."); + } + + // Ignore the HTTP header. + return; + } + + // HTTP stream. for (int i = 0; i < num_servers; ++i) { servers[i].set_header(stream_index, http_header, stream_header); } @@ -128,6 +148,15 @@ void ServerPool::set_header(int stream_index, const string &http_header, const s void ServerPool::add_data(int stream_index, const char *data, size_t bytes) { + assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size())); + + if (stream_index >= num_http_streams) { + // UDP stream. + udp_streams[stream_index - num_http_streams]->send(data, bytes); + return; + } + + // HTTP stream. for (int i = 0; i < num_servers; ++i) { servers[i].add_data_deferred(stream_index, data, bytes); } diff --git a/serverpool.h b/serverpool.h index 5f5f6f8..5b7fca3 100644 --- a/serverpool.h +++ b/serverpool.h @@ -8,6 +8,7 @@ #include "server.h" #include "state.pb.h" #include "stream.h" +#include "udpstream.h" class MarkPool; class Server; @@ -29,6 +30,7 @@ public: // Adds the given stream to all the servers. Returns the stream index. int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding); int add_stream_from_serialized(const StreamProto &stream, const std::vector &data_fds); + int add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool); // Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure. int lookup_stream_by_url(const std::string &url) const; @@ -60,6 +62,13 @@ private: Server *servers; int num_servers, clients_added; + // Our indexing is currently rather primitive; every stream_index in + // [0, num_http_streams) maps to a HTTP stream (of which every Server + // has exactly one copy), and after that, it's mapping directly into + // . + int num_http_streams; + std::vector udp_streams; + ServerPool(const ServerPool &); }; diff --git a/udpstream.cpp b/udpstream.cpp new file mode 100644 index 0000000..1852243 --- /dev/null +++ b/udpstream.cpp @@ -0,0 +1,47 @@ +#include "log.h" +#include "markpool.h" +#include "udpstream.h" +#include "util.h" + +UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool) + : dst(dst), + mark_pool(mark_pool), + fwmark(0) +{ + sock = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); + if (sock == -1) { + // Oops. Ignore this output, then. + log_perror("socket"); + return; + } + + if (mark_pool != NULL) { + fwmark = mark_pool->get_mark(); + if (setsockopt(sock, SOL_SOCKET, SO_MARK, &fwmark, sizeof(fwmark)) == -1) { + if (fwmark != 0) { + log_perror("setsockopt(SO_MARK)"); + } + } + } +} + +UDPStream::~UDPStream() +{ + if (sock != -1) { + safe_close(sock); + } + if (mark_pool != NULL) { + mark_pool->release_mark(fwmark); + } +} + +void UDPStream::send(const char *data, size_t bytes) +{ + if (sock == -1) { + return; + } + ssize_t err = sendto(sock, data, bytes, 0, reinterpret_cast(&dst), sizeof(dst)); + if (err == -1) { + log_perror("sendto"); + } +} diff --git a/udpstream.h b/udpstream.h new file mode 100644 index 0000000..3424b98 --- /dev/null +++ b/udpstream.h @@ -0,0 +1,33 @@ +#ifndef _UDPSTREAM_H +#define _UDPSTREAM_H 1 + +// A single UDP destination. This is done a lot less efficient than HTTP streaming +// since we expect to have so few of them, which also means things are a lot simpler. +// In particular, we run in the input's thread, so there is no backlog, which means +// that there is no state (UDP itself is, of course, stateless). + +#include +#include +#include +#include + +class MarkPool; + +class UDPStream { +public: + // can be NULL. Does not take ownership of the mark pool. + UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool); + ~UDPStream(); + + void send(const char *data, size_t bytes); + +private: + UDPStream(const UDPStream& other); + + sockaddr_in6 dst; + int sock; + MarkPool *mark_pool; + int fwmark; +}; + +#endif // !defined(_UDPSTREAM_H)