git.sesse.net Git - cubemap/commitdiff
Rewrite the entire internal signal handling/wakeup.
author Steinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 19 Apr 2013 21:39:23 +0000 (23:39 +0200)
committer Steinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 19 Apr 2013 21:39:23 +0000 (23:39 +0200)
We now solve things in a much less racy way, sending SIGUSR1 internally,
which is blocked except in poll/epoll. This should be more stable,
require no more fixed-length wakeups (no more wakeup every 20 ms in
each server), and upset Valgrind less.

However, it's also complex to get right, so it might introduce new bugs.

14 files changed:
acceptor.cpp
accesslog.cpp
httpinput.cpp
main.cpp
server.cpp
server.h
serverpool.cpp
stats.cpp
stream.cpp
thread.cpp
thread.h
udpinput.cpp
util.cpp
util.h

index 3172c73b80a1821e0973e08c267f6f8ccae74c10..2abfa5220064ec47efb8d40c41d3ad9ceed6a79d 100644 (file)
 #include "log.h"
 #include "serverpool.h"
 #include "state.pb.h"
 #include "log.h"
 #include "serverpool.h"
 #include "state.pb.h"
+#include "util.h"
 
 using namespace std;
 
 extern ServerPool *servers;
 
 using namespace std;
 
 extern ServerPool *servers;
-extern volatile bool hupped;
 
 int create_server_socket(int port, SocketType socket_type)
 {
 
 int create_server_socket(int port, SocketType socket_type)
 {
@@ -93,32 +93,13 @@ AcceptorProto Acceptor::serialize() const
 
 void Acceptor::close_socket()
 {
 
 void Acceptor::close_socket()
 {
-       int ret;
-       do {
-               ret = close(server_sock);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
-               log_perror("close");
-       }
+       safe_close(server_sock);
 }
 
 void Acceptor::do_work()
 {
 }
 
 void Acceptor::do_work()
 {
-       while (!hupped) {
-               // Since we are non-blocking, we need to wait for the right state first.
-               // Wait up to 50 ms, then check hupped.
-               pollfd pfd;
-               pfd.fd = server_sock;
-               pfd.events = POLLIN;
-
-               int nfds = poll(&pfd, 1, 50);
-               if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
-                       continue;
-               }
-               if (nfds == -1) {
-                       log_perror("poll");
-                       usleep(100000);
+       while (!should_stop()) {
+               if (!wait_for_activity(server_sock, POLLIN, NULL)) {
                        continue;
                }
 
                        continue;
                }
 
index 03aeec8a51fb284884410418c1c9eef901cfd725..81eff90e46b98312781364737d91d7e78c401ac4 100644 (file)
@@ -43,7 +43,7 @@ void AccessLogThread::do_work()
                }
        }
 
                }
        }
 
-       while (!should_stop) {
+       while (!should_stop()) {
                // Empty the queue.
                vector<ClientStats> writes;
                {
                // Empty the queue.
                vector<ClientStats> writes;
                {
@@ -66,26 +66,10 @@ void AccessLogThread::do_work()
                        }
                        fflush(logfp);
                }
                        }
                        fflush(logfp);
                }
-               
-               // Wait until the stop_fd pipe is closed, one second has passed.
-               // or a spurious signal arrives.
-               pollfd pfd;
-               pfd.fd = stop_fd_read;
-               pfd.events = POLLIN | POLLRDHUP;
 
 
-               int nfds = poll(&pfd, 1, 1000);
-               if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
-                       continue;
-               }
-               if (nfds == 1) {
-                       // Should stop.
-                       break;
-               }
-               if (nfds == -1) {
-                       log_perror("poll");
-                       usleep(100000);
-                       continue;
-               }
+               // Wait until we are being woken up, either to quit or because
+               // there is material in pending_writes.
+               wait_for_wakeup(NULL);
        }
 
        if (logfp != NULL) {    
        }
 
        if (logfp != NULL) {    
index 49501831fa7df2805eead2a172b5dab5d77a3320..f92ac13ca9ca010f1b2a8efb2d45cba68c40b12f 100644 (file)
@@ -1,3 +1,4 @@
+#include <stdio.h>
 #include <assert.h>
 #include <errno.h>
 #include <netdb.h>
 #include <assert.h>
 #include <errno.h>
 #include <netdb.h>
@@ -20,6 +21,7 @@
 #include "parse.h"
 #include "serverpool.h"
 #include "state.pb.h"
 #include "parse.h"
 #include "serverpool.h"
 #include "state.pb.h"
+#include "util.h"
 #include "version.h"
 
 using namespace std;
 #include "version.h"
 
 using namespace std;
@@ -60,14 +62,7 @@ HTTPInput::HTTPInput(const InputProto &serialized)
 
 void HTTPInput::close_socket()
 {
 
 void HTTPInput::close_socket()
 {
-       int ret;
-       do {
-               ret = close(sock);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
-               log_perror("close()");
-       }
+       safe_close(sock);
 }
 
 InputProto HTTPInput::serialize() const
 }
 
 InputProto HTTPInput::serialize() const
@@ -99,31 +94,64 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port)
        addrinfo *base_ai = ai;
 
        // Connect to everything in turn until we have a socket.
        addrinfo *base_ai = ai;
 
        // Connect to everything in turn until we have a socket.
