#include "log.h"
#include "serverpool.h"
#include "state.pb.h"
-#include "util.h"
using namespace std;
extern ServerPool *servers;
+extern volatile bool hupped;
int create_server_socket(int port, SocketType socket_type)
{
void Acceptor::close_socket()
{
- safe_close(server_sock);
+ int ret;
+ do {
+ ret = close(server_sock);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ log_perror("close");
+ }
}
void Acceptor::do_work()
{
- while (!should_stop()) {
- if (!wait_for_activity(server_sock, POLLIN, NULL)) {
+ while (!hupped) {
+ // Since we are non-blocking, we need to wait for the right state first.
+ // Wait up to 50 ms, then check hupped.
+ pollfd pfd;
+ pfd.fd = server_sock;
+ pfd.events = POLLIN;
+
+ int nfds = poll(&pfd, 1, 50);
+ if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+ continue;
+ }
+ if (nfds == -1) {
+ log_perror("poll");
+ usleep(100000);
continue;
}
}
}
- while (!should_stop()) {
+ while (!should_stop) {
// Empty the queue.
vector<ClientStats> writes;
{
}
fflush(logfp);
}
+
+ // Wait until the stop_fd pipe is closed, one second has passed.
+ // or a spurious signal arrives.
+ pollfd pfd;
+ pfd.fd = stop_fd_read;
+ pfd.events = POLLIN | POLLRDHUP;
- // Wait until we are being woken up, either to quit or because
- // there is material in pending_writes.
- wait_for_wakeup(NULL);
+ int nfds = poll(&pfd, 1, 1000);
+ if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+ continue;
+ }
+ if (nfds == 1) {
+ // Should stop.
+ break;
+ }
+ if (nfds == -1) {
+ log_perror("poll");
+ usleep(100000);
+ continue;
+ }
}
if (logfp != NULL) {
-#include <stdio.h>
#include <assert.h>
#include <errno.h>
#include <netdb.h>
#include "parse.h"
#include "serverpool.h"
#include "state.pb.h"
-#include "util.h"
#include "version.h"
using namespace std;
void HTTPInput::close_socket()
{
- safe_close(sock);
+ int ret;
+ do {
+ ret = close(sock);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ log_perror("close()");
+ }
}
InputProto HTTPInput::serialize() const
addrinfo *base_ai = ai;
// Connect to everything in turn until we have a socket.
- while (ai && !should_stop()) {
+ while (ai && !should_stop) {
int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
if (sock == -1) {
// Could be e.g. EPROTONOSUPPORT. The show must go on.
continue;
}
- // Now do a non-blocking connect. This is important because we want to be able to be
- // woken up, even though it's rather cumbersome.
-
- // Set the socket as nonblocking.
- int one = 1;
- if (ioctl(sock, FIONBIO, &one) == -1) {
- log_perror("ioctl(FIONBIO)");
- safe_close(sock);
- return -1;
- }
-
- // Do a non-blocking connect.
do {
err = connect(sock, ai->ai_addr, ai->ai_addrlen);
} while (err == -1 && errno == EINTR);
- if (err == -1 && errno != EINPROGRESS) {
- log_perror("connect");
- safe_close(sock);
- continue;
- }
-
- // Wait for the connect to complete, or an error to happen.
- for ( ;; ) {
- bool complete = wait_for_activity(sock, POLLIN | POLLOUT, NULL);
- if (should_stop()) {
- safe_close(sock);
- return -1;
- }
- if (complete) {
- break;
- }
- }
-
- // Check whether it ended in an error or not.
- socklen_t err_size = sizeof(err);
- if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_size) == -1) {
- log_perror("getsockopt");
- safe_close(sock);
- continue;
+ if (err != -1) {
+ freeaddrinfo(base_ai);
+ return sock;
}
- errno = err;
+ do {
+ err = close(sock);
+ } while (err == -1 && errno == EINTR);
- if (err == 0) {
- // Successful connect.
- freeaddrinfo(base_ai);
- return sock;
+ if (err == -1) {
+ log_perror("close");
+ // Can still continue.
}
- safe_close(sock);
ai = ai->ai_next;
}
void HTTPInput::do_work()
{
- while (!should_stop()) {
+ while (!should_stop) {
if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
- bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL);
- if (!activity) {
- // Most likely, should_stop was set.
+ // Since we are non-blocking, we need to wait for the right state first.
+ // Wait up to 50 ms, then check should_stop.
+ pollfd pfd;
+ pfd.fd = sock;
+ pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN;
+ pfd.events |= POLLRDHUP;
+
+ int nfds = poll(&pfd, 1, 50);
+ if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
continue;
}
+ if (nfds == -1) {
+ log_perror("poll");
+ state = CLOSING_SOCKET;
+ }
}
switch (state) {
break;
}
case CLOSING_SOCKET: {
- close_socket();
+ int err;
+ do {
+ err = close(sock);
+ } while (err == -1 && errno == EINTR);
+
+ if (err == -1) {
+ log_perror("close");
+ }
+
state = NOT_CONNECTED;
break;
}
// If we are still in NOT_CONNECTED, either something went wrong,
// or the connection just got closed.
// The earlier steps have already given the error message, if any.
- if (state == NOT_CONNECTED && !should_stop()) {
+ if (state == NOT_CONNECTED && !should_stop) {
log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str());
- timespec timeout_ts;
- timeout_ts.tv_sec = 0;
- timeout_ts.tv_nsec = 200000000;
- wait_for_wakeup(&timeout_ts);
+ usleep(200000);
}
}
}
}
}
-void do_nothing(int signum)
-{
-}
-
CubemapStateProto collect_state(const timeval &serialize_start,
const vector<Acceptor *> acceptors,
const multimap<string, InputWithRefcount> inputs,
{
signal(SIGHUP, hup);
signal(SIGINT, hup);
- signal(SIGUSR1, do_nothing); // Used in internal signalling.
signal(SIGPIPE, SIG_IGN);
// Parse options.
#include "server.h"
#include "state.pb.h"
#include "stream.h"
-#include "util.h"
using namespace std;
delete stream_it->second;
}
- safe_close(epoll_fd);
+ int ret;
+ do {
+ ret = close(epoll_fd);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ log_perror("close(epoll_fd)");
+ }
}
vector<ClientStats> Server::get_client_stats() const
void Server::do_work()
{
- while (!should_stop()) {
- // Wait until there's activity on at least one of the fds,
- // or we are waken up due to new queued clients or data.
- int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, -1, &sigset_without_usr1_block);
- if (nfds == -1 && errno != EINTR) {
+ for ( ;; ) {
+ int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
+ if (nfds == -1 && errno == EINTR) {
+ if (should_stop) {
+ return;
+ }
+ continue;
+ }
+ if (nfds == -1) {
log_perror("epoll_wait");
exit(1);
}
process_client(to_process[i]);
}
}
+
+ if (should_stop) {
+ return;
+ }
}
}
{
MutexLock lock(&queued_data_mutex);
queued_add_clients.push_back(sock);
- wakeup();
}
void Server::add_client(int sock)
{
MutexLock lock(&queued_data_mutex);
queued_data[stream_id].append(string(data, data + bytes));
- wakeup();
}
// See the .h file for postconditions after this function.
access_log->write(client->get_stats());
// Bye-bye!
- safe_close(client->sock);
+ int ret;
+ do {
+ ret = close(client->sock);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ log_perror("close");
+ }
clients.erase(client->sock);
}
struct Stream;
#define EPOLL_MAX_EVENTS 8192
+#define EPOLL_TIMEOUT_MS 20
#define MAX_CLIENT_REQUEST 16384
class CubemapStateProto;
// Close and delete any leftovers, if the number of servers was reduced.
for (size_t i = num_servers; i < data_fds.size(); ++i) {
- safe_close(data_fds[i]); // Implicitly deletes the file.
+ int ret;
+ do {
+ ret = close(data_fds[i]); // Implicitly deletes the file.
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ log_perror("close");
+ // Can still continue.
+ }
}
}
#include "log.h"
#include "serverpool.h"
#include "stats.h"
-#include "util.h"
using namespace std;
void StatsThread::do_work()
{
- while (!should_stop()) {
+ while (!should_stop) {
int fd;
FILE *fp;
time_t now;
fp = fdopen(fd, "w");
if (fp == NULL) {
log_perror("fdopen");
- safe_close(fd);
+ if (close(fd) == -1) {
+ log_perror("close");
+ }
if (unlink(filename) == -1) {
log_perror(filename);
}
free(filename);
sleep:
- // Wait until we are asked to quit, stats_interval timeout,
+ // Wait until the stop_fd pipe is closed, stats_interval timeout,
// or a spurious signal. (The latter will cause us to write stats
// too often, but that's okay.)
- timespec timeout_ts;
- timeout_ts.tv_sec = stats_interval;
- timeout_ts.tv_nsec = 0;
- wait_for_wakeup(&timeout_ts);
+ pollfd pfd;
+ pfd.fd = stop_fd_read;
+ pfd.events = POLLIN | POLLRDHUP;
+
+ int nfds = poll(&pfd, 1, stats_interval * 1000);
+ if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+ continue;
+ }
+ if (nfds == 1) {
+ // Should stop.
+ break;
+ }
+ if (nfds == -1) {
+ log_perror("poll");
+ usleep(100000);
+ continue;
+ }
}
}
Stream::~Stream()
{
if (data_fd != -1) {
- safe_close(data_fd);
+ int ret;
+ do {
+ ret = close(data_fd);
+ } while (ret == -1 && errno == EINTR);
+ if (ret == -1) {
+ log_perror("close");
+ }
}
}
-#include <assert.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <poll.h>
-#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <errno.h>
#include "log.h"
-#include "mutexlock.h"
#include "thread.h"
-
+
Thread::~Thread() {}
void Thread::run()
{
- pthread_mutex_init(&should_stop_mutex, NULL);
- should_stop_status = false;
+ should_stop = false;
+ int pipefd[2];
+ if (pipe2(pipefd, O_CLOEXEC) == -1) {
+ log_perror("pipe");
+ exit(1);
+ }
+ stop_fd_read = pipefd[0];
+ stop_fd_write = pipefd[1];
pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this);
}
void Thread::stop()
{
- {
- MutexLock lock(&should_stop_mutex);
- should_stop_status = true;
- }
- wakeup();
- if (pthread_join(worker_thread, NULL) == -1) {
- log_perror("pthread_join");
+ should_stop = true;
+ char ch = 0;
+ int err;
+ do {
+ err = write(stop_fd_write, &ch, 1);
+ } while (err == -1 && errno == EINTR);
+
+ if (err == -1) {
+ log_perror("write");
exit(1);
}
-}
-void *Thread::do_work_thunk(void *arg)
-{
- Thread *thread = reinterpret_cast<Thread *>(arg);
+ do {
+ err = close(stop_fd_write);
+ } while (err == -1 && errno == EINTR);
- // Block SIGHUP; only the main thread should see that.
- // (This isn't strictly required, but it makes it easier to debug that indeed
- // SIGUSR1 was what woke us up.)
- sigset_t set;
- sigaddset(&set, SIGHUP);
- int err = pthread_sigmask(SIG_BLOCK, &set, NULL);
- if (err != 0) {
- errno = err;
- log_perror("pthread_sigmask");
- exit(1);
+ if (err == -1) {
+ log_perror("close");
+ // Can continue (we have close-on-exec).
}
- // Block SIGUSR1, and store the old signal mask.
- sigemptyset(&set);
- sigaddset(&set, SIGUSR1);
- err = pthread_sigmask(SIG_BLOCK, &set, &thread->sigset_without_usr1_block);
- if (err != 0) {
- errno = err;
- log_perror("pthread_sigmask");
+ pthread_kill(worker_thread, SIGHUP);
+ if (pthread_join(worker_thread, NULL) == -1) {
+ log_perror("pthread_join");
exit(1);
}
+
+ do {
+ err = close(stop_fd_read);
+ } while (err == -1 && errno == EINTR);
- // Call the right thunk.
- thread->do_work();
- return NULL;
-}
-
-bool Thread::wait_for_activity(int fd, short events, const struct timespec *timeout_ts)
-{
- pollfd pfd;
- pfd.fd = fd;
- pfd.events = events;
-
- for ( ;; ) {
- int nfds = ppoll(&pfd, (fd == -1) ? 0 : 1, timeout_ts, &sigset_without_usr1_block);
- if (nfds == -1 && errno == EINTR) {
- return false;
- }
- if (nfds == -1) {
- log_perror("poll");
- usleep(100000);
- continue;
- }
- assert(nfds <= 1);
- return (nfds == 1);
+ if (err == -1) {
+ log_perror("close");
+ // Can continue (we have close-on-exec).
}
}
-void Thread::wakeup()
+void *Thread::do_work_thunk(void *arg)
{
- pthread_kill(worker_thread, SIGUSR1);
+ Thread *thread = reinterpret_cast<Thread *>(arg);
+ thread->do_work();
+ return NULL;
}
-bool Thread::should_stop()
-{
- MutexLock lock(&should_stop_mutex);
- return should_stop_status;
-}
#ifndef _THREAD_H
#define _THREAD_H
-#include <signal.h>
#include <pthread.h>
-struct timespec;
-
-// A thread class with start/stop and signal functionality.
-//
-// SIGUSR1 is blocked during execution of do_work(), so that you are guaranteed
-// to receive it when doing wait_for_activity(), and never elsewhere. This means
-// that you can test whatever status flags you'd want before calling
-// wait_for_activity(), and then be sure that it actually returns immediately
-// if a SIGUSR1 (ie., wakeup()) happened, even if it were sent between your test
-// and the wait_for_activity() call.
+// A rather generic thread class with start/stop functionality.
+// NOTE: stop is somewhat racy (there's no guaranteed breakout from syscalls),
+// since signals don't stick. We'll need to figure out something more
+// intelligent later, probably based on sending a signal to an fd.
class Thread {
public:
void stop();
protected:
- // Recovers the this pointer, blocks SIGUSR1, and calls do_work().
+ // Recovers the this pointer, and calls do_work().
static void *do_work_thunk(void *arg);
virtual void do_work() = 0;
- // Waits until there is activity of the given type on <fd> (or an error),
- // or until a wakeup. Returns true if there was actually activity on
- // the file descriptor.
- //
- // If fd is -1, wait until a wakeup or timeout.
- // if timeout_ts is NULL, there is no timeout.
- bool wait_for_activity(int fd, short events, const timespec *timeout_ts);
-
- // Wait until a wakeup.
- void wait_for_wakeup(const timespec *timeout_ts) { wait_for_activity(-1, 0, timeout_ts); }
-
- // Make wait_for_activity() return.
- void wakeup();
+ volatile bool should_stop;
- bool should_stop();
-
- // The signal set as it were before we blocked SIGUSR1.
- sigset_t sigset_without_usr1_block;
+ // A pipe that you can poll on if you want to see when should_stop
+ // has been set to true; stop() will write a single byte to the pipe
+ // and then close the other end.
+ int stop_fd_read;
private:
pthread_t worker_thread;
-
- // Protects should_stop_status.
- pthread_mutex_t should_stop_mutex;
-
- // If this is set, the thread should return as soon as possible from do_work().
- bool should_stop_status;
+ int stop_fd_write;
};
#endif // !defined(_THREAD_H)
#include "serverpool.h"
#include "state.pb.h"
#include "udpinput.h"
-#include "util.h"
#include "version.h"
using namespace std;
void UDPInput::close_socket()
{
- safe_close(sock);
+ int ret;
+ do {
+ ret = close(sock);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ log_perror("close()");
+ }
+
sock = -1;
}
void UDPInput::do_work()
{
- while (!should_stop()) {
+ while (!should_stop) {
if (sock == -1) {
int port_num = atoi(port.c_str());
sock = create_server_socket(port_num, UDP_SOCKET);
}
}
- // Wait for a packet, or a wakeup.
- bool activity = wait_for_activity(sock, POLLIN, NULL);
- if (!activity) {
- // Most likely, should_stop was set.
+ // Since we are non-blocking, we need to wait for the right state first.
+ // Wait up to 50 ms, then check should_stop.
+ pollfd pfd;
+ pfd.fd = sock;
+ pfd.events = POLLIN;
+
+ int nfds = poll(&pfd, 1, 50);
+ if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
continue;
}
+ if (nfds == -1) {
+ log_perror("poll");
+ close_socket();
+ continue;
+ }
char buf[4096];
int ret;
ssize_t ret = write(fd, ptr, to_write);
if (ret == -1) {
log_perror("write");
- safe_close(fd);
+ if (close(fd) == -1) {
+ log_perror("close");
+ }
return -1;
}
bool read_tempfile_and_close(int fd, std::string *contents)
{
bool ok = read_tempfile(fd, contents);
- safe_close(fd); // Implicitly deletes the file.
+
+ int ret;
+ do {
+ ret = close(fd); // Implicitly deletes the file.
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ log_perror("close");
+ // Can still continue.
+ }
+
return ok;
}
return true;
}
-
-int safe_close(int fd)
-{
- int ret;
- do {
- ret = close(fd);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close()");
- }
-
- return ret;
-}
// Same as read_tempfile_and_close(), without the close.
bool read_tempfile(int fd, std::string *contents);
-// Close a file descriptor, taking care of EINTR on the way.
-// log_perror() if it fails; apart from that, behaves as close().
-int safe_close(int fd);
-
#endif // !defined(_UTIL_H