Support joining multicast addresses (both ASM and SSM).
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 26 Apr 2014 22:41:08 +0000 (00:41 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 26 Apr 2014 22:41:08 +0000 (00:41 +0200)
README
cubemap.1
cubemap.config.sample
httpinput.cpp
input.cpp
input.h
udpinput.cpp
udpinput.h

diff --git a/README b/README
index ecee151..69efbc7 100644 (file)
--- a/README
+++ b/README
@@ -16,6 +16,8 @@ A short list of features:
    (depends on Linux 3.13 or newer).
  - Reflects anything VLC can reflect over HTTP, even the muxes VLC
    has problems reflecting itself (in particular, FLV).
+ - Multicast support, both for sending and receiving (supports only protocols
+   that can go over UDP, e.g. MPEG-TS). Supports both ASM and SSM.
  - IPv4 support. Yes, Cubemap even supports (some) legacy protocols.
 
 
index 0396f46..23872c6 100644 (file)
--- a/cubemap.1
+++ b/cubemap.1
@@ -33,6 +33,9 @@ Support for setting max pacing rate through the fq packet scheduler
 Reflects anything VLC can reflect over HTTP, even the muxes VLC
 has problems reflecting itself (in particular, FLV).
 .IP \[bu]
+Multicast support, both for sending and receiving (supports only protocols
+that can go over UDP, e.g. MPEG-TS). Supports both ASM and SSM.
+.IP \[bu]
 IPv4 support. Yes, Cubemap even supports (some) legacy protocols.
 .SH OPTIONS
 .TP
index 6378642..b825947 100644 (file)
@@ -35,7 +35,8 @@ error_log type=console
 #
 stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv
 stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
-stream /udp.ts src=udp://@:1234 backlog_size=1048576 pacing_rate_kbit=2000
+stream /udp.ts src=udp://@:1234 backlog_size=1048576
+stream /udp-multicast.ts src=udp://@233.252.0.2:1234 pacing_rate_kbit=2000
 udpstream [2001:67c:29f4::50]:5000 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube
 udpstream 193.35.52.50:5001 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube
 udpstream 233.252.0.1:5002 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube ttl=32 multicast_output_interface=eth1
index 478356e..e2cc197 100644 (file)
@@ -76,8 +76,8 @@ HTTPInput::HTTPInput(const InputProto &serialized)
        pending_data.resize(serialized.pending_data().size());
        memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size());
 
-       string protocol;
-       parse_url(url, &protocol, &host, &port, &path);  // Don't care if it fails.
+       string protocol, user;
+       parse_url(url, &protocol, &user, &host, &port, &path);  // Don't care if it fails.
 
        pthread_mutex_init(&stats_mutex, NULL);
        stats.url = url;