-       while (ai && !should_stop) {
+       while (ai && !should_stop()) {
                int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
                if (sock == -1) {
                        // Could be e.g. EPROTONOSUPPORT. The show must go on.
                        continue;
                }
 
                int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
                if (sock == -1) {
                        // Could be e.g. EPROTONOSUPPORT. The show must go on.
                        continue;
                }
 
+               // Now do a non-blocking connect. This is important because we want to be able to be
+               // woken up, even though it's rather cumbersome.
+
+               // Set the socket as nonblocking.
+               int one = 1;
+               if (ioctl(sock, FIONBIO, &one) == -1) {
+                       log_perror("ioctl(FIONBIO)");
+                       safe_close(sock);
+                       return -1;                      
+               }
+
+               // Do a non-blocking connect.
                do {
                        err = connect(sock, ai->ai_addr, ai->ai_addrlen);
                } while (err == -1 && errno == EINTR);
 
                do {
                        err = connect(sock, ai->ai_addr, ai->ai_addrlen);
                } while (err == -1 && errno == EINTR);
 
-               if (err != -1) {
-                       freeaddrinfo(base_ai);
-                       return sock;
+               if (err == -1 && errno != EINPROGRESS) {
+                       log_perror("connect");
+                       safe_close(sock);
+                       continue;
                }
 
                }
 
-               do {
-                       err = close(sock);
-               } while (err == -1 && errno == EINTR);
+               // Wait for the connect to complete, or an error to happen.
+               for ( ;; ) {
+                       bool complete = wait_for_activity(sock, POLLIN | POLLOUT, NULL);
+                       if (should_stop()) {
+                               safe_close(sock);
+                               return -1;
+                       }
+                       if (complete) {
+                               break;
+                       }
+               }
 
 
-               if (err == -1) {
-                       log_perror("close");
-                       // Can still continue.
+               // Check whether it ended in an error or not.
+               socklen_t err_size = sizeof(err);
+               if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_size) == -1) {
+                       log_perror("getsockopt");
+                       safe_close(sock);
+                       continue;
+               }
+
+               errno = err;
+
+               if (err == 0) {
+                       // Successful connect.
+                       freeaddrinfo(base_ai);
+                       return sock;
                }
 
                }
 
+               safe_close(sock);
                ai = ai->ai_next;
        }
 
                ai = ai->ai_next;
        }
 
