From e3f2936e3c9ff3b5569759c1aaed16f03bf728f8 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Wed, 5 May 2021 20:40:39 +0200 Subject: [PATCH] Support input from pipes (subprocesses). With a program that can output Metacube on stdout, such as a suitably patched FFmpeg binary, this gives Cubemap transcoding/remuxing capabilities. Of course, one cannot upgrade such a binary by SIGHUP-ing it, but it will survive a Cubemap reload/restart. --- httpinput.cpp | 85 ++++++++++++++++++++++++++++++++++++++++++++++----- httpinput.h | 6 ++++ input.cpp | 11 +++++-- 3 files changed, 92 insertions(+), 10 deletions(-) diff --git a/httpinput.cpp b/httpinput.cpp index d0e1c5c..274cc26 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -11,9 +11,12 @@ #include #include #include +#include +#include #include #include #include +#include #include #include #include @@ -220,6 +223,52 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port) freeaddrinfo(base_ai); return -1; } + +int HTTPInput::open_child_process(const string &cmdline) +{ + int devnullfd = open("/dev/null", O_RDONLY | O_CLOEXEC); + if (devnullfd == -1) { + log_perror("/dev/null"); + return -1; + } + + int pipefd[2]; + if (pipe2(pipefd, O_CLOEXEC) == -1) { + log_perror("pipe2()"); + close(devnullfd); + return -1; + } + + // Point stdout to us, stdin to /dev/null, and stderr remains where it is + // (probably the systemd log). All other file descriptors should be marked + // as close-on-exec, and should thus not leak into the child. + posix_spawn_file_actions_t actions; + posix_spawn_file_actions_init(&actions); + posix_spawn_file_actions_adddup2(&actions, devnullfd, 0); + posix_spawn_file_actions_adddup2(&actions, pipefd[1], 1); + + pid_t child_pid; + char * const argv[] = { + strdup("/bin/sh"), + strdup("-c"), + strdup(path.c_str()), + nullptr + }; + int err = posix_spawn(&child_pid, "/bin/sh", &actions, /*attrp=*/nullptr, argv, /*envp=*/nullptr); + posix_spawn_file_actions_destroy(&actions); + free(argv[0]); + free(argv[1]); + free(argv[2]); + close(pipefd[1]); + + if (err == 0) { + return pipefd[0]; + } else { + log_perror(cmdline.c_str()); + close(pipefd[0]); + return -1; + } +} bool HTTPInput::parse_response(const string &request) { @@ -332,7 +381,11 @@ void HTTPInput::do_work() } switch (state) { - case NOT_CONNECTED: + case NOT_CONNECTED: { + // Reap any exited children. + int wstatus; + while (waitpid(-1, &wstatus, WNOHANG) != -1 || errno == EINTR) ; + request.clear(); request_bytes_sent = 0; response.clear(); @@ -343,8 +396,9 @@ void HTTPInput::do_work() servers->set_unavailable(stream_index); } + string protocol; { - string protocol, user; // Thrown away. + string user; // Thrown away. if (!parse_url(url, &protocol, &user, &host, &port, &path)) { if (!suppress_logging) { log(WARNING, "[%s] Failed to parse URL '%s'", url.c_str(), url.c_str()); @@ -378,18 +432,33 @@ void HTTPInput::do_work() assert(err != -1); } ++num_connection_attempts; - sock = lookup_and_connect(host, port); + if (protocol == "pipe") { + sock = open_child_process(path.c_str()); + + if (sock != -1) { + // Construct a minimal HTTP header. + http_header = "HTTP/1.0 200 OK\r\n"; + for (int stream_index : stream_indices) { + servers->set_header(stream_index, http_header, stream_header); + } + state = RECEIVING_DATA; + } + } else { + sock = lookup_and_connect(host, port); + if (sock != -1) { + // Yay, successful connect. + state = SENDING_REQUEST; + request = "GET " + path + " HTTP/1.0\r\nHost: " + host_header(host, port) + "\r\nUser-Agent: cubemap\r\n\r\n"; + request_bytes_sent = 0; + } + } if (sock != -1) { - // Yay, successful connect. - state = SENDING_REQUEST; - request = "GET " + path + " HTTP/1.0\r\nHost: " + host_header(host, port) + "\r\nUser-Agent: cubemap\r\n\r\n"; - request_bytes_sent = 0; - lock_guard lock(stats_mutex); stats.connect_time = time(nullptr); clock_gettime(CLOCK_MONOTONIC_COARSE, &last_activity); } break; + } case SENDING_REQUEST: { size_t to_send = request.size() - request_bytes_sent; int ret; diff --git a/httpinput.h b/httpinput.h index 4ee5ede..890cc9e 100644 --- a/httpinput.h +++ b/httpinput.h @@ -12,6 +12,8 @@ class InputProto; +// Despite the name, covers input over both HTTP and pipes, both typically +// wrapped in Metacube. class HTTPInput : public Input { public: HTTPInput(const std::string &url, Input::Encoding encoding); @@ -35,6 +37,10 @@ private: // Open a socket that connects to the given host and port. Does DNS resolving. int lookup_and_connect(const std::string &host, const std::string &port); + // Open a child process with the given command line (given to /bin/sh). + // Returns a pipe to its standard output. + int open_child_process(const std::string &cmdline); + // Parses a HTTP response. Returns false if it not a 200. bool parse_response(const std::string &response); diff --git a/input.cpp b/input.cpp index dba6cf3..81bc2dc 100644 --- a/input.cpp +++ b/input.cpp @@ -30,6 +30,13 @@ void split_user_host(const string &user_host, string *user, string *host) // Extremely rudimentary URL parsing. bool parse_url(const string &url, string *protocol, string *user, string *host, string *port, string *path) { + // pipe:foo (or pipe:"foo"). + if (url.find("pipe:") == 0) { + *protocol = "pipe"; + *path = string(url.begin() + 5, url.end()); + return true; + } + size_t split = url.find("://"); if (split == string::npos) { return false; @@ -100,7 +107,7 @@ Input *create_input(const string &url, Input::Encoding encoding) if (!parse_url(url, &protocol, &user, &host, &port, &path)) { return nullptr; } - if (protocol == "http") { + if (protocol == "http" || protocol == "pipe") { return new HTTPInput(url, encoding); } if (protocol == "udp") { @@ -116,7 +123,7 @@ Input *create_input(const InputProto &serialized) if (!parse_url(serialized.url(), &protocol, &user, &host, &port, &path)) { return nullptr; } - if (protocol == "http") { + if (protocol == "http" || protocol == "pipe") { return new HTTPInput(serialized); } if (protocol == "udp") { -- 2.39.2