@@ -340,8 +340,8 @@ void HTTPInput::do_work()
                        }
 
                        {
-                               string protocol;  // Thrown away.
-                               if (!parse_url(url, &protocol, &host, &port, &path)) {
+                               string protocol, user;  // Thrown away.
+                               if (!parse_url(url, &protocol, &user, &host, &port, &path)) {
                                        log(WARNING, "[%s] Failed to parse URL '%s'", url.c_str(), url.c_str());
                                        break;
                                }
index d37b7d9..0ee7d1c 100644 (file)
--- a/input.cpp
+++ b/input.cpp
@@ -8,8 +8,27 @@
 
 using namespace std;
 
+namespace {
+
+// Does not support passwords, only user:host, since this is really only used
+// to parse VLC's udp://source@multicastgroup:1234/ syntax (we do not support
+// even basic auth).
+void split_user_host(const string &user_host, string *user, string *host)
+{
+       size_t split = user_host.find("@");
+       if (split == string::npos) {
+               user->clear();
+               *host = user_host;
+       } else {
+               *user = string(user_host.begin(), user_host.begin() + split);
+               *host = string(user_host.begin() + split + 1, user_host.end());
+       }
+}
+
+}  // namespace
+
 // Extremely rudimentary URL parsing.
-bool parse_url(const string &url, string *protocol, string *host, string *port, string *path)
+bool parse_url(const string &url, string *protocol, string *user, string *host, string *port, string *path)
 {
        size_t split = url.find("://");
        if (split == string::npos) {
@@ -21,13 +40,13 @@ bool parse_url(const string &url, string *protocol, string *host, string *port,
        split = rest.find_first_of(":/");
        if (split == string::npos) {
                // http://foo
-               *host = rest;
+               split_user_host(rest, user, host);
                *port = *protocol;
                *path = "/";
                return true;
        }
 
-       *host = string(rest.begin(), rest.begin() + split);
+       split_user_host(string(rest.begin(), rest.begin() + split), user, host);
        char ch = rest[split];  // Colon or slash.
        rest = string(rest.begin() + split + 1, rest.end());
 
@@ -55,8 +74,8 @@ bool parse_url(const string &url, string *protocol, string *host, string *port,
 
 Input *create_input(const std::string &url)
 {
-       string protocol, host, port, path;
-       if (!parse_url(url, &protocol, &host, &port, &path)) {
+       string protocol, user, host, port, path;
+       if (!parse_url(url, &protocol, &user, &host, &port, &path)) {
                return NULL;
        }
        if (protocol == "http") {
@@ -70,8 +89,8 @@ Input *create_input(const std::string &url)
 
 Input *create_input(const InputProto &serialized)
 {
-       string protocol, host, port, path;
-       if (!parse_url(serialized.url(), &protocol, &host, &port, &path)) {
+       string protocol, user, host, port, path;
+       if (!parse_url(serialized.url(), &protocol, &user, &host, &port, &path)) {
                return NULL;
        }
        if (protocol == "http") {
diff --git a/input.h b/input.h
index 5b3bbd2..1cea2f5 100644 (file)
--- a/input.h
+++ b/input.h
@@ -11,7 +11,7 @@ class Input;
 class InputProto;
 
 // Extremely rudimentary URL parsing.
-bool parse_url(const std::string &url, std::string *protocol, std::string *host, std::string *port, std::string *path);
+bool parse_url(const std::string &url, std::string *protocol, std::string *user, std::string *host, std::string *port, std::string *path);
 
 // Figure out the right type of input based on the URL, and create a new Input of the right type.
 // Will return NULL if unknown.
index 6b17799..5e39d66 100644 (file)
@@ -22,13 +22,91 @@ using namespace std;
 
 extern ServerPool *servers;
 
+namespace {
+
+// Similar to parse_hostport(), but only parses the IP address,
+// and does not use mapped-v4 addresses, since multicast seems
+// to not like that too much.
+bool parse_ip_address(const string &ip, sockaddr_storage *addr)
+{
+       memset(addr, 0, sizeof(*addr));
+
+       assert(!ip.empty());
+       if (ip[0] == '[') {
+               sockaddr_in6 *addr6 = (sockaddr_in6 *)addr;
+               addr6->sin6_family = AF_INET6;
+               if (ip[ip.size() - 1] != ']') {
+                       log(ERROR, "address '%s' is malformed; must be either [ipv6addr] or ipv4addr",
+                               ip.c_str());
+                       return false;
+               }
+               if (inet_pton(AF_INET6, ip.c_str(), &addr6->sin6_addr) != 1) {
+                       log(ERROR, "'%s' is not a valid IPv6 address");
+                       return false;
+               }
+       } else {
+               sockaddr_in *addr4 = (sockaddr_in *)addr;
+               addr4->sin_family = AF_INET;
+               if (inet_pton(AF_INET, ip.c_str(), &addr4->sin_addr) != 1) {
+                       log(ERROR, "'%s' is not a valid IPv4 address");
+                       return false;
+               }
+       }
+
+       return true;
+}
+
+bool maybe_join_multicast_group(int sock, const string &group, const string &source)
+{
+       if (group.empty()) {
+               // Not multicast.
+               return true;
+       }
+
+       // Join the given multicast group (ASM or SSM).
+       // TODO: Also support sources apart from multicast groups,
+       // e.g. udp://[::1]:1234 for only receiving from localhost.
+       if (!source.empty()) {
+               // Single-Source Multicast (SSM).
+               group_source_req gsr;
+               memset(&gsr, 0, sizeof(gsr));
+               if (!parse_ip_address(group, &gsr.gsr_group)) {
+                       return false;
+               }
+               if (!parse_ip_address(source, &gsr.gsr_source)) {
+                       return false;
+               }
+               int level = (gsr.gsr_group.ss_family == AF_INET) ? SOL_IP : SOL_IPV6;
+               if (setsockopt(sock, level, MCAST_JOIN_SOURCE_GROUP, &gsr, sizeof(gsr)) == -1) {
+                       log_perror("setsockopt(MCAST_JOIN_SOURCE_GROUP)");
+                       return false;
+               }
+       } else {
+               // Any-Source Multicast (ASM).
+               group_req gr;
+               memset(&gr, 0, sizeof(gr));
+               if (!parse_ip_address(group, &gr.gr_group)) {
+                       return false;
+               }
+               int level = (gr.gr_group.ss_family == AF_INET) ? SOL_IP : SOL_IPV6;
+               if (setsockopt(sock, level, MCAST_JOIN_GROUP, &gr, sizeof(gr)) == -1) {
+                       log_perror("setsockopt(MCAST_JOIN_GROUP)");
+                       return false;
+               }
+       }
+
+       return true;
+}
+
+}  // namespace
+
 UDPInput::UDPInput(const string &url)
        : url(url),
          sock(-1)
 {
        // Should be verified by the caller.
        string protocol;
-       bool ok = parse_url(url, &protocol, &host, &port, &path);
+       bool ok = parse_url(url, &protocol, &user, &host, &port, &path);
        assert(ok);
 
        construct_header();
@@ -46,7 +124,7 @@ UDPInput::UDPInput(const InputProto &serialized)
 {
        // Should be verified by the caller.
        string protocol;
-       bool ok = parse_url(url, &protocol, &host, &port, &path);
+       bool ok = parse_url(url, &protocol, &user, &host, &port, &path);
        assert(ok);
 
        construct_header();
@@ -108,6 +186,18 @@ void UDPInput::do_work()
                                usleep(200000);
                                continue;
                        }
+
+                       // The syntax udp://source@group (abusing the username field
+                       // to store the sender in SSM) seems to be a VLC invention.
+                       // We mimic it.
+                       if (!maybe_join_multicast_group(sock, host, user)) {
+                               log(WARNING, "[%s] Multicast join failed. Waiting 0.2 seconds and trying again...",
+                                            url.c_str());
+                               safe_close(sock);
+                               sock = -1;
+                               usleep(200000);
+                               continue;
+                       }
                }
 
                // Wait for a packet, or a wakeup.
index 11a3690..b3f504a 100644 (file)
@@ -35,7 +35,7 @@ private:
 
        // The URL and its parsed components.
        std::string url;
-       std::string host, port, path;
+       std::string user, host, port, path;
 
        // The HTTP header we're sending to clients.
        std::string http_header;