@@ -221,23 +249,13 @@ bool HTTPInput::parse_response(const std::string &request)
 
 void HTTPInput::do_work()
 {
 
 void HTTPInput::do_work()
 {
-       while (!should_stop) {
+       while (!should_stop()) {
                if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
                if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
-                       // Since we are non-blocking, we need to wait for the right state first.
-                       // Wait up to 50 ms, then check should_stop.
-                       pollfd pfd;
-                       pfd.fd = sock;
-                       pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN;
-                       pfd.events |= POLLRDHUP;
-
-                       int nfds = poll(&pfd, 1, 50);
-                       if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+                       bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL);
+                       if (!activity) {
+                               // Most likely, should_stop was set.
                                continue;
                        }
                                continue;
                        }
-                       if (nfds == -1) {
-                               log_perror("poll");
-                               state = CLOSING_SOCKET;
-                       }
                }
 
                switch (state) {
                }
 
                switch (state) {
@@ -377,15 +395,7 @@ void HTTPInput::do_work()
                        break;
                }
                case CLOSING_SOCKET: {
                        break;
                }
                case CLOSING_SOCKET: {
-                       int err;
-                       do {
-                               err = close(sock);
-                       } while (err == -1 && errno == EINTR);
-
-                       if (err == -1) {
-                               log_perror("close");
-                       }
-
+                       close_socket();
                        state = NOT_CONNECTED;
                        break;
                }
                        state = NOT_CONNECTED;
                        break;
                }
@@ -396,9 +406,12 @@ void HTTPInput::do_work()
                // If we are still in NOT_CONNECTED, either something went wrong,
                // or the connection just got closed.
                // The earlier steps have already given the error message, if any.
                // If we are still in NOT_CONNECTED, either something went wrong,
                // or the connection just got closed.
                // The earlier steps have already given the error message, if any.
-               if (state == NOT_CONNECTED && !should_stop) {
+               if (state == NOT_CONNECTED && !should_stop()) {
                        log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str());
                        log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str());
-                       usleep(200000);
+                       timespec timeout_ts;
+                       timeout_ts.tv_sec = 0;
+                       timeout_ts.tv_nsec = 200000000;
+                       wait_for_wakeup(&timeout_ts);
                }
        }
 }
                }
        }
 }
index 46eddaf834ce7aacfa3c3e518d70618747731246..c89ec72ff44457352c66ebb5be638a5fbc28bc66 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -50,6 +50,10 @@ void hup(int signum)
        }
 }
 
        }
 }
 
+void do_nothing(int signum)
+{
+}
+
 CubemapStateProto collect_state(const timeval &serialize_start,
                                 const vector<Acceptor *> acceptors,
                                 const multimap<string, InputWithRefcount> inputs,
 CubemapStateProto collect_state(const timeval &serialize_start,
                                 const vector<Acceptor *> acceptors,
                                 const multimap<string, InputWithRefcount> inputs,
@@ -239,6 +243,7 @@ int main(int argc, char **argv)
 {
        signal(SIGHUP, hup);
        signal(SIGINT, hup);
 {
        signal(SIGHUP, hup);
        signal(SIGINT, hup);
+       signal(SIGUSR1, do_nothing);  // Used in internal signalling.
        signal(SIGPIPE, SIG_IGN);
        
        // Parse options.
        signal(SIGPIPE, SIG_IGN);
        
        // Parse options.
index b5f58d4c56711968db64a8d6c63e7d9eb08d0e25..a0a32f8e704a716cad99563cd75b4bc47cfeb41b 100644 (file)
@@ -26,6 +26,7 @@
 #include "server.h"
 #include "state.pb.h"
 #include "stream.h"
 #include "server.h"
 #include "state.pb.h"
 #include "stream.h"
+#include "util.h"
 
 using namespace std;
 
 
 using namespace std;
 
@@ -51,14 +52,7 @@ Server::~Server()
                delete stream_it->second;
        }
 
                delete stream_it->second;
        }
 
-       int ret;
-       do {
-               ret = close(epoll_fd);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
-               log_perror("close(epoll_fd)");
-       }
+       safe_close(epoll_fd);
 }
 
 vector<ClientStats> Server::get_client_stats() const
 }
 
 vector<ClientStats> Server::get_client_stats() const
