#include "log.h"
#include "serverpool.h"
#include "state.pb.h"
+#include "util.h"
using namespace std;
extern ServerPool *servers;
-extern volatile bool hupped;
int create_server_socket(int port, SocketType socket_type)
{
void Acceptor::close_socket()
{
- int ret;
- do {
- ret = close(server_sock);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close");
- }
+ safe_close(server_sock);
}
void Acceptor::do_work()
{
- while (!hupped) {
- // Since we are non-blocking, we need to wait for the right state first.
- // Wait up to 50 ms, then check hupped.
- pollfd pfd;
- pfd.fd = server_sock;
- pfd.events = POLLIN;
-
- int nfds = poll(&pfd, 1, 50);
- if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
- continue;
- }
- if (nfds == -1) {
- log_perror("poll");
- usleep(100000);
+ while (!should_stop()) {
+ if (!wait_for_activity(server_sock, POLLIN, NULL)) {
continue;
}
}
}
- while (!should_stop) {
+ while (!should_stop()) {
// Empty the queue.
vector<ClientStats> writes;
{
}
fflush(logfp);
}
-
- // Wait until the stop_fd pipe is closed, one second has passed.
- // or a spurious signal arrives.
- pollfd pfd;
- pfd.fd = stop_fd_read;
- pfd.events = POLLIN | POLLRDHUP;
- int nfds = poll(&pfd, 1, 1000);
- if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
- continue;
- }
- if (nfds == 1) {
- // Should stop.
- break;
- }
- if (nfds == -1) {
- log_perror("poll");
- usleep(100000);
- continue;
- }
+ // Wait until we are being woken up, either to quit or because
+ // there is material in pending_writes.
+ wait_for_wakeup(NULL);
}
if (logfp != NULL) {
+#include <stdio.h>
#include <assert.h>
#include <errno.h>
#include <netdb.h>
#include "parse.h"
#include "serverpool.h"
#include "state.pb.h"
+#include "util.h"
#include "version.h"
using namespace std;
void HTTPInput::close_socket()
{
- int ret;
- do {
- ret = close(sock);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close()");
- }
+ safe_close(sock);
}
InputProto HTTPInput::serialize() const
addrinfo *base_ai = ai;
// Connect to everything in turn until we have a socket.
- while (ai && !should_stop) {
+ while (ai && !should_stop()) {
int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
if (sock == -1) {
// Could be e.g. EPROTONOSUPPORT. The show must go on.
continue;
}
+ // Now do a non-blocking connect. This is important because we want to be able to be
+ // woken up, even though it's rather cumbersome.
+
+ // Set the socket as nonblocking.
+ int one = 1;
+ if (ioctl(sock, FIONBIO, &one) == -1) {
+ log_perror("ioctl(FIONBIO)");
+ safe_close(sock);
+ return -1;
+ }
+
+ // Do a non-blocking connect.
do {
err = connect(sock, ai->ai_addr, ai->ai_addrlen);
} while (err == -1 && errno == EINTR);
- if (err != -1) {
- freeaddrinfo(base_ai);
- return sock;
+ if (err == -1 && errno != EINPROGRESS) {
+ log_perror("connect");
+ safe_close(sock);
+ continue;
}
- do {
- err = close(sock);
- } while (err == -1 && errno == EINTR);
+ // Wait for the connect to complete, or an error to happen.
+ for ( ;; ) {
+ bool complete = wait_for_activity(sock, POLLIN | POLLOUT, NULL);
+ if (should_stop()) {
+ safe_close(sock);
+ return -1;
+ }
+ if (complete) {
+ break;
+ }
+ }
- if (err == -1) {
- log_perror("close");
- // Can still continue.
+ // Check whether it ended in an error or not.
+ socklen_t err_size = sizeof(err);
+ if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_size) == -1) {
+ log_perror("getsockopt");
+ safe_close(sock);
+ continue;
+ }
+
+ errno = err;
+
+ if (err == 0) {
+ // Successful connect.
+ freeaddrinfo(base_ai);
+ return sock;
}
+ safe_close(sock);
ai = ai->ai_next;
}
void HTTPInput::do_work()
{
- while (!should_stop) {
+ while (!should_stop()) {
if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
- // Since we are non-blocking, we need to wait for the right state first.
- // Wait up to 50 ms, then check should_stop.
- pollfd pfd;
- pfd.fd = sock;
- pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN;
- pfd.events |= POLLRDHUP;
-
- int nfds = poll(&pfd, 1, 50);
- if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+ bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL);
+ if (!activity) {
+ // Most likely, should_stop was set.
continue;
}
- if (nfds == -1) {
- log_perror("poll");
- state = CLOSING_SOCKET;
- }
}
switch (state) {
break;
}
case CLOSING_SOCKET: {
- int err;
- do {
- err = close(sock);
- } while (err == -1 && errno == EINTR);
-
- if (err == -1) {
- log_perror("close");
- }
-
+ close_socket();
state = NOT_CONNECTED;
break;
}
// If we are still in NOT_CONNECTED, either something went wrong,
// or the connection just got closed.
// The earlier steps have already given the error message, if any.
- if (state == NOT_CONNECTED && !should_stop) {
+ if (state == NOT_CONNECTED && !should_stop()) {
log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str());
- usleep(200000);
+ timespec timeout_ts;
+ timeout_ts.tv_sec = 0;
+ timeout_ts.tv_nsec = 200000000;
+ wait_for_wakeup(&timeout_ts);
}
}
}
}
}
+void do_nothing(int signum)
+{
+}
+
CubemapStateProto collect_state(const timeval &serialize_start,
const vector<Acceptor *> acceptors,
const multimap<string, InputWithRefcount> inputs,
{
signal(SIGHUP, hup);
signal(SIGINT, hup);
+ signal(SIGUSR1, do_nothing); // Used in internal signalling.
signal(SIGPIPE, SIG_IGN);
// Parse options.
#include "server.h"
#include "state.pb.h"
#include "stream.h"
+#include "util.h"
using namespace std;
delete stream_it->second;
}
- int ret;
- do {
- ret = close(epoll_fd);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close(epoll_fd)");
- }
+ safe_close(epoll_fd);
}
vector<ClientStats> Server::get_client_stats() const
void Server::do_work()
{
- for ( ;; ) {
- int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
- if (nfds == -1 && errno == EINTR) {
- if (should_stop) {
- return;
- }
- continue;
- }
- if (nfds == -1) {
+ while (!should_stop()) {
+ // Wait until there's activity on at least one of the fds,
+ // or 20 ms (about one frame at 50 fps) has elapsed.
+ //
+ // We could in theory wait forever and rely on wakeup()
+ // from add_client_deferred() and add_data_deferred(),
+ // but wakeup is a pretty expensive operation, and the
+ // two threads might end up fighting over a lock, so it's
+ // seemingly (much) more efficient to just have a timeout here.
+ int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS, &sigset_without_usr1_block);
+ if (nfds == -1 && errno != EINTR) {
log_perror("epoll_wait");
exit(1);
}
process_client(to_process[i]);
}
}
-
- if (should_stop) {
- return;
- }
}
}
access_log->write(client->get_stats());
// Bye-bye!
- int ret;
- do {
- ret = close(client->sock);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close");
- }
+ safe_close(client->sock);
clients.erase(client->sock);
}
// Close and delete any leftovers, if the number of servers was reduced.
for (size_t i = num_servers; i < data_fds.size(); ++i) {
- int ret;
- do {
- ret = close(data_fds[i]); // Implicitly deletes the file.
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close");
- // Can still continue.
- }
+ safe_close(data_fds[i]); // Implicitly deletes the file.
}
}
#include "log.h"
#include "serverpool.h"
#include "stats.h"
+#include "util.h"
using namespace std;
void StatsThread::do_work()
{
- while (!should_stop) {
+ while (!should_stop()) {
int fd;
FILE *fp;
time_t now;
fp = fdopen(fd, "w");
if (fp == NULL) {
log_perror("fdopen");
- if (close(fd) == -1) {
- log_perror("close");
- }
+ safe_close(fd);
if (unlink(filename) == -1) {
log_perror(filename);
}
free(filename);
sleep:
- // Wait until the stop_fd pipe is closed, stats_interval timeout,
+ // Wait until we are asked to quit, stats_interval timeout,
// or a spurious signal. (The latter will cause us to write stats
// too often, but that's okay.)
- pollfd pfd;
- pfd.fd = stop_fd_read;
- pfd.events = POLLIN | POLLRDHUP;
-
- int nfds = poll(&pfd, 1, stats_interval * 1000);
- if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
- continue;
- }
- if (nfds == 1) {
- // Should stop.
- break;
- }
- if (nfds == -1) {
- log_perror("poll");
- usleep(100000);
- continue;
- }
+ timespec timeout_ts;
+ timeout_ts.tv_sec = stats_interval;
+ timeout_ts.tv_nsec = 0;
+ wait_for_wakeup(&timeout_ts);
}
}
Stream::~Stream()
{
if (data_fd != -1) {
- int ret;
- do {
- ret = close(data_fd);
- } while (ret == -1 && errno == EINTR);
- if (ret == -1) {
- log_perror("close");
- }
+ safe_close(data_fd);
}
}
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
-#include <fcntl.h>
-#include <signal.h>
-#include <errno.h>
#include "log.h"
+#include "mutexlock.h"
#include "thread.h"
-
+
Thread::~Thread() {}
void Thread::run()
{
- should_stop = false;
- int pipefd[2];
- if (pipe2(pipefd, O_CLOEXEC) == -1) {
- log_perror("pipe");
- exit(1);
- }
- stop_fd_read = pipefd[0];
- stop_fd_write = pipefd[1];
+ pthread_mutex_init(&should_stop_mutex, NULL);
+ should_stop_status = false;
pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this);
}
void Thread::stop()
{
- should_stop = true;
- char ch = 0;
- int err;
- do {
- err = write(stop_fd_write, &ch, 1);
- } while (err == -1 && errno == EINTR);
-
- if (err == -1) {
- log_perror("write");
+ {
+ MutexLock lock(&should_stop_mutex);
+ should_stop_status = true;
+ }
+ wakeup();
+ if (pthread_join(worker_thread, NULL) == -1) {
+ log_perror("pthread_join");
exit(1);
}
+}
- do {
- err = close(stop_fd_write);
- } while (err == -1 && errno == EINTR);
+void *Thread::do_work_thunk(void *arg)
+{
+ Thread *thread = reinterpret_cast<Thread *>(arg);
- if (err == -1) {
- log_perror("close");
- // Can continue (we have close-on-exec).
+ // Block SIGHUP; only the main thread should see that.
+ // (This isn't strictly required, but it makes it easier to debug that indeed
+ // SIGUSR1 was what woke us up.)
+ sigset_t set;
+ sigaddset(&set, SIGHUP);
+ int err = pthread_sigmask(SIG_BLOCK, &set, NULL);
+ if (err != 0) {
+ errno = err;
+ log_perror("pthread_sigmask");
+ exit(1);
}
- pthread_kill(worker_thread, SIGHUP);
- if (pthread_join(worker_thread, NULL) == -1) {
- log_perror("pthread_join");
+ // Block SIGUSR1, and store the old signal mask.
+ sigemptyset(&set);
+ sigaddset(&set, SIGUSR1);
+ err = pthread_sigmask(SIG_BLOCK, &set, &thread->sigset_without_usr1_block);
+ if (err != 0) {
+ errno = err;
+ log_perror("pthread_sigmask");
exit(1);
}
-
- do {
- err = close(stop_fd_read);
- } while (err == -1 && errno == EINTR);
- if (err == -1) {
- log_perror("close");
- // Can continue (we have close-on-exec).
+ // Call the right thunk.
+ thread->do_work();
+ return NULL;
+}
+
+bool Thread::wait_for_activity(int fd, short events, const struct timespec *timeout_ts)
+{
+ pollfd pfd;
+ pfd.fd = fd;
+ pfd.events = events;
+
+ for ( ;; ) {
+ int nfds = ppoll(&pfd, (fd == -1) ? 0 : 1, timeout_ts, &sigset_without_usr1_block);
+ if (nfds == -1 && errno == EINTR) {
+ return false;
+ }
+ if (nfds == -1) {
+ log_perror("poll");
+ usleep(100000);
+ continue;
+ }
+ assert(nfds <= 1);
+ return (nfds == 1);
}
}
-void *Thread::do_work_thunk(void *arg)
+void Thread::wakeup()
{
- Thread *thread = reinterpret_cast<Thread *>(arg);
- thread->do_work();
- return NULL;
+ pthread_kill(worker_thread, SIGUSR1);
}
+bool Thread::should_stop()
+{
+ MutexLock lock(&should_stop_mutex);
+ return should_stop_status;
+}
#ifndef _THREAD_H
#define _THREAD_H
+#include <signal.h>
#include <pthread.h>
-// A rather generic thread class with start/stop functionality.
-// NOTE: stop is somewhat racy (there's no guaranteed breakout from syscalls),
-// since signals don't stick. We'll need to figure out something more
-// intelligent later, probably based on sending a signal to an fd.
+struct timespec;
+
+// A thread class with start/stop and signal functionality.
+//
+// SIGUSR1 is blocked during execution of do_work(), so that you are guaranteed
+// to receive it when doing wait_for_activity(), and never elsewhere. This means
+// that you can test whatever status flags you'd want before calling
+// wait_for_activity(), and then be sure that it actually returns immediately
+// if a SIGUSR1 (ie., wakeup()) happened, even if it were sent between your test
+// and the wait_for_activity() call.
class Thread {
public:
void stop();
protected:
- // Recovers the this pointer, and calls do_work().
+ // Recovers the this pointer, blocks SIGUSR1, and calls do_work().
static void *do_work_thunk(void *arg);
virtual void do_work() = 0;
- volatile bool should_stop;
+ // Waits until there is activity of the given type on <fd> (or an error),
+ // or until a wakeup. Returns true if there was actually activity on
+ // the file descriptor.
+ //
+ // If fd is -1, wait until a wakeup or timeout.
+ // if timeout_ts is NULL, there is no timeout.
+ bool wait_for_activity(int fd, short events, const timespec *timeout_ts);
+
+ // Wait until a wakeup.
+ void wait_for_wakeup(const timespec *timeout_ts) { wait_for_activity(-1, 0, timeout_ts); }
+
+ // Make wait_for_activity() return. Note that this is a relatively expensive
+ // operation.
+ void wakeup();
- // A pipe that you can poll on if you want to see when should_stop
- // has been set to true; stop() will write a single byte to the pipe
- // and then close the other end.
- int stop_fd_read;
+ bool should_stop();
+
+ // The signal set as it were before we blocked SIGUSR1.
+ sigset_t sigset_without_usr1_block;
private:
pthread_t worker_thread;
- int stop_fd_write;
+
+ // Protects should_stop_status.
+ pthread_mutex_t should_stop_mutex;
+
+ // If this is set, the thread should return as soon as possible from do_work().
+ bool should_stop_status;
};
#endif // !defined(_THREAD_H)
#include "serverpool.h"
#include "state.pb.h"
#include "udpinput.h"
+#include "util.h"
#include "version.h"
using namespace std;
void UDPInput::close_socket()
{
- int ret;
- do {
- ret = close(sock);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close()");
- }
-
+ safe_close(sock);
sock = -1;
}
void UDPInput::do_work()
{
- while (!should_stop) {
+ while (!should_stop()) {
if (sock == -1) {
int port_num = atoi(port.c_str());
sock = create_server_socket(port_num, UDP_SOCKET);
}
}
- // Since we are non-blocking, we need to wait for the right state first.
- // Wait up to 50 ms, then check should_stop.
- pollfd pfd;
- pfd.fd = sock;
- pfd.events = POLLIN;
-
- int nfds = poll(&pfd, 1, 50);
- if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+ // Wait for a packet, or a wakeup.
+ bool activity = wait_for_activity(sock, POLLIN, NULL);
+ if (!activity) {
+ // Most likely, should_stop was set.
continue;
}
- if (nfds == -1) {
- log_perror("poll");
- close_socket();
- continue;
- }
char buf[4096];
int ret;
ssize_t ret = write(fd, ptr, to_write);
if (ret == -1) {
log_perror("write");
- if (close(fd) == -1) {
- log_perror("close");
- }
+ safe_close(fd);
return -1;
}
bool read_tempfile_and_close(int fd, std::string *contents)
{
bool ok = read_tempfile(fd, contents);
-
- int ret;
- do {
- ret = close(fd); // Implicitly deletes the file.
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close");
- // Can still continue.
- }
-
+ safe_close(fd); // Implicitly deletes the file.
return ok;
}
return true;
}
+
+int safe_close(int fd)
+{
+ int ret;
+ do {
+ ret = close(fd);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ log_perror("close()");
+ }
+
+ return ret;
+}
// Same as read_tempfile_and_close(), without the close.
bool read_tempfile(int fd, std::string *contents);
+// Close a file descriptor, taking care of EINTR on the way.
+// log_perror() if it fails; apart from that, behaves as close().
+int safe_close(int fd);
+
#endif // !defined(_UTIL_H