Support input from pipes (subprocesses).
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 5 May 2021 18:40:39 +0000 (20:40 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 5 May 2021 18:40:39 +0000 (20:40 +0200)
With a program that can output Metacube on stdout, such as a suitably
patched FFmpeg binary, this gives Cubemap transcoding/remuxing capabilities.

Of course, one cannot upgrade such a binary by SIGHUP-ing it, but it will
survive a Cubemap reload/restart.

httpinput.cpp
httpinput.h
input.cpp

index d0e1c5c82135f42bf7ac1b75e77d90af5d4a5402..274cc2687b160d45e365fc7f4daafb665b9d2660 100644 (file)
 #include <sys/ioctl.h>
 #include <sys/socket.h>
 #include <sys/time.h>
 #include <sys/ioctl.h>
 #include <sys/socket.h>
 #include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
 #include <time.h>
 #include <unistd.h>
 #include <math.h>
 #include <time.h>
 #include <unistd.h>
 #include <math.h>
+#include <spawn.h>
 #include <map>
 #include <string>
 #include <utility>
 #include <map>
 #include <string>
 #include <utility>
@@ -220,6 +223,52 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port)
        freeaddrinfo(base_ai);
        return -1;
 }
        freeaddrinfo(base_ai);
        return -1;
 }
+
+int HTTPInput::open_child_process(const string &cmdline)
+{
+       int devnullfd = open("/dev/null", O_RDONLY | O_CLOEXEC);
+       if (devnullfd == -1) {
+               log_perror("/dev/null");
+               return -1;
+       }
+
+       int pipefd[2];
+       if (pipe2(pipefd, O_CLOEXEC) == -1) {
+               log_perror("pipe2()");
+               close(devnullfd);
+               return -1;
+       }
+
+       // Point stdout to us, stdin to /dev/null, and stderr remains where it is
+       // (probably the systemd log). All other file descriptors should be marked
+       // as close-on-exec, and should thus not leak into the child.
+       posix_spawn_file_actions_t actions;
+       posix_spawn_file_actions_init(&actions);
+       posix_spawn_file_actions_adddup2(&actions, devnullfd, 0);
+       posix_spawn_file_actions_adddup2(&actions, pipefd[1], 1);
+
+       pid_t child_pid;
+       char * const argv[] = {
+               strdup("/bin/sh"),
+               strdup("-c"),
+               strdup(path.c_str()),
+               nullptr
+       };
+       int err = posix_spawn(&child_pid, "/bin/sh", &actions, /*attrp=*/nullptr, argv, /*envp=*/nullptr);
+       posix_spawn_file_actions_destroy(&actions);
+       free(argv[0]);
+       free(argv[1]);
+       free(argv[2]);
+       close(pipefd[1]);
+
+       if (err == 0) {
+               return pipefd[0];
+       } else {
+               log_perror(cmdline.c_str());
+               close(pipefd[0]);
+               return -1;
+       }
+}
        
 bool HTTPInput::parse_response(const string &request)
 {
        
 bool HTTPInput::parse_response(const string &request)
 {
@@ -332,7 +381,11 @@ void HTTPInput::do_work()
                }
 
                switch (state) {
                }
 
                switch (state) {
-               case NOT_CONNECTED:
+               case NOT_CONNECTED: {
+                       // Reap any exited children.
+                       int wstatus;
+                       while (waitpid(-1, &wstatus, WNOHANG) != -1 || errno == EINTR) ;
+
                        request.clear();
                        request_bytes_sent = 0;
                        response.clear();
                        request.clear();
                        request_bytes_sent = 0;
                        response.clear();
@@ -343,8 +396,9 @@ void HTTPInput::do_work()
                                servers->set_unavailable(stream_index);
                        }
 
                                servers->set_unavailable(stream_index);
                        }
 
+                       string protocol;
                        {
                        {
-                               string protocol, user;  // Thrown away.
+                               string user;  // Thrown away.
                                if (!parse_url(url, &protocol, &user, &host, &port, &path)) {
                                        if (!suppress_logging) {
                                                log(WARNING, "[%s] Failed to parse URL '%s'", url.c_str(), url.c_str());
                                if (!parse_url(url, &protocol, &user, &host, &port, &path)) {
                                        if (!suppress_logging) {
                                                log(WARNING, "[%s] Failed to parse URL '%s'", url.c_str(), url.c_str());
@@ -378,18 +432,33 @@ void HTTPInput::do_work()
                                assert(err != -1);
                        }
                        ++num_connection_attempts;
                                assert(err != -1);
                        }
                        ++num_connection_attempts;
-                       sock = lookup_and_connect(host, port);
+                       if (protocol == "pipe") {
+                               sock = open_child_process(path.c_str());
+
+                               if (sock != -1) {
+                                       // Construct a minimal HTTP header.
+                                       http_header = "HTTP/1.0 200 OK\r\n";
+                                       for (int stream_index : stream_indices) {
+                                               servers->set_header(stream_index, http_header, stream_header);
+                                       }
+                                       state = RECEIVING_DATA;
+                               }
+                       } else {
+                               sock = lookup_and_connect(host, port);
+                               if (sock != -1) {
+                                       // Yay, successful connect.
+                                       state = SENDING_REQUEST;
+                                       request = "GET " + path + " HTTP/1.0\r\nHost: " + host_header(host, port) + "\r\nUser-Agent: cubemap\r\n\r\n";
+                                       request_bytes_sent = 0;
+                               }
+                       }
                        if (sock != -1) {
                        if (sock != -1) {
-                               // Yay, successful connect.
-                               state = SENDING_REQUEST;
-                               request = "GET " + path + " HTTP/1.0\r\nHost: " + host_header(host, port) + "\r\nUser-Agent: cubemap\r\n\r\n";
-                               request_bytes_sent = 0;
-
                                lock_guard<mutex> lock(stats_mutex);
                                stats.connect_time = time(nullptr);
                                clock_gettime(CLOCK_MONOTONIC_COARSE, &last_activity);
                        }
                        break;
                                lock_guard<mutex> lock(stats_mutex);
                                stats.connect_time = time(nullptr);
                                clock_gettime(CLOCK_MONOTONIC_COARSE, &last_activity);
                        }
                        break;
+               }
                case SENDING_REQUEST: {
                        size_t to_send = request.size() - request_bytes_sent;
                        int ret;
                case SENDING_REQUEST: {
                        size_t to_send = request.size() - request_bytes_sent;
                        int ret;
index 4ee5ede87542866182133f23b40b2bb12b8735c7..890cc9ef91256b92145e58dcc3425831fa0747fc 100644 (file)
@@ -12,6 +12,8 @@
 
 class InputProto;
 
 
 class InputProto;
 
+// Despite the name, covers input over both HTTP and pipes, both typically
+// wrapped in Metacube.
 class HTTPInput : public Input {
 public:
        HTTPInput(const std::string &url, Input::Encoding encoding);
 class HTTPInput : public Input {
 public:
        HTTPInput(const std::string &url, Input::Encoding encoding);
@@ -35,6 +37,10 @@ private:
        // Open a socket that connects to the given host and port. Does DNS resolving.
        int lookup_and_connect(const std::string &host, const std::string &port);
 
        // Open a socket that connects to the given host and port. Does DNS resolving.
        int lookup_and_connect(const std::string &host, const std::string &port);
 
+       // Open a child process with the given command line (given to /bin/sh).
+       // Returns a pipe to its standard output.
+       int open_child_process(const std::string &cmdline);
+
        // Parses a HTTP response. Returns false if it not a 200.
        bool parse_response(const std::string &response);
 
        // Parses a HTTP response. Returns false if it not a 200.
        bool parse_response(const std::string &response);
 
index dba6cf38b26b3ca87a6a953c8842600012e0124b..81bc2dc4ea4d396ce6d9d7855b211919aff878e4 100644 (file)
--- a/input.cpp
+++ b/input.cpp
@@ -30,6 +30,13 @@ void split_user_host(const string &user_host, string *user, string *host)
 // Extremely rudimentary URL parsing.
 bool parse_url(const string &url, string *protocol, string *user, string *host, string *port, string *path)
 {
 // Extremely rudimentary URL parsing.
 bool parse_url(const string &url, string *protocol, string *user, string *host, string *port, string *path)
 {
+       // pipe:foo (or pipe:"foo").
+       if (url.find("pipe:") == 0) {
+               *protocol = "pipe";
+               *path = string(url.begin() + 5, url.end());
+               return true;
+       }
+
        size_t split = url.find("://");
        if (split == string::npos) {
                return false;
        size_t split = url.find("://");
        if (split == string::npos) {
                return false;
@@ -100,7 +107,7 @@ Input *create_input(const string &url, Input::Encoding encoding)
        if (!parse_url(url, &protocol, &user, &host, &port, &path)) {
                return nullptr;
        }
        if (!parse_url(url, &protocol, &user, &host, &port, &path)) {
                return nullptr;
        }
-       if (protocol == "http") {
+       if (protocol == "http" || protocol == "pipe") {
                return new HTTPInput(url, encoding);
        }
        if (protocol == "udp") {
                return new HTTPInput(url, encoding);
        }
        if (protocol == "udp") {
@@ -116,7 +123,7 @@ Input *create_input(const InputProto &serialized)
        if (!parse_url(serialized.url(), &protocol, &user, &host, &port, &path)) {
                return nullptr;
        }
        if (!parse_url(serialized.url(), &protocol, &user, &host, &port, &path)) {
                return nullptr;
        }
-       if (protocol == "http") {
+       if (protocol == "http" || protocol == "pipe") {
                return new HTTPInput(serialized);
        }
        if (protocol == "udp") {
                return new HTTPInput(serialized);
        }
        if (protocol == "udp") {