@@ -76,15 +70,11 @@ vector<ClientStats> Server::get_client_stats() const
 
 void Server::do_work()
 {
 
 void Server::do_work()
 {
-       for ( ;; ) {
-               int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
-               if (nfds == -1 && errno == EINTR) {
-                       if (should_stop) {
-                               return;
-                       }
-                       continue;
-               }
-               if (nfds == -1) {
+       while (!should_stop()) {
+               // Wait until there's activity on at least one of the fds,
+               // or we are waken up due to new queued clients or data.
+               int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, -1, &sigset_without_usr1_block);
+               if (nfds == -1 && errno != EINTR) {
                        log_perror("epoll_wait");
                        exit(1);
                }
                        log_perror("epoll_wait");
                        exit(1);
                }
@@ -113,10 +103,6 @@ void Server::do_work()
                                process_client(to_process[i]);
                        }
                }
                                process_client(to_process[i]);
                        }
                }
-
-               if (should_stop) {
-                       return;
-               }
        }
 }
 
        }
 }
 
@@ -143,6 +129,7 @@ void Server::add_client_deferred(int sock)
 {
        MutexLock lock(&queued_data_mutex);
        queued_add_clients.push_back(sock);
 {
        MutexLock lock(&queued_data_mutex);
        queued_add_clients.push_back(sock);
+       wakeup();
 }
 
 void Server::add_client(int sock)
 }
 
 void Server::add_client(int sock)
@@ -259,6 +246,7 @@ void Server::add_data_deferred(const string &stream_id, const char *data, size_t
 {
        MutexLock lock(&queued_data_mutex);
        queued_data[stream_id].append(string(data, data + bytes));
 {
        MutexLock lock(&queued_data_mutex);
        queued_data[stream_id].append(string(data, data + bytes));
+       wakeup();
 }
 
 // See the .h file for postconditions after this function.     
 }
 
 // See the .h file for postconditions after this function.     
@@ -558,14 +546,7 @@ void Server::close_client(Client *client)
        access_log->write(client->get_stats());
 
        // Bye-bye!
        access_log->write(client->get_stats());
 
        // Bye-bye!
-       int ret;
-       do {
-               ret = close(client->sock);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
-               log_perror("close");
-       }
+       safe_close(client->sock);
 
        clients.erase(client->sock);
 }
 
        clients.erase(client->sock);
 }
index a3e032d88f71a2f66b9613781d64053a6f32b143..4f964873b22503c54c1fa782c0785d453d5ba345 100644 (file)
--- a/server.h
+++ b/server.h
@@ -19,7 +19,6 @@ class ClientProto;
 struct Stream;
 
 #define EPOLL_MAX_EVENTS 8192
 struct Stream;
 
 #define EPOLL_MAX_EVENTS 8192
-#define EPOLL_TIMEOUT_MS 20
 #define MAX_CLIENT_REQUEST 16384
 
 class CubemapStateProto;
 #define MAX_CLIENT_REQUEST 16384
 
 class CubemapStateProto;
index fb1668239a366b004313db3ab4b0efb2fd63ec00..e309ab99cecdae61b27faa77c55b5184ba473f4a 100644 (file)
@@ -92,15 +92,7 @@ void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vec
 
        // Close and delete any leftovers, if the number of servers was reduced.
        for (size_t i = num_servers; i < data_fds.size(); ++i) {
 
        // Close and delete any leftovers, if the number of servers was reduced.
        for (size_t i = num_servers; i < data_fds.size(); ++i) {
-               int ret;
-               do {
-                       ret = close(data_fds[i]);  // Implicitly deletes the file.
-               } while (ret == -1 && errno == EINTR);
-
-               if (ret == -1) {
-                       log_perror("close");
-                       // Can still continue.
-               }
+               safe_close(data_fds[i]);  // Implicitly deletes the file.
        }
 }
 
        }
 }
 
