From 0cb56be70f7ca4f4564eea892a99d20032359a1d Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sat, 20 Apr 2013 02:19:22 +0200 Subject: [PATCH] Reinstate the new signal handling; revert the revert. The fix for high CPU usage was not doing wakeup() all the time for new data; not only can seemingly sending the signal be expensive, but we sent it while still holding the mutex the awakening thread needed, so it was extra-bad. We're seemingly down to a reasonable amount of CPU usage again now, but still with good non-racing semantics. --- acceptor.cpp | 27 ++----------- accesslog.cpp | 24 ++--------- httpinput.cpp | 99 +++++++++++++++++++++++++-------------------- main.cpp | 5 +++ server.cpp | 43 +++++++------------- serverpool.cpp | 10 +---- stats.cpp | 30 ++++---------- stream.cpp | 8 +--- thread.cpp | 108 ++++++++++++++++++++++++++++++------------------- thread.h | 47 ++++++++++++++++----- udpinput.cpp | 30 ++++---------- util.cpp | 30 +++++++------- util.h | 4 ++ 13 files changed, 223 insertions(+), 242 deletions(-) diff --git a/acceptor.cpp b/acceptor.cpp index 3172c73..2abfa52 100644 --- a/acceptor.cpp +++ b/acceptor.cpp @@ -12,11 +12,11 @@ #include "log.h" #include "serverpool.h" #include "state.pb.h" +#include "util.h" using namespace std; extern ServerPool *servers; -extern volatile bool hupped; int create_server_socket(int port, SocketType socket_type) { @@ -93,32 +93,13 @@ AcceptorProto Acceptor::serialize() const void Acceptor::close_socket() { - int ret; - do { - ret = close(server_sock); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close"); - } + safe_close(server_sock); } void Acceptor::do_work() { - while (!hupped) { - // Since we are non-blocking, we need to wait for the right state first. - // Wait up to 50 ms, then check hupped. 
- pollfd pfd; - pfd.fd = server_sock; - pfd.events = POLLIN; - - int nfds = poll(&pfd, 1, 50); - if (nfds == 0 || (nfds == -1 && errno == EINTR)) { - continue; - } - if (nfds == -1) { - log_perror("poll"); - usleep(100000); + while (!should_stop()) { + if (!wait_for_activity(server_sock, POLLIN, NULL)) { continue; } diff --git a/accesslog.cpp b/accesslog.cpp index 03aeec8..81eff90 100644 --- a/accesslog.cpp +++ b/accesslog.cpp @@ -43,7 +43,7 @@ void AccessLogThread::do_work() } } - while (!should_stop) { + while (!should_stop()) { // Empty the queue. vector writes; { @@ -66,26 +66,10 @@ void AccessLogThread::do_work() } fflush(logfp); } - - // Wait until the stop_fd pipe is closed, one second has passed. - // or a spurious signal arrives. - pollfd pfd; - pfd.fd = stop_fd_read; - pfd.events = POLLIN | POLLRDHUP; - int nfds = poll(&pfd, 1, 1000); - if (nfds == 0 || (nfds == -1 && errno == EINTR)) { - continue; - } - if (nfds == 1) { - // Should stop. - break; - } - if (nfds == -1) { - log_perror("poll"); - usleep(100000); - continue; - } + // Wait until we are being woken up, either to quit or because + // there is material in pending_writes. + wait_for_wakeup(NULL); } if (logfp != NULL) { diff --git a/httpinput.cpp b/httpinput.cpp index 4950183..f92ac13 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -20,6 +21,7 @@ #include "parse.h" #include "serverpool.h" #include "state.pb.h" +#include "util.h" #include "version.h" using namespace std; @@ -60,14 +62,7 @@ HTTPInput::HTTPInput(const InputProto &serialized) void HTTPInput::close_socket() { - int ret; - do { - ret = close(sock); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close()"); - } + safe_close(sock); } InputProto HTTPInput::serialize() const @@ -99,31 +94,64 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port) addrinfo *base_ai = ai; // Connect to everything in turn until we have a socket. 
- while (ai && !should_stop) { + while (ai && !should_stop()) { int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP); if (sock == -1) { // Could be e.g. EPROTONOSUPPORT. The show must go on. continue; } + // Now do a non-blocking connect. This is important because we want to be able to be + // woken up, even though it's rather cumbersome. + + // Set the socket as nonblocking. + int one = 1; + if (ioctl(sock, FIONBIO, &one) == -1) { + log_perror("ioctl(FIONBIO)"); + safe_close(sock); + return -1; + } + + // Do a non-blocking connect. do { err = connect(sock, ai->ai_addr, ai->ai_addrlen); } while (err == -1 && errno == EINTR); - if (err != -1) { - freeaddrinfo(base_ai); - return sock; + if (err == -1 && errno != EINPROGRESS) { + log_perror("connect"); + safe_close(sock); + continue; } - do { - err = close(sock); - } while (err == -1 && errno == EINTR); + // Wait for the connect to complete, or an error to happen. + for ( ;; ) { + bool complete = wait_for_activity(sock, POLLIN | POLLOUT, NULL); + if (should_stop()) { + safe_close(sock); + return -1; + } + if (complete) { + break; + } + } - if (err == -1) { - log_perror("close"); - // Can still continue. + // Check whether it ended in an error or not. + socklen_t err_size = sizeof(err); + if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_size) == -1) { + log_perror("getsockopt"); + safe_close(sock); + continue; + } + + errno = err; + + if (err == 0) { + // Successful connect. + freeaddrinfo(base_ai); + return sock; } + safe_close(sock); ai = ai->ai_next; } @@ -221,23 +249,13 @@ bool HTTPInput::parse_response(const std::string &request) void HTTPInput::do_work() { - while (!should_stop) { + while (!should_stop()) { if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) { - // Since we are non-blocking, we need to wait for the right state first. - // Wait up to 50 ms, then check should_stop. - pollfd pfd; - pfd.fd = sock; - pfd.events = (state == SENDING_REQUEST) ? 
POLLOUT : POLLIN; - pfd.events |= POLLRDHUP; - - int nfds = poll(&pfd, 1, 50); - if (nfds == 0 || (nfds == -1 && errno == EINTR)) { + bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL); + if (!activity) { + // Most likely, should_stop was set. continue; } - if (nfds == -1) { - log_perror("poll"); - state = CLOSING_SOCKET; - } } switch (state) { @@ -377,15 +395,7 @@ void HTTPInput::do_work() break; } case CLOSING_SOCKET: { - int err; - do { - err = close(sock); - } while (err == -1 && errno == EINTR); - - if (err == -1) { - log_perror("close"); - } - + close_socket(); state = NOT_CONNECTED; break; } @@ -396,9 +406,12 @@ void HTTPInput::do_work() // If we are still in NOT_CONNECTED, either something went wrong, // or the connection just got closed. // The earlier steps have already given the error message, if any. - if (state == NOT_CONNECTED && !should_stop) { + if (state == NOT_CONNECTED && !should_stop()) { log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str()); - usleep(200000); + timespec timeout_ts; + timeout_ts.tv_sec = 0; + timeout_ts.tv_nsec = 200000000; + wait_for_wakeup(&timeout_ts); } } } diff --git a/main.cpp b/main.cpp index 46eddaf..c89ec72 100644 --- a/main.cpp +++ b/main.cpp @@ -50,6 +50,10 @@ void hup(int signum) } } +void do_nothing(int signum) +{ +} + CubemapStateProto collect_state(const timeval &serialize_start, const vector acceptors, const multimap inputs, @@ -239,6 +243,7 @@ int main(int argc, char **argv) { signal(SIGHUP, hup); signal(SIGINT, hup); + signal(SIGUSR1, do_nothing); // Used in internal signalling. signal(SIGPIPE, SIG_IGN); // Parse options. 
diff --git a/server.cpp b/server.cpp index b5f58d4..6d5cce3 100644 --- a/server.cpp +++ b/server.cpp @@ -26,6 +26,7 @@ #include "server.h" #include "state.pb.h" #include "stream.h" +#include "util.h" using namespace std; @@ -51,14 +52,7 @@ Server::~Server() delete stream_it->second; } - int ret; - do { - ret = close(epoll_fd); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close(epoll_fd)"); - } + safe_close(epoll_fd); } vector Server::get_client_stats() const @@ -76,15 +70,17 @@ vector Server::get_client_stats() const void Server::do_work() { - for ( ;; ) { - int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS); - if (nfds == -1 && errno == EINTR) { - if (should_stop) { - return; - } - continue; - } - if (nfds == -1) { + while (!should_stop()) { + // Wait until there's activity on at least one of the fds, + // or 20 ms (about one frame at 50 fps) has elapsed. + // + // We could in theory wait forever and rely on wakeup() + // from add_client_deferred() and add_data_deferred(), + // but wakeup is a pretty expensive operation, and the + // two threads might end up fighting over a lock, so it's + // seemingly (much) more efficient to just have a timeout here. + int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS, &sigset_without_usr1_block); + if (nfds == -1 && errno != EINTR) { log_perror("epoll_wait"); exit(1); } @@ -113,10 +109,6 @@ void Server::do_work() process_client(to_process[i]); } } - - if (should_stop) { - return; - } } } @@ -558,14 +550,7 @@ void Server::close_client(Client *client) access_log->write(client->get_stats()); // Bye-bye! 
- int ret; - do { - ret = close(client->sock); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close"); - } + safe_close(client->sock); clients.erase(client->sock); } diff --git a/serverpool.cpp b/serverpool.cpp index fb16682..e309ab9 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -92,15 +92,7 @@ void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vec // Close and delete any leftovers, if the number of servers was reduced. for (size_t i = num_servers; i < data_fds.size(); ++i) { - int ret; - do { - ret = close(data_fds[i]); // Implicitly deletes the file. - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close"); - // Can still continue. - } + safe_close(data_fds[i]); // Implicitly deletes the file. } } diff --git a/stats.cpp b/stats.cpp index b2bca75..280f019 100644 --- a/stats.cpp +++ b/stats.cpp @@ -13,6 +13,7 @@ #include "log.h" #include "serverpool.h" #include "stats.h" +#include "util.h" using namespace std; @@ -26,7 +27,7 @@ StatsThread::StatsThread(const std::string &stats_file, int stats_interval) void StatsThread::do_work() { - while (!should_stop) { + while (!should_stop()) { int fd; FILE *fp; time_t now; @@ -44,9 +45,7 @@ void StatsThread::do_work() fp = fdopen(fd, "w"); if (fp == NULL) { log_perror("fdopen"); - if (close(fd) == -1) { - log_perror("close"); - } + safe_close(fd); if (unlink(filename) == -1) { log_perror(filename); } @@ -85,25 +84,12 @@ void StatsThread::do_work() free(filename); sleep: - // Wait until the stop_fd pipe is closed, stats_interval timeout, + // Wait until we are asked to quit, stats_interval timeout, // or a spurious signal. (The latter will cause us to write stats // too often, but that's okay.) - pollfd pfd; - pfd.fd = stop_fd_read; - pfd.events = POLLIN | POLLRDHUP; - - int nfds = poll(&pfd, 1, stats_interval * 1000); - if (nfds == 0 || (nfds == -1 && errno == EINTR)) { - continue; - } - if (nfds == 1) { - // Should stop. 
- break; - } - if (nfds == -1) { - log_perror("poll"); - usleep(100000); - continue; - } + timespec timeout_ts; + timeout_ts.tv_sec = stats_interval; + timeout_ts.tv_nsec = 0; + wait_for_wakeup(&timeout_ts); } } diff --git a/stream.cpp b/stream.cpp index 83257e9..ab08a0f 100644 --- a/stream.cpp +++ b/stream.cpp @@ -31,13 +31,7 @@ Stream::Stream(const string &stream_id, size_t backlog_size, Encoding encoding) Stream::~Stream() { if (data_fd != -1) { - int ret; - do { - ret = close(data_fd); - } while (ret == -1 && errno == EINTR); - if (ret == -1) { - log_perror("close"); - } + safe_close(data_fd); } } diff --git a/thread.cpp b/thread.cpp index f719eac..bd4cfd1 100644 --- a/thread.cpp +++ b/thread.cpp @@ -1,71 +1,97 @@ +#include +#include +#include +#include +#include #include #include #include -#include -#include -#include #include "log.h" +#include "mutexlock.h" #include "thread.h" - + Thread::~Thread() {} void Thread::run() { - should_stop = false; - int pipefd[2]; - if (pipe2(pipefd, O_CLOEXEC) == -1) { - log_perror("pipe"); - exit(1); - } - stop_fd_read = pipefd[0]; - stop_fd_write = pipefd[1]; + pthread_mutex_init(&should_stop_mutex, NULL); + should_stop_status = false; pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this); } void Thread::stop() { - should_stop = true; - char ch = 0; - int err; - do { - err = write(stop_fd_write, &ch, 1); - } while (err == -1 && errno == EINTR); - - if (err == -1) { - log_perror("write"); + { + MutexLock lock(&should_stop_mutex); + should_stop_status = true; + } + wakeup(); + if (pthread_join(worker_thread, NULL) == -1) { + log_perror("pthread_join"); exit(1); } +} - do { - err = close(stop_fd_write); - } while (err == -1 && errno == EINTR); +void *Thread::do_work_thunk(void *arg) +{ + Thread *thread = reinterpret_cast(arg); - if (err == -1) { - log_perror("close"); - // Can continue (we have close-on-exec). + // Block SIGHUP; only the main thread should see that. 
+ // (This isn't strictly required, but it makes it easier to debug that indeed + // SIGUSR1 was what woke us up.) + sigset_t set; + sigaddset(&set, SIGHUP); + int err = pthread_sigmask(SIG_BLOCK, &set, NULL); + if (err != 0) { + errno = err; + log_perror("pthread_sigmask"); + exit(1); } - pthread_kill(worker_thread, SIGHUP); - if (pthread_join(worker_thread, NULL) == -1) { - log_perror("pthread_join"); + // Block SIGUSR1, and store the old signal mask. + sigemptyset(&set); + sigaddset(&set, SIGUSR1); + err = pthread_sigmask(SIG_BLOCK, &set, &thread->sigset_without_usr1_block); + if (err != 0) { + errno = err; + log_perror("pthread_sigmask"); exit(1); } - - do { - err = close(stop_fd_read); - } while (err == -1 && errno == EINTR); - if (err == -1) { - log_perror("close"); - // Can continue (we have close-on-exec). + // Call the right thunk. + thread->do_work(); + return NULL; +} + +bool Thread::wait_for_activity(int fd, short events, const struct timespec *timeout_ts) +{ + pollfd pfd; + pfd.fd = fd; + pfd.events = events; + + for ( ;; ) { + int nfds = ppoll(&pfd, (fd == -1) ? 0 : 1, timeout_ts, &sigset_without_usr1_block); + if (nfds == -1 && errno == EINTR) { + return false; + } + if (nfds == -1) { + log_perror("poll"); + usleep(100000); + continue; + } + assert(nfds <= 1); + return (nfds == 1); } } -void *Thread::do_work_thunk(void *arg) +void Thread::wakeup() { - Thread *thread = reinterpret_cast(arg); - thread->do_work(); - return NULL; + pthread_kill(worker_thread, SIGUSR1); } +bool Thread::should_stop() +{ + MutexLock lock(&should_stop_mutex); + return should_stop_status; +} diff --git a/thread.h b/thread.h index 26e648e..728df27 100644 --- a/thread.h +++ b/thread.h @@ -1,12 +1,19 @@ #ifndef _THREAD_H #define _THREAD_H +#include #include -// A rather generic thread class with start/stop functionality. -// NOTE: stop is somewhat racy (there's no guaranteed breakout from syscalls), -// since signals don't stick. 
We'll need to figure out something more -// intelligent later, probably based on sending a signal to an fd. +struct timespec; + +// A thread class with start/stop and signal functionality. +// +// SIGUSR1 is blocked during execution of do_work(), so that you are guaranteed +// to receive it when doing wait_for_activity(), and never elsewhere. This means +// that you can test whatever status flags you'd want before calling +// wait_for_activity(), and then be sure that it actually returns immediately +// if a SIGUSR1 (i.e., wakeup()) happened, even if it was sent between your test +// and the wait_for_activity() call. class Thread { public: @@ -15,21 +22,39 @@ public: void stop(); protected: - // Recovers the this pointer, and calls do_work(). + // Recovers the this pointer, blocks SIGUSR1, and calls do_work(). static void *do_work_thunk(void *arg); virtual void do_work() = 0; - volatile bool should_stop; + // Waits until there is activity of the given type on fd (or an error), + // or until a wakeup. Returns true if there was actually activity on + // the file descriptor. + // + // If fd is -1, wait until a wakeup or timeout. + // If timeout_ts is NULL, there is no timeout. + bool wait_for_activity(int fd, short events, const timespec *timeout_ts); + + // Wait until a wakeup or timeout. + void wait_for_wakeup(const timespec *timeout_ts) { wait_for_activity(-1, 0, timeout_ts); } + + // Make wait_for_activity() return. Note that this is a relatively expensive + // operation. + void wakeup(); - // A pipe that you can poll on if you want to see when should_stop - // has been set to true; stop() will write a single byte to the pipe - // and then close the other end. - int stop_fd_read; + bool should_stop(); + + // The signal set as it was before we blocked SIGUSR1. + sigset_t sigset_without_usr1_block; private: pthread_t worker_thread; - int stop_fd_write; + + // Protects should_stop_status. 
+ pthread_mutex_t should_stop_mutex; + + // If this is set, the thread should return as soon as possible from do_work(). + bool should_stop_status; }; #endif // !defined(_THREAD_H) diff --git a/udpinput.cpp b/udpinput.cpp index 4222515..07bf63d 100644 --- a/udpinput.cpp +++ b/udpinput.cpp @@ -12,6 +12,7 @@ #include "serverpool.h" #include "state.pb.h" #include "udpinput.h" +#include "util.h" #include "version.h" using namespace std; @@ -52,15 +53,7 @@ InputProto UDPInput::serialize() const void UDPInput::close_socket() { - int ret; - do { - ret = close(sock); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close()"); - } - + safe_close(sock); sock = -1; } @@ -82,7 +75,7 @@ void UDPInput::add_destination(const string &stream_id) void UDPInput::do_work() { - while (!should_stop) { + while (!should_stop()) { if (sock == -1) { int port_num = atoi(port.c_str()); sock = create_server_socket(port_num, UDP_SOCKET); @@ -94,21 +87,12 @@ void UDPInput::do_work() } } - // Since we are non-blocking, we need to wait for the right state first. - // Wait up to 50 ms, then check should_stop. - pollfd pfd; - pfd.fd = sock; - pfd.events = POLLIN; - - int nfds = poll(&pfd, 1, 50); - if (nfds == 0 || (nfds == -1 && errno == EINTR)) { + // Wait for a packet, or a wakeup. + bool activity = wait_for_activity(sock, POLLIN, NULL); + if (!activity) { + // Most likely, should_stop was set. 
continue; } - if (nfds == -1) { - log_perror("poll"); - close_socket(); - continue; - } char buf[4096]; int ret; diff --git a/util.cpp b/util.cpp index d6d39f2..2bbffdc 100644 --- a/util.cpp +++ b/util.cpp @@ -33,9 +33,7 @@ int make_tempfile(const std::string &contents) ssize_t ret = write(fd, ptr, to_write); if (ret == -1) { log_perror("write"); - if (close(fd) == -1) { - log_perror("close"); - } + safe_close(fd); return -1; } @@ -49,17 +47,7 @@ int make_tempfile(const std::string &contents) bool read_tempfile_and_close(int fd, std::string *contents) { bool ok = read_tempfile(fd, contents); - - int ret; - do { - ret = close(fd); // Implicitly deletes the file. - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close"); - // Can still continue. - } - + safe_close(fd); // Implicitly deletes the file. return ok; } @@ -96,3 +84,17 @@ bool read_tempfile(int fd, std::string *contents) return true; } + +int safe_close(int fd) +{ + int ret; + do { + ret = close(fd); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + log_perror("close()"); + } + + return ret; +} diff --git a/util.h b/util.h index 303de14..d167075 100644 --- a/util.h +++ b/util.h @@ -15,4 +15,8 @@ bool read_tempfile_and_close(int fd, std::string *contents); // Same as read_tempfile_and_close(), without the close. bool read_tempfile(int fd, std::string *contents); +// Close a file descriptor, taking care of EINTR on the way. +// log_perror() if it fails; apart from that, behaves as close(). +int safe_close(int fd); + #endif // !defined(_UTIL_H -- 2.39.2