We now handle thread wakeup and shutdown in a much less racy way: SIGUSR1 is
sent internally, and is kept blocked except while waiting in ppoll/epoll_pwait.
This should be more stable, require no more fixed-length wakeups (no more
wakeup every 20 ms in each server), and upset Valgrind less.
However, it's also complex to get right, so it might introduce new bugs.
14 files changed:
#include "log.h"
#include "serverpool.h"
#include "state.pb.h"
#include "log.h"
#include "serverpool.h"
#include "state.pb.h"
using namespace std;
extern ServerPool *servers;
using namespace std;
extern ServerPool *servers;
-extern volatile bool hupped;
int create_server_socket(int port, SocketType socket_type)
{
int create_server_socket(int port, SocketType socket_type)
{
void Acceptor::close_socket()
{
void Acceptor::close_socket()
{
- int ret;
- do {
- ret = close(server_sock);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close");
- }
+ safe_close(server_sock);
}
void Acceptor::do_work()
{
}
void Acceptor::do_work()
{
- while (!hupped) {
- // Since we are non-blocking, we need to wait for the right state first.
- // Wait up to 50 ms, then check hupped.
- pollfd pfd;
- pfd.fd = server_sock;
- pfd.events = POLLIN;
-
- int nfds = poll(&pfd, 1, 50);
- if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
- continue;
- }
- if (nfds == -1) {
- log_perror("poll");
- usleep(100000);
+ while (!should_stop()) {
+ if (!wait_for_activity(server_sock, POLLIN, NULL)) {
+ while (!should_stop()) {
// Empty the queue.
vector<ClientStats> writes;
{
// Empty the queue.
vector<ClientStats> writes;
{
-
- // Wait until the stop_fd pipe is closed, one second has passed.
- // or a spurious signal arrives.
- pollfd pfd;
- pfd.fd = stop_fd_read;
- pfd.events = POLLIN | POLLRDHUP;
- int nfds = poll(&pfd, 1, 1000);
- if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
- continue;
- }
- if (nfds == 1) {
- // Should stop.
- break;
- }
- if (nfds == -1) {
- log_perror("poll");
- usleep(100000);
- continue;
- }
+ // Wait until we are being woken up, either to quit or because
+ // there is material in pending_writes.
+ wait_for_wakeup(NULL);
#include <assert.h>
#include <errno.h>
#include <netdb.h>
#include <assert.h>
#include <errno.h>
#include <netdb.h>
#include "parse.h"
#include "serverpool.h"
#include "state.pb.h"
#include "parse.h"
#include "serverpool.h"
#include "state.pb.h"
#include "version.h"
using namespace std;
#include "version.h"
using namespace std;
void HTTPInput::close_socket()
{
void HTTPInput::close_socket()
{
- int ret;
- do {
- ret = close(sock);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close()");
- }
}
InputProto HTTPInput::serialize() const
}
InputProto HTTPInput::serialize() const
addrinfo *base_ai = ai;
// Connect to everything in turn until we have a socket.
addrinfo *base_ai = ai;
// Connect to everything in turn until we have a socket.
- while (ai && !should_stop) {
+ while (ai && !should_stop()) {
int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
if (sock == -1) {
// Could be e.g. EPROTONOSUPPORT. The show must go on.
continue;
}
int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
if (sock == -1) {
// Could be e.g. EPROTONOSUPPORT. The show must go on.
continue;
}
+ // Now do a non-blocking connect. This is important because we want to be able to be
+ // woken up, even though it's rather cumbersome.
+
+ // Set the socket as nonblocking.
+ int one = 1;
+ if (ioctl(sock, FIONBIO, &one) == -1) {
+ log_perror("ioctl(FIONBIO)");
+ safe_close(sock);
+ return -1;
+ }
+
+ // Do a non-blocking connect.
do {
err = connect(sock, ai->ai_addr, ai->ai_addrlen);
} while (err == -1 && errno == EINTR);
do {
err = connect(sock, ai->ai_addr, ai->ai_addrlen);
} while (err == -1 && errno == EINTR);
- if (err != -1) {
- freeaddrinfo(base_ai);
- return sock;
+ if (err == -1 && errno != EINPROGRESS) {
+ log_perror("connect");
+ safe_close(sock);
+ continue;
- do {
- err = close(sock);
- } while (err == -1 && errno == EINTR);
+ // Wait for the connect to complete, or an error to happen.
+ for ( ;; ) {
+ bool complete = wait_for_activity(sock, POLLIN | POLLOUT, NULL);
+ if (should_stop()) {
+ safe_close(sock);
+ return -1;
+ }
+ if (complete) {
+ break;
+ }
+ }
- if (err == -1) {
- log_perror("close");
- // Can still continue.
+ // Check whether it ended in an error or not.
+ socklen_t err_size = sizeof(err);
+ if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_size) == -1) {
+ log_perror("getsockopt");
+ safe_close(sock);
+ continue;
+ }
+
+ errno = err;
+
+ if (err == 0) {
+ // Successful connect.
+ freeaddrinfo(base_ai);
+ return sock;
void HTTPInput::do_work()
{
void HTTPInput::do_work()
{
+ while (!should_stop()) {
if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
- // Since we are non-blocking, we need to wait for the right state first.
- // Wait up to 50 ms, then check should_stop.
- pollfd pfd;
- pfd.fd = sock;
- pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN;
- pfd.events |= POLLRDHUP;
-
- int nfds = poll(&pfd, 1, 50);
- if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+ bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL);
+ if (!activity) {
+ // Most likely, should_stop was set.
- if (nfds == -1) {
- log_perror("poll");
- state = CLOSING_SOCKET;
- }
break;
}
case CLOSING_SOCKET: {
break;
}
case CLOSING_SOCKET: {
- int err;
- do {
- err = close(sock);
- } while (err == -1 && errno == EINTR);
-
- if (err == -1) {
- log_perror("close");
- }
-
state = NOT_CONNECTED;
break;
}
state = NOT_CONNECTED;
break;
}
// If we are still in NOT_CONNECTED, either something went wrong,
// or the connection just got closed.
// The earlier steps have already given the error message, if any.
// If we are still in NOT_CONNECTED, either something went wrong,
// or the connection just got closed.
// The earlier steps have already given the error message, if any.
- if (state == NOT_CONNECTED && !should_stop) {
+ if (state == NOT_CONNECTED && !should_stop()) {
log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str());
log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str());
+ timespec timeout_ts;
+ timeout_ts.tv_sec = 0;
+ timeout_ts.tv_nsec = 200000000;
+ wait_for_wakeup(&timeout_ts);
+void do_nothing(int signum)  // Signal handler that intentionally does nothing; installed for SIGUSR1 (internal signalling).
+{  // Empty on purpose. NOTE(review): presumably only the delivery itself matters, to interrupt a blocking wait -- confirm.
+}
+
CubemapStateProto collect_state(const timeval &serialize_start,
const vector<Acceptor *> acceptors,
const multimap<string, InputWithRefcount> inputs,
CubemapStateProto collect_state(const timeval &serialize_start,
const vector<Acceptor *> acceptors,
const multimap<string, InputWithRefcount> inputs,
{
signal(SIGHUP, hup);
signal(SIGINT, hup);
{
signal(SIGHUP, hup);
signal(SIGINT, hup);
+ signal(SIGUSR1, do_nothing); // Used in internal signalling.
signal(SIGPIPE, SIG_IGN);
// Parse options.
signal(SIGPIPE, SIG_IGN);
// Parse options.
#include "server.h"
#include "state.pb.h"
#include "stream.h"
#include "server.h"
#include "state.pb.h"
#include "stream.h"
delete stream_it->second;
}
delete stream_it->second;
}
- int ret;
- do {
- ret = close(epoll_fd);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close(epoll_fd)");
- }
}
vector<ClientStats> Server::get_client_stats() const
}
vector<ClientStats> Server::get_client_stats() const
- for ( ;; ) {
- int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
- if (nfds == -1 && errno == EINTR) {
- if (should_stop) {
- return;
- }
- continue;
- }
- if (nfds == -1) {
+ while (!should_stop()) {
+ // Wait until there's activity on at least one of the fds,
+ // or we are woken up due to new queued clients or data.
+ int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, -1, &sigset_without_usr1_block);
+ if (nfds == -1 && errno != EINTR) {
log_perror("epoll_wait");
exit(1);
}
log_perror("epoll_wait");
exit(1);
}
process_client(to_process[i]);
}
}
process_client(to_process[i]);
}
}
-
- if (should_stop) {
- return;
- }
{
MutexLock lock(&queued_data_mutex);
queued_add_clients.push_back(sock);
{
MutexLock lock(&queued_data_mutex);
queued_add_clients.push_back(sock);
}
void Server::add_client(int sock)
}
void Server::add_client(int sock)
{
MutexLock lock(&queued_data_mutex);
queued_data[stream_id].append(string(data, data + bytes));
{
MutexLock lock(&queued_data_mutex);
queued_data[stream_id].append(string(data, data + bytes));
}
// See the .h file for postconditions after this function.
}
// See the .h file for postconditions after this function.
access_log->write(client->get_stats());
// Bye-bye!
access_log->write(client->get_stats());
// Bye-bye!
- int ret;
- do {
- ret = close(client->sock);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close");
- }
+ safe_close(client->sock);
clients.erase(client->sock);
}
clients.erase(client->sock);
}
struct Stream;
#define EPOLL_MAX_EVENTS 8192
struct Stream;
#define EPOLL_MAX_EVENTS 8192
-#define EPOLL_TIMEOUT_MS 20
#define MAX_CLIENT_REQUEST 16384
class CubemapStateProto;
#define MAX_CLIENT_REQUEST 16384
class CubemapStateProto;
// Close and delete any leftovers, if the number of servers was reduced.
for (size_t i = num_servers; i < data_fds.size(); ++i) {
// Close and delete any leftovers, if the number of servers was reduced.
for (size_t i = num_servers; i < data_fds.size(); ++i) {
- int ret;
- do {
- ret = close(data_fds[i]); // Implicitly deletes the file.
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close");
- // Can still continue.
- }
+ safe_close(data_fds[i]); // Implicitly deletes the file.
#include "log.h"
#include "serverpool.h"
#include "stats.h"
#include "log.h"
#include "serverpool.h"
#include "stats.h"
void StatsThread::do_work()
{
void StatsThread::do_work()
{
+ while (!should_stop()) {
int fd;
FILE *fp;
time_t now;
int fd;
FILE *fp;
time_t now;
fp = fdopen(fd, "w");
if (fp == NULL) {
log_perror("fdopen");
fp = fdopen(fd, "w");
if (fp == NULL) {
log_perror("fdopen");
- if (close(fd) == -1) {
- log_perror("close");
- }
if (unlink(filename) == -1) {
log_perror(filename);
}
if (unlink(filename) == -1) {
log_perror(filename);
}
- // Wait until the stop_fd pipe is closed, stats_interval timeout,
+ // Wait until we are asked to quit, stats_interval timeout,
// or a spurious signal. (The latter will cause us to write stats
// too often, but that's okay.)
// or a spurious signal. (The latter will cause us to write stats
// too often, but that's okay.)
- pollfd pfd;
- pfd.fd = stop_fd_read;
- pfd.events = POLLIN | POLLRDHUP;
-
- int nfds = poll(&pfd, 1, stats_interval * 1000);
- if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
- continue;
- }
- if (nfds == 1) {
- // Should stop.
- break;
- }
- if (nfds == -1) {
- log_perror("poll");
- usleep(100000);
- continue;
- }
+ timespec timeout_ts;
+ timeout_ts.tv_sec = stats_interval;
+ timeout_ts.tv_nsec = 0;
+ wait_for_wakeup(&timeout_ts);
Stream::~Stream()
{
if (data_fd != -1) {
Stream::~Stream()
{
if (data_fd != -1) {
- int ret;
- do {
- ret = close(data_fd);
- } while (ret == -1 && errno == EINTR);
- if (ret == -1) {
- log_perror("close");
- }
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
-#include <fcntl.h>
-#include <signal.h>
-#include <errno.h>
Thread::~Thread() {}
void Thread::run()
{
Thread::~Thread() {}
void Thread::run()
{
- should_stop = false;
- int pipefd[2];
- if (pipe2(pipefd, O_CLOEXEC) == -1) {
- log_perror("pipe");
- exit(1);
- }
- stop_fd_read = pipefd[0];
- stop_fd_write = pipefd[1];
+ pthread_mutex_init(&should_stop_mutex, NULL);
+ should_stop_status = false;
pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this);
}
void Thread::stop()
{
pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this);
}
void Thread::stop()
{
- should_stop = true;
- char ch = 0;
- int err;
- do {
- err = write(stop_fd_write, &ch, 1);
- } while (err == -1 && errno == EINTR);
-
- if (err == -1) {
- log_perror("write");
+ {
+ MutexLock lock(&should_stop_mutex);
+ should_stop_status = true;
+ }
+ wakeup();
+ if (pthread_join(worker_thread, NULL) == -1) {
+ log_perror("pthread_join");
- do {
- err = close(stop_fd_write);
- } while (err == -1 && errno == EINTR);
+void *Thread::do_work_thunk(void *arg)
+{
+ Thread *thread = reinterpret_cast<Thread *>(arg);
- if (err == -1) {
- log_perror("close");
- // Can continue (we have close-on-exec).
+ // Block SIGHUP; only the main thread should see that.
+ // (This isn't strictly required, but it makes it easier to debug that indeed
+ // SIGUSR1 was what woke us up.)
+ sigset_t set;
+ sigaddset(&set, SIGHUP);
+ int err = pthread_sigmask(SIG_BLOCK, &set, NULL);
+ if (err != 0) {
+ errno = err;
+ log_perror("pthread_sigmask");
+ exit(1);
- pthread_kill(worker_thread, SIGHUP);
- if (pthread_join(worker_thread, NULL) == -1) {
- log_perror("pthread_join");
+ // Block SIGUSR1, and store the old signal mask.
+ sigemptyset(&set);
+ sigaddset(&set, SIGUSR1);
+ err = pthread_sigmask(SIG_BLOCK, &set, &thread->sigset_without_usr1_block);
+ if (err != 0) {
+ errno = err;
+ log_perror("pthread_sigmask");
-
- do {
- err = close(stop_fd_read);
- } while (err == -1 && errno == EINTR);
- if (err == -1) {
- log_perror("close");
- // Can continue (we have close-on-exec).
+ // Call the right thunk.
+ thread->do_work();
+ return NULL;
+}
+
+bool Thread::wait_for_activity(int fd, short events, const struct timespec *timeout_ts)
+{
+ pollfd pfd;
+ pfd.fd = fd;
+ pfd.events = events;
+
+ for ( ;; ) {
+ int nfds = ppoll(&pfd, (fd == -1) ? 0 : 1, timeout_ts, &sigset_without_usr1_block);
+ if (nfds == -1 && errno == EINTR) {
+ return false;
+ }
+ if (nfds == -1) {
+ log_perror("poll");
+ usleep(100000);
+ continue;
+ }
+ assert(nfds <= 1);
+ return (nfds == 1);
-void *Thread::do_work_thunk(void *arg)
- Thread *thread = reinterpret_cast<Thread *>(arg);
- thread->do_work();
- return NULL;
+ pthread_kill(worker_thread, SIGUSR1);
+// Reports whether this thread has been asked to stop.
+// should_stop_status is copied while should_stop_mutex is held (via MutexLock),
+// and the copy is what gets returned.
+bool Thread::should_stop()
+{
+    bool stop_requested;
+    {
+        MutexLock guard(&should_stop_mutex);
+        stop_requested = should_stop_status;
+    }
+    return stop_requested;
+}
#ifndef _THREAD_H
#define _THREAD_H
#ifndef _THREAD_H
#define _THREAD_H
-// A rather generic thread class with start/stop functionality.
-// NOTE: stop is somewhat racy (there's no guaranteed breakout from syscalls),
-// since signals don't stick. We'll need to figure out something more
-// intelligent later, probably based on sending a signal to an fd.
+struct timespec;
+
+// A thread class with start/stop and signal functionality.
+//
+// SIGUSR1 is blocked during execution of do_work(), so that you are guaranteed
+// to receive it when doing wait_for_activity(), and never elsewhere. This means
+// that you can test whatever status flags you want before calling
+// wait_for_activity(), and then be sure that it actually returns immediately
+// if a SIGUSR1 (i.e., wakeup()) happened, even if it was sent between your test
+// and the wait_for_activity() call.
- // Recovers the this pointer, and calls do_work().
+ // Recovers the this pointer, blocks SIGUSR1, and calls do_work().
static void *do_work_thunk(void *arg);
virtual void do_work() = 0;
static void *do_work_thunk(void *arg);
virtual void do_work() = 0;
- volatile bool should_stop;
+ // Waits until there is activity of the given type on <fd> (or an error),
+ // or until a wakeup. Returns true if there was actually activity on
+ // the file descriptor.
+ //
+ // If fd is -1, wait until a wakeup or timeout.
+ // If timeout_ts is NULL, there is no timeout.
+ bool wait_for_activity(int fd, short events, const timespec *timeout_ts);
+
+ // Wait until a wakeup, or until timeout_ts expires (NULL means no timeout).
+ void wait_for_wakeup(const timespec *timeout_ts) { wait_for_activity(-1, 0, timeout_ts); }
+
+ // Make wait_for_activity() return.
+ void wakeup();
- // A pipe that you can poll on if you want to see when should_stop
- // has been set to true; stop() will write a single byte to the pipe
- // and then close the other end.
- int stop_fd_read;
+ bool should_stop();
+
+ // The signal set as it was before we blocked SIGUSR1.
+ sigset_t sigset_without_usr1_block;
private:
pthread_t worker_thread;
private:
pthread_t worker_thread;
+
+ // Protects should_stop_status.
+ pthread_mutex_t should_stop_mutex;
+
+ // If this is set, the thread should return as soon as possible from do_work().
+ bool should_stop_status;
};
#endif // !defined(_THREAD_H)
};
#endif // !defined(_THREAD_H)
#include "serverpool.h"
#include "state.pb.h"
#include "udpinput.h"
#include "serverpool.h"
#include "state.pb.h"
#include "udpinput.h"
#include "version.h"
using namespace std;
#include "version.h"
using namespace std;
void UDPInput::close_socket()
{
void UDPInput::close_socket()
{
- int ret;
- do {
- ret = close(sock);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close()");
- }
-
void UDPInput::do_work()
{
void UDPInput::do_work()
{
+ while (!should_stop()) {
if (sock == -1) {
int port_num = atoi(port.c_str());
sock = create_server_socket(port_num, UDP_SOCKET);
if (sock == -1) {
int port_num = atoi(port.c_str());
sock = create_server_socket(port_num, UDP_SOCKET);
- // Since we are non-blocking, we need to wait for the right state first.
- // Wait up to 50 ms, then check should_stop.
- pollfd pfd;
- pfd.fd = sock;
- pfd.events = POLLIN;
-
- int nfds = poll(&pfd, 1, 50);
- if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+ // Wait for a packet, or a wakeup.
+ bool activity = wait_for_activity(sock, POLLIN, NULL);
+ if (!activity) {
+ // Most likely, should_stop was set.
- if (nfds == -1) {
- log_perror("poll");
- close_socket();
- continue;
- }
ssize_t ret = write(fd, ptr, to_write);
if (ret == -1) {
log_perror("write");
ssize_t ret = write(fd, ptr, to_write);
if (ret == -1) {
log_perror("write");
- if (close(fd) == -1) {
- log_perror("close");
- }
bool read_tempfile_and_close(int fd, std::string *contents)
{
bool ok = read_tempfile(fd, contents);
bool read_tempfile_and_close(int fd, std::string *contents)
{
bool ok = read_tempfile(fd, contents);
-
- int ret;
- do {
- ret = close(fd); // Implicitly deletes the file.
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close");
- // Can still continue.
- }
-
+ safe_close(fd); // Implicitly deletes the file.
+
+// Close <fd> exactly once, calling log_perror() if the close really failed.
+//
+// close() is deliberately NOT retried on EINTR. On Linux the descriptor is
+// released even when close() is interrupted, so a retry could end up closing
+// an unrelated descriptor that another thread opened in the meantime.
+// An interrupted close is therefore treated as a successful one.
+//
+// Returns 0 if the descriptor was closed, or -1 (with errno set by close())
+// on any other failure.
+int safe_close(int fd)
+{
+    const int ret = close(fd);
+    if (ret == -1 && errno == EINTR) {
+        // The descriptor is already gone; there is nothing left to retry.
+        return 0;
+    }
+
+    if (ret == -1) {
+        log_perror("close()");
+    }
+
+    return ret;
+}
// Same as read_tempfile_and_close(), without the close.
bool read_tempfile(int fd, std::string *contents);
// Same as read_tempfile_and_close(), without the close.
bool read_tempfile(int fd, std::string *contents);
+// Close a file descriptor, taking care of EINTR on the way.
+// log_perror() if it fails; apart from that, behaves as close().
+int safe_close(int fd);
+
#endif // !defined(_UTIL_H)
#endif // !defined(_UTIL_H)