Add support for UDP outputs.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 21 Apr 2013 11:55:23 +0000 (13:55 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 21 Apr 2013 11:55:23 +0000 (13:55 +0200)
Makefile
config.cpp
config.h
cubemap.config.sample
main.cpp
serverpool.cpp
serverpool.h
udpstream.cpp [new file with mode: 0644]
udpstream.h [new file with mode: 0644]

index 20f4010..218285e 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ PROTOC=protoc
 CXXFLAGS=-Wall -O2 -g -pthread
 LDLIBS=-lprotobuf -pthread
 
-OBJS=main.o client.o server.o stream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o
+OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o
 
 all: cubemap
 
index 4050338..d47bfec 100644 (file)
@@ -235,6 +235,87 @@ bool parse_stream(const ConfigLine &line, Config *config)
        return true;
 }
 
+bool parse_udpstream(const ConfigLine &line, Config *config)
+{
+       if (line.arguments.size() != 1) {
+               log(ERROR, "'udpstream' takes exactly one argument");
+               return false;
+       }
+
+       UDPStreamConfig udpstream;
+
+       string hostport = line.arguments[0];
+
+       // See if the argument if on the type [ipv6addr]:port.
+       if (!hostport.empty() && hostport[0] == '[') {
+               size_t split = hostport.find("]:");
+               if (split == string::npos) {
+                       log(ERROR, "udpstream destination '%s' is malformed; must be either [ipv6addr]:port or ipv4addr:port");
+                       return false;
+               }
+
+               string host(hostport.begin() + 1, hostport.begin() + split);
+               string port = hostport.substr(split + 2);
+
+               udpstream.dst.sin6_family = AF_INET6;
+               if (inet_pton(AF_INET6, host.c_str(), &udpstream.dst.sin6_addr) != 1) {
+                       log(ERROR, "udpstream destination host '%s' is not a valid IPv6 address");
+                       return false;
+               }
+
+               udpstream.dst.sin6_port = htons(atoi(port.c_str()));  // TODO: Verify validity.
+       } else {
+               // OK, then it must be ipv4addr:port.
+               size_t split = hostport.find(":");
+               if (split == string::npos) {
+                       log(ERROR, "udpstream destination '%s' is malformed; must be either [ipv6addr]:port or ipv4addr:port");
+                       return false;
+               }
+
+               string host(hostport.begin(), hostport.begin() + split);
+               string port = hostport.substr(split + 1);
+
+               // Parse to an IPv4 address, then construct a mapped-v4 address from that.
+               in_addr addr4;
+
+               if (inet_pton(AF_INET, host.c_str(), &addr4) != 1) {
+                       log(ERROR, "udpstream destination host '%s' is not a valid IPv4 address");
+                       return false;
+               }
+
+               udpstream.dst.sin6_family = AF_INET6;
+               udpstream.dst.sin6_addr.s6_addr32[0] = 0;
+               udpstream.dst.sin6_addr.s6_addr32[1] = 0;
+               udpstream.dst.sin6_addr.s6_addr32[2] = htonl(0xffff);
+               udpstream.dst.sin6_addr.s6_addr32[3] = addr4.s_addr;
+               udpstream.dst.sin6_port = htons(atoi(port.c_str()));  // TODO: Verify validity.
+       }
+
+       map<string, string>::const_iterator src_it = line.parameters.find("src");
+       if (src_it == line.parameters.end()) {
+               // This is pretty meaningless, but OK, consistency is good.
+               log(WARNING, "udpstream to %s has no src= attribute, clients will not get any data.",
+                       hostport.c_str());
+       } else {
+               udpstream.src = src_it->second;
+               // TODO: Verify that the URL is parseable?
+       }
+
+       // Parse marks, if so desired.
+       map<string, string>::const_iterator mark_parm_it = line.parameters.find("mark");
+       if (mark_parm_it == line.parameters.end()) {
+               udpstream.mark_pool = -1;
+       } else {
+               int from, to;
+               if (!parse_mark_pool(mark_parm_it->second, &from, &to)) {
+                       return false;
+               }
+               udpstream.mark_pool = allocate_mark_pool(from, to, config);
+       }
+
+       config->udpstreams.push_back(udpstream);
+       return true;
+}
 bool parse_error_log(const ConfigLine &line, Config *config)
 {
        if (line.arguments.size() != 0) {
@@ -317,6 +398,10 @@ bool parse_config(const string &filename, Config *config)
                        if (!parse_stream(line, config)) {
                                return false;
                        }
+               } else if (line.keyword == "udpstream") {
+                       if (!parse_udpstream(line, config)) {
+                               return false;
+                       }
                } else if (line.keyword == "error_log") {
                        if (!parse_error_log(line, config)) {
                                return false;
index d79d3d7..8de6704 100644 (file)
--- a/config.h
+++ b/config.h
@@ -3,6 +3,7 @@
 
 // Various routines that deal with parsing the configuration file.
 
+#include <arpa/inet.h>
 #include <stddef.h>
 #include <string>
 #include <vector>
@@ -19,6 +20,12 @@ struct StreamConfig {
        enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
 };
 
+struct UDPStreamConfig {
+       sockaddr_in6 dst;
+       std::string src;  // Can be empty.
+       int mark_pool;  // -1 for none.
+};
+
 struct AcceptorConfig {
        int port;
 };
@@ -33,6 +40,7 @@ struct Config {
        int num_servers;
        std::vector<MarkPoolConfig> mark_pools;
        std::vector<StreamConfig> streams;
+       std::vector<UDPStreamConfig> udpstreams;
        std::vector<AcceptorConfig> acceptors;
        std::vector<LogConfig> log_destinations;
 
index f3e2e0c..9b104d8 100644 (file)
@@ -28,3 +28,5 @@ error_log type=console
 stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000
 stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
 stream /udp.ts src=udp://@:1234 backlog_size=1048576
+udpstream [2001:67c:29f4::50]:5000 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube
+udpstream 193.35.52.50:5001 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube
index 5ec0cb4..8d60826 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -109,29 +109,36 @@ vector<Acceptor *> create_acceptors(
        return acceptors;
 }
 
+void create_config_input(const string &src, multimap<string, InputWithRefcount> *inputs)
+{
+       if (src.empty()) {
+               return;
+       }
+       if (inputs->count(src) != 0) {
+               return;
+       }
+
+       InputWithRefcount iwr;
+       iwr.input = create_input(src);
+       if (iwr.input == NULL) {
+               log(ERROR, "did not understand URL '%s', clients will not get any data.",
+                       src.c_str());
+               return;
+       }
+       iwr.refcount = 0;
+       inputs->insert(make_pair(src, iwr));
+}
+
 // Find all streams in the configuration file, and create inputs for them.
 void create_config_inputs(const Config &config, multimap<string, InputWithRefcount> *inputs)
 {
        for (unsigned i = 0; i < config.streams.size(); ++i) {
                const StreamConfig &stream_config = config.streams[i];
-               if (stream_config.src.empty()) {
-                       continue;
-               }
-
-               string src = stream_config.src;
-               if (inputs->count(src) != 0) {
-                       continue;
-               }
-
-               InputWithRefcount iwr;
-               iwr.input = create_input(src);
-               if (iwr.input == NULL) {
-                       log(ERROR, "did not understand URL '%s', clients will not get any data.",
-                               src.c_str());
-                       continue;
-               }
-               iwr.refcount = 0;
-               inputs->insert(make_pair(src, iwr));
+               create_config_input(stream_config.src, inputs);
+       }
+       for (unsigned i = 0; i < config.udpstreams.size(); ++i) {
+               const UDPStreamConfig &udpstream_config = config.udpstreams[i];
+               create_config_input(udpstream_config.src, inputs);
        }
 }
 
@@ -144,6 +151,7 @@ void create_streams(const Config &config,
                mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to));
        }
 
+       // HTTP streams.
        set<string> expecting_urls = deserialized_urls;
        for (unsigned i = 0; i < config.streams.size(); ++i) {
                const StreamConfig &stream_config = config.streams[i];
@@ -174,7 +182,7 @@ void create_streams(const Config &config,
                }
        }
 
-       // Warn about any servers we've lost.
+       // Warn about any HTTP servers we've lost.
        // TODO: Make an option (delete=yes?) to actually shut down streams.
        for (set<string>::const_iterator stream_it = expecting_urls.begin();
             stream_it != expecting_urls.end();
@@ -184,6 +192,24 @@ void create_streams(const Config &config,
                             "It will not be deleted, but clients will not get any new inputs.",
                             url.c_str());
        }
+
+       // UDP streams.
+       for (unsigned i = 0; i < config.udpstreams.size(); ++i) {
+               const UDPStreamConfig &udpstream_config = config.udpstreams[i];
+               MarkPool *mark_pool = NULL;
+               if (udpstream_config.mark_pool != -1) {
+                       mark_pool = mark_pools[udpstream_config.mark_pool];
+               }
+               int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool);
+
+               string src = udpstream_config.src;
+               if (!src.empty()) {
+                       multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
+                       assert(input_it != inputs->end());
+                       input_it->second.input->add_destination(stream_index);
+                       ++input_it->second.refcount;
+               }
+       }
 }
        
 void open_logs(const vector<LogConfig> &log_destinations)