index b2bca75970f800a6cf3f21517e243e3aed89cacd..280f019edd9b2811d551cf018e62334cf00413bc 100644 (file)
--- a/stats.cpp
+++ b/stats.cpp
@@ -13,6 +13,7 @@
 #include "log.h"
 #include "serverpool.h"
 #include "stats.h"
 #include "log.h"
 #include "serverpool.h"
 #include "stats.h"
+#include "util.h"
 
 using namespace std;
 
 
 using namespace std;
 
@@ -26,7 +27,7 @@ StatsThread::StatsThread(const std::string &stats_file, int stats_interval)
 
 void StatsThread::do_work()
 {
 
 void StatsThread::do_work()
 {
-       while (!should_stop) {
+       while (!should_stop()) {
                int fd;
                FILE *fp;
                time_t now;
                int fd;
                FILE *fp;
                time_t now;
@@ -44,9 +45,7 @@ void StatsThread::do_work()
                fp = fdopen(fd, "w");
                if (fp == NULL) {
                        log_perror("fdopen");
                fp = fdopen(fd, "w");
                if (fp == NULL) {
                        log_perror("fdopen");
-                       if (close(fd) == -1) {
-                               log_perror("close");
-                       }
+                       safe_close(fd);
                        if (unlink(filename) == -1) {
                                log_perror(filename);
                        }
                        if (unlink(filename) == -1) {
                                log_perror(filename);
                        }
@@ -85,25 +84,12 @@ void StatsThread::do_work()
                free(filename);
 
 sleep:
                free(filename);
 
 sleep:
-               // Wait until the stop_fd pipe is closed, stats_interval timeout,
+               // Wait until we are asked to quit, stats_interval timeout,
                // or a spurious signal. (The latter will cause us to write stats
                // too often, but that's okay.)
                // or a spurious signal. (The latter will cause us to write stats
                // too often, but that's okay.)
-               pollfd pfd;
-               pfd.fd = stop_fd_read;
-               pfd.events = POLLIN | POLLRDHUP;
-
-               int nfds = poll(&pfd, 1, stats_interval * 1000);
-               if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
-                       continue;
-               }
-               if (nfds == 1) {
-                       // Should stop.
-                       break;
-               }
-               if (nfds == -1) {
-                       log_perror("poll");
-                       usleep(100000);
-                       continue;
-               }
+               timespec timeout_ts;
+               timeout_ts.tv_sec = stats_interval;
+               timeout_ts.tv_nsec = 0;
+               wait_for_wakeup(&timeout_ts);
        }
 }
        }
 }
index 83257e9046d3a9cbbd29756f100bedaed069df45..ab08a0f7ad85f34d458e9c41b40bbf5d7ccfe721 100644 (file)
@@ -31,13 +31,7 @@ Stream::Stream(const string &stream_id, size_t backlog_size, Encoding encoding)
 Stream::~Stream()
 {
        if (data_fd != -1) {
 Stream::~Stream()
 {
        if (data_fd != -1) {
-               int ret;
-               do {
-                       ret = close(data_fd);
-               } while (ret == -1 && errno == EINTR);
-               if (ret == -1) {
-                       log_perror("close");
-               }
+               safe_close(data_fd);
        }
 }
 
        }
 }
 
index f719eacb95c5379a2d664f23c49c04e0b50b767b..bd4cfd1790288e6dcb712820b3bc6725b9548732 100644 (file)
@@ -1,71 +1,97 @@
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <signal.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
-#include <fcntl.h>
-#include <signal.h>
-#include <errno.h>
 
 #include "log.h"
 
 #include "log.h"
+#include "mutexlock.h"
 #include "thread.h"
 #include "thread.h"
-       
+
 Thread::~Thread() {}
 
 void Thread::run()
 {
 Thread::~Thread() {}
 
 void Thread::run()
 {
-       should_stop = false;
-       int pipefd[2];
-       if (pipe2(pipefd, O_CLOEXEC) == -1) {
-               log_perror("pipe");
-               exit(1);
-       }
-       stop_fd_read = pipefd[0];
-       stop_fd_write = pipefd[1];
+       pthread_mutex_init(&should_stop_mutex, NULL);
+       should_stop_status = false;
        pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this);
 }
 
 void Thread::stop()
 {
        pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this);
 }
 
 void Thread::stop()
 {
-       should_stop = true;
-       char ch = 0;
-       int err;
-       do {
-               err = write(stop_fd_write, &ch, 1);
-       } while (err == -1 && errno == EINTR);
-
-       if (err == -1) {
-               log_perror("write");
+       {
+               MutexLock lock(&should_stop_mutex);
+               should_stop_status = true;
+       }
+       wakeup();
+       if (pthread_join(worker_thread, NULL) == -1) {
+               log_perror("pthread_join");
                exit(1);
        }
                exit(1);
        }
+}
 
 
-       do {
-               err = close(stop_fd_write);
-       } while (err == -1 && errno == EINTR);
+void *Thread::do_work_thunk(void *arg)
+{
+       Thread *thread = reinterpret_cast<Thread *>(arg);
 
 
-       if (err == -1) {
-               log_perror("close");
-               // Can continue (we have close-on-exec).
+       // Block SIGHUP; only the main thread should see that.
+       // (This isn't strictly required, but it makes it easier to debug that indeed
+       // SIGUSR1 was what woke us up.)
+       sigset_t set;
+       sigaddset(&set, SIGHUP);
+       int err = pthread_sigmask(SIG_BLOCK, &set, NULL);
+       if (err != 0) {
+               errno = err;
+               log_perror("pthread_sigmask");
+               exit(1);
        }
 
        }
 
-       pthread_kill(worker_thread, SIGHUP);
-       if (pthread_join(worker_thread, NULL) == -1) {
-               log_perror("pthread_join");
+       // Block SIGUSR1, and store the old signal mask.
+       sigemptyset(&set);
+       sigaddset(&set, SIGUSR1);
+       err = pthread_sigmask(SIG_BLOCK, &set, &thread->sigset_without_usr1_block);
+       if (err != 0) {
+               errno = err;
+               log_perror("pthread_sigmask");
                exit(1);
        }
                exit(1);
        }
-       
-       do {
-               err = close(stop_fd_read);
-       } while (err == -1 && errno == EINTR);
 
 
-       if (err == -1) {
-               log_perror("close");
-               // Can continue (we have close-on-exec).
+       // Call the right thunk.
+       thread->do_work();
+       return NULL;
+}
+
+bool Thread::wait_for_activity(int fd, short events, const struct timespec *timeout_ts)
+{
+       pollfd pfd;
+       pfd.fd = fd;
+       pfd.events = events;
+
+       for ( ;; ) {
+               int nfds = ppoll(&pfd, (fd == -1) ? 0 : 1, timeout_ts, &sigset_without_usr1_block);
+               if (nfds == -1 && errno == EINTR) {
+                       return false;
+               }
+               if (nfds == -1) {
+                       log_perror("poll");
+                       usleep(100000);
+                       continue;
+               }
+               assert(nfds <= 1);
+               return (nfds == 1);
        }
 }
 
        }
 }
 
