X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=httpinput.cpp;h=62ed663611402cef5d4517cef00d8033300391bc;hb=0229f4414b3b63934c057030deb88cbf6926bb1f;hp=d0e1c5c82135f42bf7ac1b75e77d90af5d4a5402;hpb=6d34c5b6d8e5bec5d1421eadc103f38d206f34f1;p=cubemap diff --git a/httpinput.cpp b/httpinput.cpp index d0e1c5c..62ed663 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -11,9 +11,12 @@ #include #include #include +#include +#include #include #include #include +#include #include #include #include @@ -72,11 +75,12 @@ HTTPInput::HTTPInput(const InputProto &serialized) http_header(serialized.http_header()), stream_header(serialized.stream_header()), has_metacube_header(serialized.has_metacube_header()), - sock(serialized.sock()) + sock(serialized.sock()), + child_pid(serialized.child_pid()) { // Set back the close-on-exec flag for the socket. // (This can't leak into a child, since we haven't been started yet.) - fcntl(sock, F_SETFD, 1); + fcntl(sock, F_SETFD, O_CLOEXEC); pending_data.resize(serialized.pending_data().size()); memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size()); @@ -109,6 +113,13 @@ void HTTPInput::close_socket() safe_close(sock); sock = -1; } + if (child_pid != -1) { + // Kill the child process group, forcibly. + // TODO: Consider using a pidfd on newer kernels, so that we're guaranteed + // never to kill the wrong process. + kill(-child_pid, SIGKILL); + } + child_pid = -1; lock_guard lock(stats_mutex); stats.connect_time = -1; @@ -131,6 +142,7 @@ InputProto HTTPInput::serialize() const serialized.set_pending_data(string(pending_data.begin(), pending_data.end())); serialized.set_has_metacube_header(has_metacube_header); serialized.set_sock(sock); + serialized.set_child_pid(child_pid); serialized.set_bytes_received(stats.bytes_received); serialized.set_data_bytes_received(stats.data_bytes_received); if (isfinite(stats.latency_sec)) { @@ -220,6 +232,62 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port) freeaddrinfo(base_ai); return -1; } + +int HTTPInput::open_child_process(const string &cmdline) +{ + int devnullfd = open("/dev/null", O_RDONLY | O_CLOEXEC); + if (devnullfd == -1) { + log_perror("/dev/null"); + return -1; + } + + int pipefd[2]; + if (pipe2(pipefd, O_CLOEXEC) == -1) { + log_perror("pipe2()"); + close(devnullfd); + return -1; + } + + // Point stdout to us, stdin to /dev/null, and stderr remains where it is + // (probably the systemd log). All other file descriptors should be marked + // as close-on-exec, and should thus not leak into the child. + posix_spawn_file_actions_t actions; + posix_spawn_file_actions_init(&actions); + posix_spawn_file_actions_adddup2(&actions, devnullfd, 0); + posix_spawn_file_actions_adddup2(&actions, pipefd[1], 1); + + // Make the process a leader of its own process group, so that we can easily + // kill it and any of its child processes (unless it's started new process + // groups itself, of course). + posix_spawnattr_t attr; + posix_spawnattr_init(&attr); + posix_spawnattr_setflags(&attr, POSIX_SPAWN_SETPGROUP); + posix_spawnattr_setpgroup(&attr, 0); + + char * const argv[] = { + strdup("/bin/sh"), + strdup("-c"), + strdup(path.c_str()), + nullptr + }; + int err = posix_spawn(&child_pid, "/bin/sh", &actions, &attr, argv, /*envp=*/nullptr); + posix_spawn_file_actions_destroy(&actions); + posix_spawnattr_destroy(&attr); + free(argv[0]); + free(argv[1]); + free(argv[2]); + close(devnullfd); + close(pipefd[1]); + + if (err == 0) { + return pipefd[0]; + } else { + child_pid = -1; + log_perror(cmdline.c_str()); + close(pipefd[0]); + return -1; + } +} bool HTTPInput::parse_response(const string &request) { @@ -332,7 +400,24 @@ void HTTPInput::do_work() } switch (state) { - case NOT_CONNECTED: + case NOT_CONNECTED: { + // Reap any exited children. + int wstatus, err; + do { + err = waitpid(-1, &wstatus, WNOHANG); + if (err == -1) { + if (errno == EINTR) { + continue; + } + if (errno == ECHILD) { + break; + } + log_perror("waitpid"); + break; + } + } while (err != 0); + child_pid = -1; + request.clear(); request_bytes_sent = 0; response.clear(); @@ -343,8 +428,9 @@ void HTTPInput::do_work() servers->set_unavailable(stream_index); } + string protocol; { - string protocol, user; // Thrown away. + string user; // Thrown away. if (!parse_url(url, &protocol, &user, &host, &port, &path)) { if (!suppress_logging) { log(WARNING, "[%s] Failed to parse URL '%s'", url.c_str(), url.c_str()); @@ -378,18 +464,33 @@ void HTTPInput::do_work() assert(err != -1); } ++num_connection_attempts; - sock = lookup_and_connect(host, port); + if (protocol == "pipe") { + sock = open_child_process(path.c_str()); + + if (sock != -1) { + // Construct a minimal HTTP header. + http_header = "HTTP/1.0 200 OK\r\n"; + for (int stream_index : stream_indices) { + servers->set_header(stream_index, http_header, stream_header); + } + state = RECEIVING_DATA; + } + } else { + sock = lookup_and_connect(host, port); + if (sock != -1) { + // Yay, successful connect. + state = SENDING_REQUEST; + request = "GET " + path + " HTTP/1.0\r\nHost: " + host_header(host, port) + "\r\nUser-Agent: cubemap\r\n\r\n"; + request_bytes_sent = 0; + } + } if (sock != -1) { - // Yay, successful connect. - state = SENDING_REQUEST; - request = "GET " + path + " HTTP/1.0\r\nHost: " + host_header(host, port) + "\r\nUser-Agent: cubemap\r\n\r\n"; - request_bytes_sent = 0; - lock_guard lock(stats_mutex); stats.connect_time = time(nullptr); clock_gettime(CLOCK_MONOTONIC_COARSE, &last_activity); } break; + } case SENDING_REQUEST: { size_t to_send = request.size() - request_bytes_sent; int ret;