X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=httpinput.cpp;h=274cc2687b160d45e365fc7f4daafb665b9d2660;hp=5b74319d71f77645c99b45452b0a677298316901;hb=e3f2936e3c9ff3b5569759c1aaed16f03bf728f8;hpb=a0fe013448d188b324c00383cfd91695d9d3d076 diff --git a/httpinput.cpp b/httpinput.cpp index 5b74319..274cc26 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -1,5 +1,7 @@ #include #include +#include +#include #include #include #include @@ -9,9 +11,12 @@ #include #include #include +#include +#include #include #include #include +#include #include #include #include @@ -20,7 +25,6 @@ #include "httpinput.h" #include "log.h" #include "metacube2.h" -#include "mutexlock.h" #include "parse.h" #include "serverpool.h" #include "state.pb.h" @@ -49,22 +53,14 @@ extern ServerPool *servers; HTTPInput::HTTPInput(const string &url, Input::Encoding encoding) : state(NOT_CONNECTED), url(url), - encoding(encoding), - has_metacube_header(false), - sock(-1), - num_connection_attempts(0), - suppress_logging(false) + encoding(encoding) { - pthread_mutex_init(&stats_mutex, nullptr); stats.url = url; stats.bytes_received = 0; stats.data_bytes_received = 0; stats.metadata_bytes_received = 0; stats.connect_time = -1; stats.latency_sec = HUGE_VAL; - - last_verbose_connection.tv_sec = -3600; - last_verbose_connection.tv_nsec = 0; } HTTPInput::HTTPInput(const InputProto &serialized) @@ -79,17 +75,18 @@ HTTPInput::HTTPInput(const InputProto &serialized) http_header(serialized.http_header()), stream_header(serialized.stream_header()), has_metacube_header(serialized.has_metacube_header()), - sock(serialized.sock()), - num_connection_attempts(0), - suppress_logging(false) + sock(serialized.sock()) { + // Set back the close-on-exec flag for the socket. + // (This can't leak into a child, since we haven't been started yet.) + fcntl(sock, F_SETFD, 1); + pending_data.resize(serialized.pending_data().size()); memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size()); string protocol, user; parse_url(url, &protocol, &user, &host, &port, &path); // Don't care if it fails. - pthread_mutex_init(&stats_mutex, nullptr); stats.url = url; stats.bytes_received = serialized.bytes_received(); stats.data_bytes_received = serialized.data_bytes_received(); @@ -116,12 +113,16 @@ void HTTPInput::close_socket() sock = -1; } - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.connect_time = -1; } InputProto HTTPInput::serialize() const { + // Unset the close-on-exec flag for the socket. + // (This can't leak into a child, since there's only one thread left.) + fcntl(sock, F_SETFD, 0); + InputProto serialized; serialized.set_state(state); serialized.set_url(url); @@ -164,23 +165,14 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port) // Connect to everything in turn until we have a socket. for ( ; ai && !should_stop(); ai = ai->ai_next) { - int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP); + // Now do a non-blocking connect. This is important because we want to be able to be + // woken up, even though it's rather cumbersome. + int sock = socket(ai->ai_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP); if (sock == -1) { // Could be e.g. EPROTONOSUPPORT. The show must go on. continue; } - // Now do a non-blocking connect. This is important because we want to be able to be - // woken up, even though it's rather cumbersome. - - // Set the socket as nonblocking. - int one = 1; - if (ioctl(sock, FIONBIO, &one) == -1) { - log_perror("ioctl(FIONBIO)"); - safe_close(sock); - return -1; - } - // Do a non-blocking connect. do { err = connect(sock, ai->ai_addr, ai->ai_addrlen); @@ -231,6 +223,52 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port) freeaddrinfo(base_ai); return -1; } + +int HTTPInput::open_child_process(const string &cmdline) +{ + int devnullfd = open("/dev/null", O_RDONLY | O_CLOEXEC); + if (devnullfd == -1) { + log_perror("/dev/null"); + return -1; + } + + int pipefd[2]; + if (pipe2(pipefd, O_CLOEXEC) == -1) { + log_perror("pipe2()"); + close(devnullfd); + return -1; + } + + // Point stdout to us, stdin to /dev/null, and stderr remains where it is + // (probably the systemd log). All other file descriptors should be marked + // as close-on-exec, and should thus not leak into the child. + posix_spawn_file_actions_t actions; + posix_spawn_file_actions_init(&actions); + posix_spawn_file_actions_adddup2(&actions, devnullfd, 0); + posix_spawn_file_actions_adddup2(&actions, pipefd[1], 1); + + pid_t child_pid; + char * const argv[] = { + strdup("/bin/sh"), + strdup("-c"), + strdup(path.c_str()), + nullptr + }; + int err = posix_spawn(&child_pid, "/bin/sh", &actions, /*attrp=*/nullptr, argv, /*envp=*/nullptr); + posix_spawn_file_actions_destroy(&actions); + free(argv[0]); + free(argv[1]); + free(argv[2]); + close(pipefd[1]); + + if (err == 0) { + return pipefd[0]; + } else { + log_perror(cmdline.c_str()); + close(pipefd[0]); + return -1; + } +} bool HTTPInput::parse_response(const string &request) { @@ -260,17 +298,15 @@ bool HTTPInput::parse_response(const string &request) return false; } - multimap parameters = extract_headers(lines, url); + HTTPHeaderMultimap parameters = extract_headers(lines, url); // Remove “Content-encoding: metacube”. - // TODO: Make case-insensitive. - const auto encoding_it = parameters.find("Content-encoding"); + const auto encoding_it = parameters.find("Content-Encoding"); if (encoding_it != parameters.end() && encoding_it->second == "metacube") { parameters.erase(encoding_it); } // Change “Server: foo” to “Server: metacube/0.1 (reflecting: foo)” - // TODO: Make case-insensitive. // XXX: Use a Via: instead? if (parameters.count("Server") == 0) { parameters.insert(make_pair("Server", SERVER_IDENTIFICATION)); @@ -283,10 +319,8 @@ bool HTTPInput::parse_response(const string &request) } } - // Set “Connection: close”. - // TODO: Make case-insensitive. + // Erase “Connection: close”; we'll set it on the sending side if needed. parameters.erase("Connection"); - parameters.insert(make_pair("Connection", "close")); // Construct the new HTTP header. http_header = "HTTP/1.0 200 OK\r\n"; @@ -347,18 +381,24 @@ void HTTPInput::do_work() } switch (state) { - case NOT_CONNECTED: + case NOT_CONNECTED: { + // Reap any exited children. + int wstatus; + while (waitpid(-1, &wstatus, WNOHANG) != -1 || errno == EINTR) ; + request.clear(); request_bytes_sent = 0; response.clear(); pending_data.clear(); has_metacube_header = false; for (int stream_index : stream_indices) { - servers->set_header(stream_index, "", ""); + // Don't zero out the header; it might still be of use to HLS clients. + servers->set_unavailable(stream_index); } + string protocol; { - string protocol, user; // Thrown away. + string user; // Thrown away. if (!parse_url(url, &protocol, &user, &host, &port, &path)) { if (!suppress_logging) { log(WARNING, "[%s] Failed to parse URL '%s'", url.c_str(), url.c_str()); @@ -392,24 +432,33 @@ void HTTPInput::do_work() assert(err != -1); } ++num_connection_attempts; - sock = lookup_and_connect(host, port); - if (sock != -1) { - // Yay, successful connect. Try to set it as nonblocking. - int one = 1; - if (ioctl(sock, FIONBIO, &one) == -1) { - log_perror("ioctl(FIONBIO)"); - state = CLOSING_SOCKET; - } else { + if (protocol == "pipe") { + sock = open_child_process(path.c_str()); + + if (sock != -1) { + // Construct a minimal HTTP header. + http_header = "HTTP/1.0 200 OK\r\n"; + for (int stream_index : stream_indices) { + servers->set_header(stream_index, http_header, stream_header); + } + state = RECEIVING_DATA; + } + } else { + sock = lookup_and_connect(host, port); + if (sock != -1) { + // Yay, successful connect. state = SENDING_REQUEST; request = "GET " + path + " HTTP/1.0\r\nHost: " + host_header(host, port) + "\r\nUser-Agent: cubemap\r\n\r\n"; request_bytes_sent = 0; } - - MutexLock lock(&stats_mutex); + } + if (sock != -1) { + lock_guard lock(stats_mutex); stats.connect_time = time(nullptr); clock_gettime(CLOCK_MONOTONIC_COARSE, &last_activity); } break; + } case SENDING_REQUEST: { size_t to_send = request.size() - request_bytes_sent; int ret; @@ -575,13 +624,13 @@ void HTTPInput::do_work() void HTTPInput::process_data(char *ptr, size_t bytes) { { - MutexLock mutex(&stats_mutex); + lock_guard lock(stats_mutex); stats.bytes_received += bytes; } if (encoding == Input::INPUT_ENCODING_RAW) { for (int stream_index : stream_indices) { - servers->add_data(stream_index, ptr, bytes, /*metacube_flags=*/0); + servers->add_data(stream_index, ptr, bytes, /*metacube_flags=*/0, /*pts=*/RationalPTS()); } return; } @@ -654,14 +703,14 @@ void HTTPInput::process_data(char *ptr, size_t bytes) // TODO: Keep metadata when sending on to other Metacube users. if (flags & METACUBE_FLAGS_METADATA) { { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.metadata_bytes_received += size; } process_metacube_metadata_block(hdr, pending_data.data() + sizeof(hdr), size); } else { // Send this block on to the servers. { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.data_bytes_received += size; } char *inner_data = pending_data.data() + sizeof(metacube2_block_header); @@ -672,8 +721,9 @@ void HTTPInput::process_data(char *ptr, size_t bytes) } } for (int stream_index : stream_indices) { - servers->add_data(stream_index, inner_data, size, flags); + servers->add_data(stream_index, inner_data, size, flags, next_block_pts); } + next_block_pts = RationalPTS(); } // Consume the block. This isn't the most efficient way of dealing with things @@ -703,7 +753,7 @@ void HTTPInput::add_destination(int stream_index) InputStats HTTPInput::get_stats() const { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); return stats; } @@ -716,25 +766,36 @@ void HTTPInput::process_metacube_metadata_block(const metacube2_block_header &hd } uint64_t type = be64toh(*(const uint64_t *)payload); - if (type != METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) { - // Unknown metadata block, ignore. - return; - } - - timespec now; - clock_gettime(CLOCK_REALTIME, &now); + if (type == METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP) { + timespec now; + clock_gettime(CLOCK_REALTIME, &now); + + const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload; + if (payload_size != sizeof(*pkt)) { + log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.", + url.c_str(), payload_size); + return; + } - const metacube2_timestamp_packet *pkt = (const metacube2_timestamp_packet *)payload; - if (payload_size != sizeof(*pkt)) { - log(WARNING, "[%s] Metacube timestamp block of wrong size (%d bytes); ignoring.", - url.c_str(), payload_size); + double elapsed = now.tv_sec - be64toh(pkt->tv_sec) + + 1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec))); + { + lock_guard lock(stats_mutex); + stats.latency_sec = elapsed; + } + } else if (type == METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS) { + const metacube2_pts_packet *pkt = (const metacube2_pts_packet *)payload; + if (payload_size != sizeof(*pkt)) { + log(WARNING, "[%s] Metacube pts block of wrong size (%d bytes); ignoring.", + url.c_str(), payload_size); + return; + } + next_block_pts.pts = be64toh(pkt->pts); + next_block_pts.timebase_num = be64toh(pkt->timebase_num); + next_block_pts.timebase_den = be64toh(pkt->timebase_den); + } else { + // Unknown metadata block, ignore + log(INFO, "[%s] Metadata block %llu\n", url.c_str(), type); return; } - - double elapsed = now.tv_sec - be64toh(pkt->tv_sec) + - 1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec))); - { - MutexLock lock(&stats_mutex); - stats.latency_sec = elapsed; - } }