-void *Thread::do_work_thunk(void *arg)
+void Thread::wakeup()
 {
 {
-       Thread *thread = reinterpret_cast<Thread *>(arg);
-       thread->do_work();
-       return NULL;
+       pthread_kill(worker_thread, SIGUSR1);
 }
 
 }
 
+bool Thread::should_stop()
+{
+       MutexLock lock(&should_stop_mutex);
+       return should_stop_status;
+}
index 26e648e5332d6a3016d0ec59d4a58b58b09033a8..12aa133d82f77781e6de0b3273e8dc0128232d76 100644 (file)
--- a/thread.h
+++ b/thread.h
@@ -1,12 +1,19 @@
 #ifndef _THREAD_H
 #define _THREAD_H
 
 #ifndef _THREAD_H
 #define _THREAD_H
 
+#include <signal.h>
 #include <pthread.h>
 
 #include <pthread.h>
 
-// A rather generic thread class with start/stop functionality.
-// NOTE: stop is somewhat racy (there's no guaranteed breakout from syscalls),
-// since signals don't stick. We'll need to figure out something more
-// intelligent later, probably based on sending a signal to an fd.
+struct timespec;
+
+// A thread class with start/stop and signal functionality.
+//
+// SIGUSR1 is blocked during execution of do_work(), so that you are guaranteed
+// to receive it when doing wait_for_activity(), and never elsewhere. This means
+// that you can test whatever status flags you'd want before calling
+// wait_for_activity(), and then be sure that it actually returns immediately
+// if a SIGUSR1 (ie., wakeup()) happened, even if it were sent between your test
+// and the wait_for_activity() call.
 
 class Thread {
 public:
 
 class Thread {
 public:
@@ -15,21 +22,38 @@ public:
        void stop();
 
 protected:
        void stop();
 
 protected:
-       // Recovers the this pointer, and calls do_work().
+       // Recovers the this pointer, blocks SIGUSR1, and calls do_work().
        static void *do_work_thunk(void *arg);
 
        virtual void do_work() = 0;
 
        static void *do_work_thunk(void *arg);
 
        virtual void do_work() = 0;
 
-       volatile bool should_stop;
+       // Waits until there is activity of the given type on <fd> (or an error),
+       // or until a wakeup. Returns true if there was actually activity on
+       // the file descriptor.
+       //
+       // If fd is -1, wait until a wakeup or timeout.
+       // if timeout_ts is NULL, there is no timeout.
+       bool wait_for_activity(int fd, short events, const timespec *timeout_ts);
+
+       // Wait until a wakeup.
+       void wait_for_wakeup(const timespec *timeout_ts) { wait_for_activity(-1, 0, timeout_ts); }
+
+       // Make wait_for_activity() return.
+       void wakeup();
 
 
-       // A pipe that you can poll on if you want to see when should_stop
-       // has been set to true; stop() will write a single byte to the pipe
-       // and then close the other end.
-       int stop_fd_read;
+       bool should_stop();
+
+       // The signal set as it were before we blocked SIGUSR1.
+       sigset_t sigset_without_usr1_block;
 
 private:
        pthread_t worker_thread;
 
 private:
        pthread_t worker_thread;
-       int stop_fd_write;
+
+       // Protects should_stop_status.
+       pthread_mutex_t should_stop_mutex;
+
+       // If this is set, the thread should return as soon as possible from do_work().
+       bool should_stop_status;
 };
 
 #endif  // !defined(_THREAD_H)
 };
 
 #endif  // !defined(_THREAD_H)