index fb8ca45..f8e64aa 100644 (file)
@@ -16,13 +16,18 @@ using namespace std;
 ServerPool::ServerPool(int size)
        : servers(new Server[size]),
          num_servers(size),
-         clients_added(0)
+         clients_added(0),
+         num_http_streams(0)
 {
 }
 
 ServerPool::~ServerPool()
 {
        delete[] servers;
+
+       for (size_t i = 0; i < udp_streams.size(); ++i) {
+               delete udp_streams[i];
+       }
 }
        
 CubemapStateProto ServerPool::serialize()
@@ -69,24 +74,25 @@ int ServerPool::lookup_stream_by_url(const std::string &url) const
 
 int ServerPool::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
 {
-       int stream_index = -1;
+       // Adding more HTTP streams after UDP streams would cause the UDP stream
+       // indices to move around, which is obviously not good.
+       assert(udp_streams.empty());
+
        for (int i = 0; i < num_servers; ++i) {
-               int stream_index2 = servers[i].add_stream(url, backlog_size, encoding);
-               if (i == 0) {
-                       stream_index = stream_index2;
-               } else {
-                       // Verify that all servers have this under the same stream index.
-                       assert(stream_index == stream_index2);
-               }
+               int stream_index = servers[i].add_stream(url, backlog_size, encoding);
+               assert(stream_index == num_http_streams);
        }
-       return stream_index;
+       return num_http_streams++;
 }
 
 int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
 {
+       // Adding more HTTP streams after UDP streams would cause the UDP stream
+       // indices to move around, which is obviously not good.
+       assert(udp_streams.empty());
+
        assert(!data_fds.empty());
        string contents;
-       int stream_index = -1;
        for (int i = 0; i < num_servers; ++i) {
                int data_fd;
                if (i < int(data_fds.size())) {
@@ -102,13 +108,8 @@ int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vect
                        data_fd = make_tempfile(contents);
                }
 
-               int stream_index2 = servers[i].add_stream_from_serialized(stream, data_fd);
-               if (i == 0) {
-                       stream_index = stream_index2;
-               } else {
-                       // Verify that all servers have this under the same stream index.
-                       assert(stream_index == stream_index2);
-               }
+               int stream_index = servers[i].add_stream_from_serialized(stream, data_fd);
+               assert(stream_index == num_http_streams);
        }
 
        // Close and delete any leftovers, if the number of servers was reduced.
@@ -116,11 +117,30 @@ int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vect
                safe_close(data_fds[i]);  // Implicitly deletes the file.
        }
 
