From: Steinar H. Gunderson Date: Sat, 26 Apr 2014 22:41:08 +0000 (+0200) Subject: Support joining multicast addresses (both ASM and SSM). X-Git-Tag: 1.1.0~3 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=70c47a998c5aa2eb536c3c8f71f3178cd217a14d Support joining multicast addresses (both ASM and SSM). --- diff --git a/README b/README index ecee151..69efbc7 100644 --- a/README +++ b/README @@ -16,6 +16,8 @@ A short list of features: (depends on Linux 3.13 or newer). - Reflects anything VLC can reflect over HTTP, even the muxes VLC has problems reflecting itself (in particular, FLV). + - Multicast support, both for sending and receiving (supports only protocols + that can go over UDP, e.g. MPEG-TS). Supports both ASM and SSM. - IPv4 support. Yes, Cubemap even supports (some) legacy protocols. diff --git a/cubemap.1 b/cubemap.1 index 0396f46..23872c6 100644 --- a/cubemap.1 +++ b/cubemap.1 @@ -33,6 +33,9 @@ Support for setting max pacing rate through the fq packet scheduler Reflects anything VLC can reflect over HTTP, even the muxes VLC has problems reflecting itself (in particular, FLV). .IP \[bu] +Multicast support, both for sending and receiving (supports only protocols +that can go over UDP, e.g. MPEG-TS). Supports both ASM and SSM. +.IP \[bu] IPv4 support. Yes, Cubemap even supports (some) legacy protocols. .SH OPTIONS .TP diff --git a/cubemap.config.sample b/cubemap.config.sample index 6378642..b825947 100644 --- a/cubemap.config.sample +++ b/cubemap.config.sample @@ -35,7 +35,8 @@ error_log type=console # stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube -stream /udp.ts src=udp://@:1234 backlog_size=1048576 pacing_rate_kbit=2000 +stream /udp.ts src=udp://@:1234 backlog_size=1048576 +stream /udp-multicast.ts src=udp://@233.252.0.2:1234 pacing_rate_kbit=2000 udpstream [2001:67c:29f4::50]:5000 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube udpstream 193.35.52.50:5001 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube udpstream 233.252.0.1:5002 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube ttl=32 multicast_output_interface=eth1 diff --git a/httpinput.cpp b/httpinput.cpp index 478356e..e2cc197 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -76,8 +76,8 @@ HTTPInput::HTTPInput(const InputProto &serialized) pending_data.resize(serialized.pending_data().size()); memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size()); - string protocol; - parse_url(url, &protocol, &host, &port, &path); // Don't care if it fails. + string protocol, user; + parse_url(url, &protocol, &user, &host, &port, &path); // Don't care if it fails. pthread_mutex_init(&stats_mutex, NULL); stats.url = url; @@ -340,8 +340,8 @@ void HTTPInput::do_work() } { - string protocol; // Thrown away. - if (!parse_url(url, &protocol, &host, &port, &path)) { + string protocol, user; // Thrown away. + if (!parse_url(url, &protocol, &user, &host, &port, &path)) { log(WARNING, "[%s] Failed to parse URL '%s'", url.c_str(), url.c_str()); break; } diff --git a/input.cpp b/input.cpp index d37b7d9..0ee7d1c 100644 --- a/input.cpp +++ b/input.cpp @@ -8,8 +8,27 @@ using namespace std; +namespace { + +// Does not support passwords, only user:host, since this is really only used +// to parse VLC's udp://source@multicastgroup:1234/ syntax (we do not support +// even basic auth). +void split_user_host(const string &user_host, string *user, string *host) +{ + size_t split = user_host.find("@"); + if (split == string::npos) { + user->clear(); + *host = user_host; + } else { + *user = string(user_host.begin(), user_host.begin() + split); + *host = string(user_host.begin() + split + 1, user_host.end()); + } +} + +} // namespace + // Extremely rudimentary URL parsing. -bool parse_url(const string &url, string *protocol, string *host, string *port, string *path) +bool parse_url(const string &url, string *protocol, string *user, string *host, string *port, string *path) { size_t split = url.find("://"); if (split == string::npos) { @@ -21,13 +40,13 @@ bool parse_url(const string &url, string *protocol, string *host, string *port, split = rest.find_first_of(":/"); if (split == string::npos) { // http://foo - *host = rest; + split_user_host(rest, user, host); *port = *protocol; *path = "/"; return true; } - *host = string(rest.begin(), rest.begin() + split); + split_user_host(string(rest.begin(), rest.begin() + split), user, host); char ch = rest[split]; // Colon or slash. rest = string(rest.begin() + split + 1, rest.end()); @@ -55,8 +74,8 @@ bool parse_url(const string &url, string *protocol, string *host, string *port, Input *create_input(const std::string &url) { - string protocol, host, port, path; - if (!parse_url(url, &protocol, &host, &port, &path)) { + string protocol, user, host, port, path; + if (!parse_url(url, &protocol, &user, &host, &port, &path)) { return NULL; } if (protocol == "http") { @@ -70,8 +89,8 @@ Input *create_input(const std::string &url) Input *create_input(const InputProto &serialized) { - string protocol, host, port, path; - if (!parse_url(serialized.url(), &protocol, &host, &port, &path)) { + string protocol, user, host, port, path; + if (!parse_url(serialized.url(), &protocol, &user, &host, &port, &path)) { return NULL; } if (protocol == "http") { diff --git a/input.h b/input.h index 5b3bbd2..1cea2f5 100644 --- a/input.h +++ b/input.h @@ -11,7 +11,7 @@ class Input; class InputProto; // Extremely rudimentary URL parsing. -bool parse_url(const std::string &url, std::string *protocol, std::string *host, std::string *port, std::string *path); +bool parse_url(const std::string &url, std::string *protocol, std::string *user, std::string *host, std::string *port, std::string *path); // Figure out the right type of input based on the URL, and create a new Input of the right type. // Will return NULL if unknown. diff --git a/udpinput.cpp b/udpinput.cpp index 6b17799..5e39d66 100644 --- a/udpinput.cpp +++ b/udpinput.cpp @@ -22,13 +22,91 @@ using namespace std; extern ServerPool *servers; +namespace { + +// Similar to parse_hostport(), but only parses the IP address, +// and does not use mapped-v4 addresses, since multicast seems +// to not like that too much. +bool parse_ip_address(const string &ip, sockaddr_storage *addr) +{ + memset(addr, 0, sizeof(*addr)); + + assert(!ip.empty()); + if (ip[0] == '[') { + sockaddr_in6 *addr6 = (sockaddr_in6 *)addr; + addr6->sin6_family = AF_INET6; + if (ip[ip.size() - 1] != ']') { + log(ERROR, "address '%s' is malformed; must be either [ipv6addr] or ipv4addr", + ip.c_str()); + return false; + } + if (inet_pton(AF_INET6, ip.c_str(), &addr6->sin6_addr) != 1) { + log(ERROR, "'%s' is not a valid IPv6 address"); + return false; + } + } else { + sockaddr_in *addr4 = (sockaddr_in *)addr; + addr4->sin_family = AF_INET; + if (inet_pton(AF_INET, ip.c_str(), &addr4->sin_addr) != 1) { + log(ERROR, "'%s' is not a valid IPv4 address"); + return false; + } + } + + return true; +} + +bool maybe_join_multicast_group(int sock, const string &group, const string &source) +{ + if (group.empty()) { + // Not multicast. + return true; + } + + // Join the given multicast group (ASM or SSM). + // TODO: Also support sources apart from multicast groups, + // e.g. udp://[::1]:1234 for only receiving from localhost. + if (!source.empty()) { + // Single-Source Multicast (SSM). + group_source_req gsr; + memset(&gsr, 0, sizeof(gsr)); + if (!parse_ip_address(group, &gsr.gsr_group)) { + return false; + } + if (!parse_ip_address(source, &gsr.gsr_source)) { + return false; + } + int level = (gsr.gsr_group.ss_family == AF_INET) ? SOL_IP : SOL_IPV6; + if (setsockopt(sock, level, MCAST_JOIN_SOURCE_GROUP, &gsr, sizeof(gsr)) == -1) { + log_perror("setsockopt(MCAST_JOIN_SOURCE_GROUP)"); + return false; + } + } else { + // Any-Source Multicast (ASM). + group_req gr; + memset(&gr, 0, sizeof(gr)); + if (!parse_ip_address(group, &gr.gr_group)) { + return false; + } + int level = (gr.gr_group.ss_family == AF_INET) ? SOL_IP : SOL_IPV6; + if (setsockopt(sock, level, MCAST_JOIN_GROUP, &gr, sizeof(gr)) == -1) { + log_perror("setsockopt(MCAST_JOIN_GROUP)"); + return false; + } + } + + return true; +} + +} // namespace + UDPInput::UDPInput(const string &url) : url(url), sock(-1) { // Should be verified by the caller. string protocol; - bool ok = parse_url(url, &protocol, &host, &port, &path); + bool ok = parse_url(url, &protocol, &user, &host, &port, &path); assert(ok); construct_header(); @@ -46,7 +124,7 @@ UDPInput::UDPInput(const InputProto &serialized) { // Should be verified by the caller. string protocol; - bool ok = parse_url(url, &protocol, &host, &port, &path); + bool ok = parse_url(url, &protocol, &user, &host, &port, &path); assert(ok); construct_header(); @@ -108,6 +186,18 @@ void UDPInput::do_work() usleep(200000); continue; } + + // The syntax udp://source@group (abusing the username field + // to store the sender in SSM) seems to be a VLC invention. + // We mimic it. + if (!maybe_join_multicast_group(sock, host, user)) { + log(WARNING, "[%s] Multicast join failed. Waiting 0.2 seconds and trying again...", + url.c_str()); + safe_close(sock); + sock = -1; + usleep(200000); + continue; + } } // Wait for a packet, or a wakeup. diff --git a/udpinput.h b/udpinput.h index 11a3690..b3f504a 100644 --- a/udpinput.h +++ b/udpinput.h @@ -35,7 +35,7 @@ private: // The URL and its parsed components. std::string url; - std::string host, port, path; + std::string user, host, port, path; // The HTTP header we're sending to clients. std::string http_header;