index 4222515e71c58366cc372c9faf33ea8a5b6788f4..07bf63d344e8ba6e38d7d8d399acafad3016058c 100644 (file)
@@ -12,6 +12,7 @@
 #include "serverpool.h"
 #include "state.pb.h"
 #include "udpinput.h"
 #include "serverpool.h"
 #include "state.pb.h"
 #include "udpinput.h"
+#include "util.h"
 #include "version.h"
 
 using namespace std;
 #include "version.h"
 
 using namespace std;
@@ -52,15 +53,7 @@ InputProto UDPInput::serialize() const
 
 void UDPInput::close_socket()
 {
 
 void UDPInput::close_socket()
 {
-       int ret;
-       do {
-               ret = close(sock);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
-               log_perror("close()");
-       }
-
+       safe_close(sock);
        sock = -1;
 }
        
        sock = -1;
 }
        
@@ -82,7 +75,7 @@ void UDPInput::add_destination(const string &stream_id)
 
 void UDPInput::do_work()
 {
 
 void UDPInput::do_work()
 {
-       while (!should_stop) {
+       while (!should_stop()) {
                if (sock == -1) {
                        int port_num = atoi(port.c_str());
                        sock = create_server_socket(port_num, UDP_SOCKET);
                if (sock == -1) {
                        int port_num = atoi(port.c_str());
                        sock = create_server_socket(port_num, UDP_SOCKET);
@@ -94,21 +87,12 @@ void UDPInput::do_work()
                        }
                }
 
                        }
                }
 
-               // Since we are non-blocking, we need to wait for the right state first.
-               // Wait up to 50 ms, then check should_stop.
-               pollfd pfd;
-               pfd.fd = sock;
-               pfd.events = POLLIN;
-
-               int nfds = poll(&pfd, 1, 50);
-               if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+               // Wait for a packet, or a wakeup.
+               bool activity = wait_for_activity(sock, POLLIN, NULL);
+               if (!activity) {
+                       // Most likely, should_stop was set.
                        continue;
                }
                        continue;
                }
-               if (nfds == -1) {
-                       log_perror("poll");
-                       close_socket();
-                       continue;       
-               }
 
                char buf[4096];
                int ret;
 
                char buf[4096];
                int ret;
index d6d39f29ada05dc4437886d61178d190df189d02..2bbffdc13c1aeafe0224014060b23043eca47fb1 100644 (file)
--- a/util.cpp
+++ b/util.cpp
@@ -33,9 +33,7 @@ int make_tempfile(const std::string &contents)
                ssize_t ret = write(fd, ptr, to_write);
                if (ret == -1) {
                        log_perror("write");
                ssize_t ret = write(fd, ptr, to_write);
                if (ret == -1) {
                        log_perror("write");
-                       if (close(fd) == -1) {
-                               log_perror("close");
-                       }
+                       safe_close(fd);
                        return -1;
                }
 
                        return -1;
                }
 
@@ -49,17 +47,7 @@ int make_tempfile(const std::string &contents)
 bool read_tempfile_and_close(int fd, std::string *contents)
 {
        bool ok = read_tempfile(fd, contents);
 bool read_tempfile_and_close(int fd, std::string *contents)
 {
        bool ok = read_tempfile(fd, contents);
-
-       int ret;
-       do {
-               ret = close(fd);  // Implicitly deletes the file.
-       } while (ret == -1 && errno == EINTR);
-       
-       if (ret == -1) {
-               log_perror("close");
-               // Can still continue.
-       }
-
+       safe_close(fd);  // Implicitly deletes the file.
        return ok;
 }
 
        return ok;
 }
 
@@ -96,3 +84,17 @@ bool read_tempfile(int fd, std::string *contents)
 
        return true;
 }
 
        return true;
 }
+
+int safe_close(int fd)
+{
+       int ret;
+       do {
+               ret = close(fd);
+       } while (ret == -1 && errno == EINTR);
+
+       if (ret == -1) {
+               log_perror("close()");
+       }
+
+       return ret;
+}
diff --git a/util.h b/util.h
index 303de14d0098e26cbd8f9c535cd6e2438f92021e..d1670750d522f36ec7032736530516b10d59b894 100644 (file)
--- a/util.h
+++ b/util.h
@@ -15,4 +15,8 @@ bool read_tempfile_and_close(int fd, std::string *contents);
 // Same as read_tempfile_and_close(), without the close.
 bool read_tempfile(int fd, std::string *contents);
 
 // Same as read_tempfile_and_close(), without the close.
 bool read_tempfile(int fd, std::string *contents);
 
+// Close a file descriptor, taking care of EINTR on the way.
+// log_perror() if it fails; apart from that, behaves as close().
+int safe_close(int fd);
+
 #endif  // !defined(_UTIL_H
 #endif  // !defined(_UTIL_H