-       return stream_index;
+       return num_http_streams++;
+}
+       
+int ServerPool::add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool)
+{
+       udp_streams.push_back(new UDPStream(dst, mark_pool));
+       return num_http_streams + udp_streams.size() - 1;
 }
 
 void ServerPool::set_header(int stream_index, const string &http_header, const string &stream_header)
 {
+       assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size()));
+
+       if (stream_index >= num_http_streams) {
+               // UDP stream. TODO: Log which stream this is.
+               if (!stream_header.empty()) {
+                       log(WARNING, "Trying to send stream format with headers to a UDP destination. This is unlikely to work well.");
+               }
+
+               // Ignore the HTTP header.
+               return;
+       }
+
+       // HTTP stream.
        for (int i = 0; i < num_servers; ++i) {
                servers[i].set_header(stream_index, http_header, stream_header);
        }
@@ -128,6 +148,15 @@ void ServerPool::set_header(int stream_index, const string &http_header, const s
 
 void ServerPool::add_data(int stream_index, const char *data, size_t bytes)
 {
+       assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size()));
+
+       if (stream_index >= num_http_streams) {
+               // UDP stream.
+               udp_streams[stream_index - num_http_streams]->send(data, bytes);
+               return;
+       }
+
+       // HTTP stream.
        for (int i = 0; i < num_servers; ++i) {
                servers[i].add_data_deferred(stream_index, data, bytes);
        }
