From: Steinar H. Gunderson Date: Fri, 19 Apr 2013 23:21:15 +0000 (+0200) Subject: Revert "Rewrite the entire internal signal handling/wakeup." X-Git-Tag: 1.0.0~60 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=71fc5575037bead8b6e927a1fffd199e4fc4514b Revert "Rewrite the entire internal signal handling/wakeup." Seemingly this had very bad effects on CPU usage. Will need to investigate later. This reverts commit 3fd8650ccf3da3960a946d8ac9abc305aec399ce. --- diff --git a/acceptor.cpp b/acceptor.cpp index 2abfa52..3172c73 100644 --- a/acceptor.cpp +++ b/acceptor.cpp @@ -12,11 +12,11 @@ #include "log.h" #include "serverpool.h" #include "state.pb.h" -#include "util.h" using namespace std; extern ServerPool *servers; +extern volatile bool hupped; int create_server_socket(int port, SocketType socket_type) { @@ -93,13 +93,32 @@ AcceptorProto Acceptor::serialize() const void Acceptor::close_socket() { - safe_close(server_sock); + int ret; + do { + ret = close(server_sock); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + log_perror("close"); + } } void Acceptor::do_work() { - while (!should_stop()) { - if (!wait_for_activity(server_sock, POLLIN, NULL)) { + while (!hupped) { + // Since we are non-blocking, we need to wait for the right state first. + // Wait up to 50 ms, then check hupped. + pollfd pfd; + pfd.fd = server_sock; + pfd.events = POLLIN; + + int nfds = poll(&pfd, 1, 50); + if (nfds == 0 || (nfds == -1 && errno == EINTR)) { + continue; + } + if (nfds == -1) { + log_perror("poll"); + usleep(100000); continue; } diff --git a/accesslog.cpp b/accesslog.cpp index 81eff90..03aeec8 100644 --- a/accesslog.cpp +++ b/accesslog.cpp @@ -43,7 +43,7 @@ void AccessLogThread::do_work() } } - while (!should_stop()) { + while (!should_stop) { // Empty the queue. vector writes; { @@ -66,10 +66,26 @@ void AccessLogThread::do_work() } fflush(logfp); } + + // Wait until the stop_fd pipe is closed, one second has passed. + // or a spurious signal arrives. + pollfd pfd; + pfd.fd = stop_fd_read; + pfd.events = POLLIN | POLLRDHUP; - // Wait until we are being woken up, either to quit or because - // there is material in pending_writes. - wait_for_wakeup(NULL); + int nfds = poll(&pfd, 1, 1000); + if (nfds == 0 || (nfds == -1 && errno == EINTR)) { + continue; + } + if (nfds == 1) { + // Should stop. + break; + } + if (nfds == -1) { + log_perror("poll"); + usleep(100000); + continue; + } } if (logfp != NULL) { diff --git a/httpinput.cpp b/httpinput.cpp index f92ac13..4950183 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -1,4 +1,3 @@ -#include #include #include #include @@ -21,7 +20,6 @@ #include "parse.h" #include "serverpool.h" #include "state.pb.h" -#include "util.h" #include "version.h" using namespace std; @@ -62,7 +60,14 @@ HTTPInput::HTTPInput(const InputProto &serialized) void HTTPInput::close_socket() { - safe_close(sock); + int ret; + do { + ret = close(sock); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + log_perror("close()"); + } } InputProto HTTPInput::serialize() const @@ -94,64 +99,31 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port) addrinfo *base_ai = ai; // Connect to everything in turn until we have a socket. - while (ai && !should_stop()) { + while (ai && !should_stop) { int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP); if (sock == -1) { // Could be e.g. EPROTONOSUPPORT. The show must go on. continue; } - // Now do a non-blocking connect. This is important because we want to be able to be - // woken up, even though it's rather cumbersome. - - // Set the socket as nonblocking. - int one = 1; - if (ioctl(sock, FIONBIO, &one) == -1) { - log_perror("ioctl(FIONBIO)"); - safe_close(sock); - return -1; - } - - // Do a non-blocking connect. do { err = connect(sock, ai->ai_addr, ai->ai_addrlen); } while (err == -1 && errno == EINTR); - if (err == -1 && errno != EINPROGRESS) { - log_perror("connect"); - safe_close(sock); - continue; - } - - // Wait for the connect to complete, or an error to happen. - for ( ;; ) { - bool complete = wait_for_activity(sock, POLLIN | POLLOUT, NULL); - if (should_stop()) { - safe_close(sock); - return -1; - } - if (complete) { - break; - } - } - - // Check whether it ended in an error or not. - socklen_t err_size = sizeof(err); - if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_size) == -1) { - log_perror("getsockopt"); - safe_close(sock); - continue; + if (err != -1) { + freeaddrinfo(base_ai); + return sock; } - errno = err; + do { + err = close(sock); + } while (err == -1 && errno == EINTR); - if (err == 0) { - // Successful connect. - freeaddrinfo(base_ai); - return sock; + if (err == -1) { + log_perror("close"); + // Can still continue. } - safe_close(sock); ai = ai->ai_next; } @@ -249,13 +221,23 @@ bool HTTPInput::parse_response(const std::string &request) void HTTPInput::do_work() { - while (!should_stop()) { + while (!should_stop) { if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) { - bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL); - if (!activity) { - // Most likely, should_stop was set. + // Since we are non-blocking, we need to wait for the right state first. + // Wait up to 50 ms, then check should_stop. + pollfd pfd; + pfd.fd = sock; + pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN; + pfd.events |= POLLRDHUP; + + int nfds = poll(&pfd, 1, 50); + if (nfds == 0 || (nfds == -1 && errno == EINTR)) { continue; } + if (nfds == -1) { + log_perror("poll"); + state = CLOSING_SOCKET; + } } switch (state) { @@ -395,7 +377,15 @@ void HTTPInput::do_work() break; } case CLOSING_SOCKET: { - close_socket(); + int err; + do { + err = close(sock); + } while (err == -1 && errno == EINTR); + + if (err == -1) { + log_perror("close"); + } + state = NOT_CONNECTED; break; } @@ -406,12 +396,9 @@ void HTTPInput::do_work() // If we are still in NOT_CONNECTED, either something went wrong, // or the connection just got closed. // The earlier steps have already given the error message, if any. - if (state == NOT_CONNECTED && !should_stop()) { + if (state == NOT_CONNECTED && !should_stop) { log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str()); - timespec timeout_ts; - timeout_ts.tv_sec = 0; - timeout_ts.tv_nsec = 200000000; - wait_for_wakeup(&timeout_ts); + usleep(200000); } } } diff --git a/main.cpp b/main.cpp index c89ec72..46eddaf 100644 --- a/main.cpp +++ b/main.cpp @@ -50,10 +50,6 @@ void hup(int signum) } } -void do_nothing(int signum) -{ -} - CubemapStateProto collect_state(const timeval &serialize_start, const vector acceptors, const multimap inputs, @@ -243,7 +239,6 @@ int main(int argc, char **argv) { signal(SIGHUP, hup); signal(SIGINT, hup); - signal(SIGUSR1, do_nothing); // Used in internal signalling. signal(SIGPIPE, SIG_IGN); // Parse options. diff --git a/server.cpp b/server.cpp index a0a32f8..b5f58d4 100644 --- a/server.cpp +++ b/server.cpp @@ -26,7 +26,6 @@ #include "server.h" #include "state.pb.h" #include "stream.h" -#include "util.h" using namespace std; @@ -52,7 +51,14 @@ Server::~Server() delete stream_it->second; } - safe_close(epoll_fd); + int ret; + do { + ret = close(epoll_fd); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + log_perror("close(epoll_fd)"); + } } vector Server::get_client_stats() const @@ -70,11 +76,15 @@ vector Server::get_client_stats() const void Server::do_work() { - while (!should_stop()) { - // Wait until there's activity on at least one of the fds, - // or we are waken up due to new queued clients or data. - int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, -1, &sigset_without_usr1_block); - if (nfds == -1 && errno != EINTR) { + for ( ;; ) { + int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS); + if (nfds == -1 && errno == EINTR) { + if (should_stop) { + return; + } + continue; + } + if (nfds == -1) { log_perror("epoll_wait"); exit(1); } @@ -103,6 +113,10 @@ void Server::do_work() process_client(to_process[i]); } } + + if (should_stop) { + return; + } } } @@ -129,7 +143,6 @@ void Server::add_client_deferred(int sock) { MutexLock lock(&queued_data_mutex); queued_add_clients.push_back(sock); - wakeup(); } void Server::add_client(int sock) @@ -246,7 +259,6 @@ void Server::add_data_deferred(const string &stream_id, const char *data, size_t { MutexLock lock(&queued_data_mutex); queued_data[stream_id].append(string(data, data + bytes)); - wakeup(); } // See the .h file for postconditions after this function. @@ -546,7 +558,14 @@ void Server::close_client(Client *client) access_log->write(client->get_stats()); // Bye-bye! - safe_close(client->sock); + int ret; + do { + ret = close(client->sock); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + log_perror("close"); + } clients.erase(client->sock); } diff --git a/server.h b/server.h index 4f96487..a3e032d 100644 --- a/server.h +++ b/server.h @@ -19,6 +19,7 @@ class ClientProto; struct Stream; #define EPOLL_MAX_EVENTS 8192 +#define EPOLL_TIMEOUT_MS 20 #define MAX_CLIENT_REQUEST 16384 class CubemapStateProto; diff --git a/serverpool.cpp b/serverpool.cpp index e309ab9..fb16682 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -92,7 +92,15 @@ void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vec // Close and delete any leftovers, if the number of servers was reduced. for (size_t i = num_servers; i < data_fds.size(); ++i) { - safe_close(data_fds[i]); // Implicitly deletes the file. + int ret; + do { + ret = close(data_fds[i]); // Implicitly deletes the file. + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + log_perror("close"); + // Can still continue. + } } } diff --git a/stats.cpp b/stats.cpp index 280f019..b2bca75 100644 --- a/stats.cpp +++ b/stats.cpp @@ -13,7 +13,6 @@ #include "log.h" #include "serverpool.h" #include "stats.h" -#include "util.h" using namespace std; @@ -27,7 +26,7 @@ StatsThread::StatsThread(const std::string &stats_file, int stats_interval) void StatsThread::do_work() { - while (!should_stop()) { + while (!should_stop) { int fd; FILE *fp; time_t now; @@ -45,7 +44,9 @@ void StatsThread::do_work() fp = fdopen(fd, "w"); if (fp == NULL) { log_perror("fdopen"); - safe_close(fd); + if (close(fd) == -1) { + log_perror("close"); + } if (unlink(filename) == -1) { log_perror(filename); } @@ -84,12 +85,25 @@ void StatsThread::do_work() free(filename); sleep: - // Wait until we are asked to quit, stats_interval timeout, + // Wait until the stop_fd pipe is closed, stats_interval timeout, // or a spurious signal. (The latter will cause us to write stats // too often, but that's okay.) - timespec timeout_ts; - timeout_ts.tv_sec = stats_interval; - timeout_ts.tv_nsec = 0; - wait_for_wakeup(&timeout_ts); + pollfd pfd; + pfd.fd = stop_fd_read; + pfd.events = POLLIN | POLLRDHUP; + + int nfds = poll(&pfd, 1, stats_interval * 1000); + if (nfds == 0 || (nfds == -1 && errno == EINTR)) { + continue; + } + if (nfds == 1) { + // Should stop. + break; + } + if (nfds == -1) { + log_perror("poll"); + usleep(100000); + continue; + } } } diff --git a/stream.cpp b/stream.cpp index ab08a0f..83257e9 100644 --- a/stream.cpp +++ b/stream.cpp @@ -31,7 +31,13 @@ Stream::Stream(const string &stream_id, size_t backlog_size, Encoding encoding) Stream::~Stream() { if (data_fd != -1) { - safe_close(data_fd); + int ret; + do { + ret = close(data_fd); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + log_perror("close"); + } } } diff --git a/thread.cpp b/thread.cpp index bd4cfd1..f719eac 100644 --- a/thread.cpp +++ b/thread.cpp @@ -1,97 +1,71 @@ -#include -#include -#include -#include -#include #include #include #include +#include +#include +#include #include "log.h" -#include "mutexlock.h" #include "thread.h" - + Thread::~Thread() {} void Thread::run() { - pthread_mutex_init(&should_stop_mutex, NULL); - should_stop_status = false; + should_stop = false; + int pipefd[2]; + if (pipe2(pipefd, O_CLOEXEC) == -1) { + log_perror("pipe"); + exit(1); + } + stop_fd_read = pipefd[0]; + stop_fd_write = pipefd[1]; pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this); } void Thread::stop() { - { - MutexLock lock(&should_stop_mutex); - should_stop_status = true; - } - wakeup(); - if (pthread_join(worker_thread, NULL) == -1) { - log_perror("pthread_join"); + should_stop = true; + char ch = 0; + int err; + do { + err = write(stop_fd_write, &ch, 1); + } while (err == -1 && errno == EINTR); + + if (err == -1) { + log_perror("write"); exit(1); } -} -void *Thread::do_work_thunk(void *arg) -{ - Thread *thread = reinterpret_cast(arg); + do { + err = close(stop_fd_write); + } while (err == -1 && errno == EINTR); - // Block SIGHUP; only the main thread should see that. - // (This isn't strictly required, but it makes it easier to debug that indeed - // SIGUSR1 was what woke us up.) - sigset_t set; - sigaddset(&set, SIGHUP); - int err = pthread_sigmask(SIG_BLOCK, &set, NULL); - if (err != 0) { - errno = err; - log_perror("pthread_sigmask"); - exit(1); + if (err == -1) { + log_perror("close"); + // Can continue (we have close-on-exec). } - // Block SIGUSR1, and store the old signal mask. - sigemptyset(&set); - sigaddset(&set, SIGUSR1); - err = pthread_sigmask(SIG_BLOCK, &set, &thread->sigset_without_usr1_block); - if (err != 0) { - errno = err; - log_perror("pthread_sigmask"); + pthread_kill(worker_thread, SIGHUP); + if (pthread_join(worker_thread, NULL) == -1) { + log_perror("pthread_join"); exit(1); } + + do { + err = close(stop_fd_read); + } while (err == -1 && errno == EINTR); - // Call the right thunk. - thread->do_work(); - return NULL; -} - -bool Thread::wait_for_activity(int fd, short events, const struct timespec *timeout_ts) -{ - pollfd pfd; - pfd.fd = fd; - pfd.events = events; - - for ( ;; ) { - int nfds = ppoll(&pfd, (fd == -1) ? 0 : 1, timeout_ts, &sigset_without_usr1_block); - if (nfds == -1 && errno == EINTR) { - return false; - } - if (nfds == -1) { - log_perror("poll"); - usleep(100000); - continue; - } - assert(nfds <= 1); - return (nfds == 1); + if (err == -1) { + log_perror("close"); + // Can continue (we have close-on-exec). } } -void Thread::wakeup() +void *Thread::do_work_thunk(void *arg) { - pthread_kill(worker_thread, SIGUSR1); + Thread *thread = reinterpret_cast(arg); + thread->do_work(); + return NULL; } -bool Thread::should_stop() -{ - MutexLock lock(&should_stop_mutex); - return should_stop_status; -} diff --git a/thread.h b/thread.h index 12aa133..26e648e 100644 --- a/thread.h +++ b/thread.h @@ -1,19 +1,12 @@ #ifndef _THREAD_H #define _THREAD_H -#include #include -struct timespec; - -// A thread class with start/stop and signal functionality. -// -// SIGUSR1 is blocked during execution of do_work(), so that you are guaranteed -// to receive it when doing wait_for_activity(), and never elsewhere. This means -// that you can test whatever status flags you'd want before calling -// wait_for_activity(), and then be sure that it actually returns immediately -// if a SIGUSR1 (ie., wakeup()) happened, even if it were sent between your test -// and the wait_for_activity() call. +// A rather generic thread class with start/stop functionality. +// NOTE: stop is somewhat racy (there's no guaranteed breakout from syscalls), +// since signals don't stick. We'll need to figure out something more +// intelligent later, probably based on sending a signal to an fd. class Thread { public: @@ -22,38 +15,21 @@ public: void stop(); protected: - // Recovers the this pointer, blocks SIGUSR1, and calls do_work(). + // Recovers the this pointer, and calls do_work(). static void *do_work_thunk(void *arg); virtual void do_work() = 0; - // Waits until there is activity of the given type on (or an error), - // or until a wakeup. Returns true if there was actually activity on - // the file descriptor. - // - // If fd is -1, wait until a wakeup or timeout. - // if timeout_ts is NULL, there is no timeout. - bool wait_for_activity(int fd, short events, const timespec *timeout_ts); - - // Wait until a wakeup. - void wait_for_wakeup(const timespec *timeout_ts) { wait_for_activity(-1, 0, timeout_ts); } - - // Make wait_for_activity() return. - void wakeup(); + volatile bool should_stop; - bool should_stop(); - - // The signal set as it were before we blocked SIGUSR1. - sigset_t sigset_without_usr1_block; + // A pipe that you can poll on if you want to see when should_stop + // has been set to true; stop() will write a single byte to the pipe + // and then close the other end. + int stop_fd_read; private: pthread_t worker_thread; - - // Protects should_stop_status. - pthread_mutex_t should_stop_mutex; - - // If this is set, the thread should return as soon as possible from do_work(). - bool should_stop_status; + int stop_fd_write; }; #endif // !defined(_THREAD_H) diff --git a/udpinput.cpp b/udpinput.cpp index 07bf63d..4222515 100644 --- a/udpinput.cpp +++ b/udpinput.cpp @@ -12,7 +12,6 @@ #include "serverpool.h" #include "state.pb.h" #include "udpinput.h" -#include "util.h" #include "version.h" using namespace std; @@ -53,7 +52,15 @@ InputProto UDPInput::serialize() const void UDPInput::close_socket() { - safe_close(sock); + int ret; + do { + ret = close(sock); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + log_perror("close()"); + } + sock = -1; } @@ -75,7 +82,7 @@ void UDPInput::add_destination(const string &stream_id) void UDPInput::do_work() { - while (!should_stop()) { + while (!should_stop) { if (sock == -1) { int port_num = atoi(port.c_str()); sock = create_server_socket(port_num, UDP_SOCKET); @@ -87,12 +94,21 @@ void UDPInput::do_work() } } - // Wait for a packet, or a wakeup. - bool activity = wait_for_activity(sock, POLLIN, NULL); - if (!activity) { - // Most likely, should_stop was set. + // Since we are non-blocking, we need to wait for the right state first. + // Wait up to 50 ms, then check should_stop. + pollfd pfd; + pfd.fd = sock; + pfd.events = POLLIN; + + int nfds = poll(&pfd, 1, 50); + if (nfds == 0 || (nfds == -1 && errno == EINTR)) { continue; } + if (nfds == -1) { + log_perror("poll"); + close_socket(); + continue; + } char buf[4096]; int ret; diff --git a/util.cpp b/util.cpp index 2bbffdc..d6d39f2 100644 --- a/util.cpp +++ b/util.cpp @@ -33,7 +33,9 @@ int make_tempfile(const std::string &contents) ssize_t ret = write(fd, ptr, to_write); if (ret == -1) { log_perror("write"); - safe_close(fd); + if (close(fd) == -1) { + log_perror("close"); + } return -1; } @@ -47,7 +49,17 @@ int make_tempfile(const std::string &contents) bool read_tempfile_and_close(int fd, std::string *contents) { bool ok = read_tempfile(fd, contents); - safe_close(fd); // Implicitly deletes the file. + + int ret; + do { + ret = close(fd); // Implicitly deletes the file. + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + log_perror("close"); + // Can still continue. + } + return ok; } @@ -84,17 +96,3 @@ bool read_tempfile(int fd, std::string *contents) return true; } - -int safe_close(int fd) -{ - int ret; - do { - ret = close(fd); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close()"); - } - - return ret; -} diff --git a/util.h b/util.h index d167075..303de14 100644 --- a/util.h +++ b/util.h @@ -15,8 +15,4 @@ bool read_tempfile_and_close(int fd, std::string *contents); // Same as read_tempfile_and_close(), without the close. bool read_tempfile(int fd, std::string *contents); -// Close a file descriptor, taking care of EINTR on the way. -// log_perror() if it fails; apart from that, behaves as close(). -int safe_close(int fd); - #endif // !defined(_UTIL_H