index 5f5f6f8..5b7fca3 100644 (file)
@@ -8,6 +8,7 @@
 #include "server.h"
 #include "state.pb.h"
 #include "stream.h"
+#include "udpstream.h"
 
 class MarkPool;
 class Server;
@@ -29,6 +30,7 @@ public:
        // Adds the given stream to all the servers. Returns the stream index.
        int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding);
        int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
+       int add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool);
 
        // Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure.
        int lookup_stream_by_url(const std::string &url) const;
@@ -60,6 +62,13 @@ private:
        Server *servers;
        int num_servers, clients_added;
 
+       // Our indexing is currently rather primitive; every stream_index in
+       // [0, num_http_streams) maps to a HTTP stream (of which every Server
+       // has exactly one copy), and after that, it's mapping directly into
+       // <udp_streams>.
+       int num_http_streams;
+       std::vector<UDPStream *> udp_streams;
+
        ServerPool(const ServerPool &);
 };
 
diff --git a/udpstream.cpp b/udpstream.cpp
new file mode 100644 (file)
index 0000000..1852243
--- /dev/null
@@ -0,0 +1,47 @@
+#include "log.h"
+#include "markpool.h"
+#include "udpstream.h"
+#include "util.h"
+
+UDPStream::UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool)
+       : dst(dst),
+         mark_pool(mark_pool),
+         fwmark(0)
+{
+       sock = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
+       if (sock == -1) {
+               // Oops. Ignore this output, then.
+               log_perror("socket");
+               return;
+       }
+
+       if (mark_pool != NULL) {
+               fwmark = mark_pool->get_mark();
+               if (setsockopt(sock, SOL_SOCKET, SO_MARK, &fwmark, sizeof(fwmark)) == -1) {                          
+                       if (fwmark != 0) {
+                               log_perror("setsockopt(SO_MARK)");
+                       }
+               }
+       }
+}
+
+UDPStream::~UDPStream()
+{
+       if (sock != -1) {
+               safe_close(sock);
+       }
+       if (mark_pool != NULL) {
+               mark_pool->release_mark(fwmark);
+       }
+}
+
+void UDPStream::send(const char *data, size_t bytes)
+{
+       if (sock == -1) {
+               return;
+       }
+       ssize_t err = sendto(sock, data, bytes, 0, reinterpret_cast<sockaddr *>(&dst), sizeof(dst));
+       if (err == -1) {
+               log_perror("sendto");
+       }
+}
diff --git a/udpstream.h b/udpstream.h
new file mode 100644 (file)
index 0000000..3424b98
--- /dev/null
@@ -0,0 +1,33 @@
+#ifndef _UDPSTREAM_H
+#define _UDPSTREAM_H 1
+
+// A single UDP destination. This is done a lot less efficient than HTTP streaming
+// since we expect to have so few of them, which also means things are a lot simpler.
+// In particular, we run in the input's thread, so there is no backlog, which means
+// that there is no state (UDP itself is, of course, stateless).
+
+#include <arpa/inet.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <sys/types.h>
+
+class MarkPool;
+
+class UDPStream {
+public:
+       // <mark_pool> can be NULL. Does not take ownership of the mark pool.
+       UDPStream(const sockaddr_in6 &dst, MarkPool *mark_pool);
+       ~UDPStream();
+
+       void send(const char *data, size_t bytes);
+
+private:
+       UDPStream(const UDPStream& other);
+
+       sockaddr_in6 dst;
+       int sock;
+       MarkPool *mark_pool;
+       int fwmark;
+};
+
+#endif  // !defined(_UDPSTREAM_H)