]> git.sesse.net Git - cubemap/commitdiff
Revert "Rewrite the entire internal signal handling/wakeup."
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 19 Apr 2013 23:21:15 +0000 (01:21 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 19 Apr 2013 23:21:15 +0000 (01:21 +0200)
Seemingly this had very bad effects on CPU usage. Will need to
investigate later.

This reverts commit 3fd8650ccf3da3960a946d8ac9abc305aec399ce.

14 files changed:
acceptor.cpp
accesslog.cpp
httpinput.cpp
main.cpp
server.cpp
server.h
serverpool.cpp
stats.cpp
stream.cpp
thread.cpp
thread.h
udpinput.cpp
util.cpp
util.h

index 2abfa5220064ec47efb8d40c41d3ad9ceed6a79d..3172c73b80a1821e0973e08c267f6f8ccae74c10 100644 (file)
 #include "log.h"
 #include "serverpool.h"
 #include "state.pb.h"
 #include "log.h"
 #include "serverpool.h"
 #include "state.pb.h"
-#include "util.h"
 
 using namespace std;
 
 extern ServerPool *servers;
 
 using namespace std;
 
 extern ServerPool *servers;
+extern volatile bool hupped;
 
 int create_server_socket(int port, SocketType socket_type)
 {
 
 int create_server_socket(int port, SocketType socket_type)
 {
@@ -93,13 +93,32 @@ AcceptorProto Acceptor::serialize() const
 
 void Acceptor::close_socket()
 {
 
 void Acceptor::close_socket()
 {
-       safe_close(server_sock);
+       int ret;
+       do {
+               ret = close(server_sock);
+       } while (ret == -1 && errno == EINTR);
+
+       if (ret == -1) {
+               log_perror("close");
+       }
 }
 
 void Acceptor::do_work()
 {
 }
 
 void Acceptor::do_work()
 {
-       while (!should_stop()) {
-               if (!wait_for_activity(server_sock, POLLIN, NULL)) {
+       while (!hupped) {
+               // Since we are non-blocking, we need to wait for the right state first.
+               // Wait up to 50 ms, then check hupped.
+               pollfd pfd;
+               pfd.fd = server_sock;
+               pfd.events = POLLIN;
+
+               int nfds = poll(&pfd, 1, 50);
+               if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+                       continue;
+               }
+               if (nfds == -1) {
+                       log_perror("poll");
+                       usleep(100000);
                        continue;
                }
 
                        continue;
                }
 
index 81eff90e46b98312781364737d91d7e78c401ac4..03aeec8a51fb284884410418c1c9eef901cfd725 100644 (file)
@@ -43,7 +43,7 @@ void AccessLogThread::do_work()
                }
        }
 
                }
        }
 
-       while (!should_stop()) {
+       while (!should_stop) {
                // Empty the queue.
                vector<ClientStats> writes;
                {
                // Empty the queue.
                vector<ClientStats> writes;
                {
@@ -66,10 +66,26 @@ void AccessLogThread::do_work()
                        }
                        fflush(logfp);
                }
                        }
                        fflush(logfp);
                }
+               
+               // Wait until the stop_fd pipe is closed, one second has passed.
+               // or a spurious signal arrives.
+               pollfd pfd;
+               pfd.fd = stop_fd_read;
+               pfd.events = POLLIN | POLLRDHUP;
 
 
-               // Wait until we are being woken up, either to quit or because
-               // there is material in pending_writes.
-               wait_for_wakeup(NULL);
+               int nfds = poll(&pfd, 1, 1000);
+               if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+                       continue;
+               }
+               if (nfds == 1) {
+                       // Should stop.
+                       break;
+               }
+               if (nfds == -1) {
+                       log_perror("poll");
+                       usleep(100000);
+                       continue;
+               }
        }
 
        if (logfp != NULL) {    
        }
 
        if (logfp != NULL) {    
index f92ac13ca9ca010f1b2a8efb2d45cba68c40b12f..49501831fa7df2805eead2a172b5dab5d77a3320 100644 (file)
@@ -1,4 +1,3 @@
-#include <stdio.h>
 #include <assert.h>
 #include <errno.h>
 #include <netdb.h>
 #include <assert.h>
 #include <errno.h>
 #include <netdb.h>
@@ -21,7 +20,6 @@
 #include "parse.h"
 #include "serverpool.h"
 #include "state.pb.h"
 #include "parse.h"
 #include "serverpool.h"
 #include "state.pb.h"
-#include "util.h"
 #include "version.h"
 
 using namespace std;
 #include "version.h"
 
 using namespace std;
@@ -62,7 +60,14 @@ HTTPInput::HTTPInput(const InputProto &serialized)
 
 void HTTPInput::close_socket()
 {
 
 void HTTPInput::close_socket()
 {
-       safe_close(sock);
+       int ret;
+       do {
+               ret = close(sock);
+       } while (ret == -1 && errno == EINTR);
+
+       if (ret == -1) {
+               log_perror("close()");
+       }
 }
 
 InputProto HTTPInput::serialize() const
 }
 
 InputProto HTTPInput::serialize() const
@@ -94,64 +99,31 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port)
        addrinfo *base_ai = ai;
 
        // Connect to everything in turn until we have a socket.
        addrinfo *base_ai = ai;
 
        // Connect to everything in turn until we have a socket.
-       while (ai && !should_stop()) {
+       while (ai && !should_stop) {
                int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
                if (sock == -1) {
                        // Could be e.g. EPROTONOSUPPORT. The show must go on.
                        continue;
                }
 
                int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
                if (sock == -1) {
                        // Could be e.g. EPROTONOSUPPORT. The show must go on.
                        continue;
                }
 
-               // Now do a non-blocking connect. This is important because we want to be able to be
-               // woken up, even though it's rather cumbersome.
-
-               // Set the socket as nonblocking.
-               int one = 1;
-               if (ioctl(sock, FIONBIO, &one) == -1) {
-                       log_perror("ioctl(FIONBIO)");
-                       safe_close(sock);
-                       return -1;                      
-               }
-
-               // Do a non-blocking connect.
                do {
                        err = connect(sock, ai->ai_addr, ai->ai_addrlen);
                } while (err == -1 && errno == EINTR);
 
                do {
                        err = connect(sock, ai->ai_addr, ai->ai_addrlen);
                } while (err == -1 && errno == EINTR);
 
-               if (err == -1 && errno != EINPROGRESS) {
-                       log_perror("connect");
-                       safe_close(sock);
-                       continue;
-               }
-
-               // Wait for the connect to complete, or an error to happen.
-               for ( ;; ) {
-                       bool complete = wait_for_activity(sock, POLLIN | POLLOUT, NULL);
-                       if (should_stop()) {
-                               safe_close(sock);
-                               return -1;
-                       }
-                       if (complete) {
-                               break;
-                       }
-               }
-
-               // Check whether it ended in an error or not.
-               socklen_t err_size = sizeof(err);
-               if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_size) == -1) {
-                       log_perror("getsockopt");
-                       safe_close(sock);
-                       continue;
+               if (err != -1) {
+                       freeaddrinfo(base_ai);
+                       return sock;
                }
 
                }
 
-               errno = err;
+               do {
+                       err = close(sock);
+               } while (err == -1 && errno == EINTR);
 
 
-               if (err == 0) {
-                       // Successful connect.
-                       freeaddrinfo(base_ai);
-                       return sock;
+               if (err == -1) {
+                       log_perror("close");
+                       // Can still continue.
                }
 
                }
 
-               safe_close(sock);
                ai = ai->ai_next;
        }
 
                ai = ai->ai_next;
        }
 
@@ -249,13 +221,23 @@ bool HTTPInput::parse_response(const std::string &request)
 
 void HTTPInput::do_work()
 {
 
 void HTTPInput::do_work()
 {
-       while (!should_stop()) {
+       while (!should_stop) {
                if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
                if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
-                       bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL);
-                       if (!activity) {
-                               // Most likely, should_stop was set.
+                       // Since we are non-blocking, we need to wait for the right state first.
+                       // Wait up to 50 ms, then check should_stop.
+                       pollfd pfd;
+                       pfd.fd = sock;
+                       pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN;
+                       pfd.events |= POLLRDHUP;
+
+                       int nfds = poll(&pfd, 1, 50);
+                       if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
                                continue;
                        }
                                continue;
                        }
+                       if (nfds == -1) {
+                               log_perror("poll");
+                               state = CLOSING_SOCKET;
+                       }
                }
 
                switch (state) {
                }
 
                switch (state) {
@@ -395,7 +377,15 @@ void HTTPInput::do_work()
                        break;
                }
                case CLOSING_SOCKET: {
                        break;
                }
                case CLOSING_SOCKET: {
-                       close_socket();
+                       int err;
+                       do {
+                               err = close(sock);
+                       } while (err == -1 && errno == EINTR);
+
+                       if (err == -1) {
+                               log_perror("close");
+                       }
+
                        state = NOT_CONNECTED;
                        break;
                }
                        state = NOT_CONNECTED;
                        break;
                }
@@ -406,12 +396,9 @@ void HTTPInput::do_work()
                // If we are still in NOT_CONNECTED, either something went wrong,
                // or the connection just got closed.
                // The earlier steps have already given the error message, if any.
                // If we are still in NOT_CONNECTED, either something went wrong,
                // or the connection just got closed.
                // The earlier steps have already given the error message, if any.
-               if (state == NOT_CONNECTED && !should_stop()) {
+               if (state == NOT_CONNECTED && !should_stop) {
                        log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str());
                        log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str());
-                       timespec timeout_ts;
-                       timeout_ts.tv_sec = 0;
-                       timeout_ts.tv_nsec = 200000000;
-                       wait_for_wakeup(&timeout_ts);
+                       usleep(200000);
                }
        }
 }
                }
        }
 }
index c89ec72ff44457352c66ebb5be638a5fbc28bc66..46eddaf834ce7aacfa3c3e518d70618747731246 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -50,10 +50,6 @@ void hup(int signum)
        }
 }
 
        }
 }
 
-void do_nothing(int signum)
-{
-}
-
 CubemapStateProto collect_state(const timeval &serialize_start,
                                 const vector<Acceptor *> acceptors,
                                 const multimap<string, InputWithRefcount> inputs,
 CubemapStateProto collect_state(const timeval &serialize_start,
                                 const vector<Acceptor *> acceptors,
                                 const multimap<string, InputWithRefcount> inputs,
@@ -243,7 +239,6 @@ int main(int argc, char **argv)
 {
        signal(SIGHUP, hup);
        signal(SIGINT, hup);
 {
        signal(SIGHUP, hup);
        signal(SIGINT, hup);
-       signal(SIGUSR1, do_nothing);  // Used in internal signalling.
        signal(SIGPIPE, SIG_IGN);
        
        // Parse options.
        signal(SIGPIPE, SIG_IGN);
        
        // Parse options.
index a0a32f8e704a716cad99563cd75b4bc47cfeb41b..b5f58d4c56711968db64a8d6c63e7d9eb08d0e25 100644 (file)
@@ -26,7 +26,6 @@
 #include "server.h"
 #include "state.pb.h"
 #include "stream.h"
 #include "server.h"
 #include "state.pb.h"
 #include "stream.h"
-#include "util.h"
 
 using namespace std;
 
 
 using namespace std;
 
@@ -52,7 +51,14 @@ Server::~Server()
                delete stream_it->second;
        }
 
                delete stream_it->second;
        }
 
-       safe_close(epoll_fd);
+       int ret;
+       do {
+               ret = close(epoll_fd);
+       } while (ret == -1 && errno == EINTR);
+
+       if (ret == -1) {
+               log_perror("close(epoll_fd)");
+       }
 }
 
 vector<ClientStats> Server::get_client_stats() const
 }
 
 vector<ClientStats> Server::get_client_stats() const
@@ -70,11 +76,15 @@ vector<ClientStats> Server::get_client_stats() const
 
 void Server::do_work()
 {
 
 void Server::do_work()
 {
-       while (!should_stop()) {
-               // Wait until there's activity on at least one of the fds,
-               // or we are waken up due to new queued clients or data.
-               int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, -1, &sigset_without_usr1_block);
-               if (nfds == -1 && errno != EINTR) {
+       for ( ;; ) {
+               int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
+               if (nfds == -1 && errno == EINTR) {
+                       if (should_stop) {
+                               return;
+                       }
+                       continue;
+               }
+               if (nfds == -1) {
                        log_perror("epoll_wait");
                        exit(1);
                }
                        log_perror("epoll_wait");
                        exit(1);
                }
@@ -103,6 +113,10 @@ void Server::do_work()
                                process_client(to_process[i]);
                        }
                }
                                process_client(to_process[i]);
                        }
                }
+
+               if (should_stop) {
+                       return;
+               }
        }
 }
 
        }
 }
 
@@ -129,7 +143,6 @@ void Server::add_client_deferred(int sock)
 {
        MutexLock lock(&queued_data_mutex);
        queued_add_clients.push_back(sock);
 {
        MutexLock lock(&queued_data_mutex);
        queued_add_clients.push_back(sock);
-       wakeup();
 }
 
 void Server::add_client(int sock)
 }
 
 void Server::add_client(int sock)
@@ -246,7 +259,6 @@ void Server::add_data_deferred(const string &stream_id, const char *data, size_t
 {
        MutexLock lock(&queued_data_mutex);
        queued_data[stream_id].append(string(data, data + bytes));
 {
        MutexLock lock(&queued_data_mutex);
        queued_data[stream_id].append(string(data, data + bytes));
-       wakeup();
 }
 
 // See the .h file for postconditions after this function.     
 }
 
 // See the .h file for postconditions after this function.     
@@ -546,7 +558,14 @@ void Server::close_client(Client *client)
        access_log->write(client->get_stats());
 
        // Bye-bye!
        access_log->write(client->get_stats());
 
        // Bye-bye!
-       safe_close(client->sock);
+       int ret;
+       do {
+               ret = close(client->sock);
+       } while (ret == -1 && errno == EINTR);
+
+       if (ret == -1) {
+               log_perror("close");
+       }
 
        clients.erase(client->sock);
 }
 
        clients.erase(client->sock);
 }
index 4f964873b22503c54c1fa782c0785d453d5ba345..a3e032d88f71a2f66b9613781d64053a6f32b143 100644 (file)
--- a/server.h
+++ b/server.h
@@ -19,6 +19,7 @@ class ClientProto;
 struct Stream;
 
 #define EPOLL_MAX_EVENTS 8192
 struct Stream;
 
 #define EPOLL_MAX_EVENTS 8192
+#define EPOLL_TIMEOUT_MS 20
 #define MAX_CLIENT_REQUEST 16384
 
 class CubemapStateProto;
 #define MAX_CLIENT_REQUEST 16384
 
 class CubemapStateProto;
index e309ab99cecdae61b27faa77c55b5184ba473f4a..fb1668239a366b004313db3ab4b0efb2fd63ec00 100644 (file)
@@ -92,7 +92,15 @@ void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vec
 
        // Close and delete any leftovers, if the number of servers was reduced.
        for (size_t i = num_servers; i < data_fds.size(); ++i) {
 
        // Close and delete any leftovers, if the number of servers was reduced.
        for (size_t i = num_servers; i < data_fds.size(); ++i) {
-               safe_close(data_fds[i]);  // Implicitly deletes the file.
+               int ret;
+               do {
+                       ret = close(data_fds[i]);  // Implicitly deletes the file.
+               } while (ret == -1 && errno == EINTR);
+
+               if (ret == -1) {
+                       log_perror("close");
+                       // Can still continue.
+               }
        }
 }
 
        }
 }
 
index 280f019edd9b2811d551cf018e62334cf00413bc..b2bca75970f800a6cf3f21517e243e3aed89cacd 100644 (file)
--- a/stats.cpp
+++ b/stats.cpp
@@ -13,7 +13,6 @@
 #include "log.h"
 #include "serverpool.h"
 #include "stats.h"
 #include "log.h"
 #include "serverpool.h"
 #include "stats.h"
-#include "util.h"
 
 using namespace std;
 
 
 using namespace std;
 
@@ -27,7 +26,7 @@ StatsThread::StatsThread(const std::string &stats_file, int stats_interval)
 
 void StatsThread::do_work()
 {
 
 void StatsThread::do_work()
 {
-       while (!should_stop()) {
+       while (!should_stop) {
                int fd;
                FILE *fp;
                time_t now;
                int fd;
                FILE *fp;
                time_t now;
@@ -45,7 +44,9 @@ void StatsThread::do_work()
                fp = fdopen(fd, "w");
                if (fp == NULL) {
                        log_perror("fdopen");
                fp = fdopen(fd, "w");
                if (fp == NULL) {
                        log_perror("fdopen");
-                       safe_close(fd);
+                       if (close(fd) == -1) {
+                               log_perror("close");
+                       }
                        if (unlink(filename) == -1) {
                                log_perror(filename);
                        }
                        if (unlink(filename) == -1) {
                                log_perror(filename);
                        }
@@ -84,12 +85,25 @@ void StatsThread::do_work()
                free(filename);
 
 sleep:
                free(filename);
 
 sleep:
-               // Wait until we are asked to quit, stats_interval timeout,
+               // Wait until the stop_fd pipe is closed, stats_interval timeout,
                // or a spurious signal. (The latter will cause us to write stats
                // too often, but that's okay.)
                // or a spurious signal. (The latter will cause us to write stats
                // too often, but that's okay.)
-               timespec timeout_ts;
-               timeout_ts.tv_sec = stats_interval;
-               timeout_ts.tv_nsec = 0;
-               wait_for_wakeup(&timeout_ts);
+               pollfd pfd;
+               pfd.fd = stop_fd_read;
+               pfd.events = POLLIN | POLLRDHUP;
+
+               int nfds = poll(&pfd, 1, stats_interval * 1000);
+               if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+                       continue;
+               }
+               if (nfds == 1) {
+                       // Should stop.
+                       break;
+               }
+               if (nfds == -1) {
+                       log_perror("poll");
+                       usleep(100000);
+                       continue;
+               }
        }
 }
        }
 }
index ab08a0f7ad85f34d458e9c41b40bbf5d7ccfe721..83257e9046d3a9cbbd29756f100bedaed069df45 100644 (file)
@@ -31,7 +31,13 @@ Stream::Stream(const string &stream_id, size_t backlog_size, Encoding encoding)
 Stream::~Stream()
 {
        if (data_fd != -1) {
 Stream::~Stream()
 {
        if (data_fd != -1) {
-               safe_close(data_fd);
+               int ret;
+               do {
+                       ret = close(data_fd);
+               } while (ret == -1 && errno == EINTR);
+               if (ret == -1) {
+                       log_perror("close");
+               }
        }
 }
 
        }
 }
 
index bd4cfd1790288e6dcb712820b3bc6725b9548732..f719eacb95c5379a2d664f23c49c04e0b50b767b 100644 (file)
@@ -1,97 +1,71 @@
-#include <assert.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <poll.h>
-#include <signal.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <errno.h>
 
 #include "log.h"
 
 #include "log.h"
-#include "mutexlock.h"
 #include "thread.h"
 #include "thread.h"
-
+       
 Thread::~Thread() {}
 
 void Thread::run()
 {
 Thread::~Thread() {}
 
 void Thread::run()
 {
-       pthread_mutex_init(&should_stop_mutex, NULL);
-       should_stop_status = false;
+       should_stop = false;
+       int pipefd[2];
+       if (pipe2(pipefd, O_CLOEXEC) == -1) {
+               log_perror("pipe");
+               exit(1);
+       }
+       stop_fd_read = pipefd[0];
+       stop_fd_write = pipefd[1];
        pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this);
 }
 
 void Thread::stop()
 {
        pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this);
 }
 
 void Thread::stop()
 {
-       {
-               MutexLock lock(&should_stop_mutex);
-               should_stop_status = true;
-       }
-       wakeup();
-       if (pthread_join(worker_thread, NULL) == -1) {
-               log_perror("pthread_join");
+       should_stop = true;
+       char ch = 0;
+       int err;
+       do {
+               err = write(stop_fd_write, &ch, 1);
+       } while (err == -1 && errno == EINTR);
+
+       if (err == -1) {
+               log_perror("write");
                exit(1);
        }
                exit(1);
        }
-}
 
 
-void *Thread::do_work_thunk(void *arg)
-{
-       Thread *thread = reinterpret_cast<Thread *>(arg);
+       do {
+               err = close(stop_fd_write);
+       } while (err == -1 && errno == EINTR);
 
 
-       // Block SIGHUP; only the main thread should see that.
-       // (This isn't strictly required, but it makes it easier to debug that indeed
-       // SIGUSR1 was what woke us up.)
-       sigset_t set;
-       sigaddset(&set, SIGHUP);
-       int err = pthread_sigmask(SIG_BLOCK, &set, NULL);
-       if (err != 0) {
-               errno = err;
-               log_perror("pthread_sigmask");
-               exit(1);
+       if (err == -1) {
+               log_perror("close");
+               // Can continue (we have close-on-exec).
        }
 
        }
 
-       // Block SIGUSR1, and store the old signal mask.
-       sigemptyset(&set);
-       sigaddset(&set, SIGUSR1);
-       err = pthread_sigmask(SIG_BLOCK, &set, &thread->sigset_without_usr1_block);
-       if (err != 0) {
-               errno = err;
-               log_perror("pthread_sigmask");
+       pthread_kill(worker_thread, SIGHUP);
+       if (pthread_join(worker_thread, NULL) == -1) {
+               log_perror("pthread_join");
                exit(1);
        }
                exit(1);
        }
+       
+       do {
+               err = close(stop_fd_read);
+       } while (err == -1 && errno == EINTR);
 
 
-       // Call the right thunk.
-       thread->do_work();
-       return NULL;
-}
-
-bool Thread::wait_for_activity(int fd, short events, const struct timespec *timeout_ts)
-{
-       pollfd pfd;
-       pfd.fd = fd;
-       pfd.events = events;
-
-       for ( ;; ) {
-               int nfds = ppoll(&pfd, (fd == -1) ? 0 : 1, timeout_ts, &sigset_without_usr1_block);
-               if (nfds == -1 && errno == EINTR) {
-                       return false;
-               }
-               if (nfds == -1) {
-                       log_perror("poll");
-                       usleep(100000);
-                       continue;
-               }
-               assert(nfds <= 1);
-               return (nfds == 1);
+       if (err == -1) {
+               log_perror("close");
+               // Can continue (we have close-on-exec).
        }
 }
 
        }
 }
 
-void Thread::wakeup()
+void *Thread::do_work_thunk(void *arg)
 {
 {
-       pthread_kill(worker_thread, SIGUSR1);
+       Thread *thread = reinterpret_cast<Thread *>(arg);
+       thread->do_work();
+       return NULL;
 }
 
 }
 
-bool Thread::should_stop()
-{
-       MutexLock lock(&should_stop_mutex);
-       return should_stop_status;
-}
index 12aa133d82f77781e6de0b3273e8dc0128232d76..26e648e5332d6a3016d0ec59d4a58b58b09033a8 100644 (file)
--- a/thread.h
+++ b/thread.h
@@ -1,19 +1,12 @@
 #ifndef _THREAD_H
 #define _THREAD_H
 
 #ifndef _THREAD_H
 #define _THREAD_H
 
-#include <signal.h>
 #include <pthread.h>
 
 #include <pthread.h>
 
-struct timespec;
-
-// A thread class with start/stop and signal functionality.
-//
-// SIGUSR1 is blocked during execution of do_work(), so that you are guaranteed
-// to receive it when doing wait_for_activity(), and never elsewhere. This means
-// that you can test whatever status flags you'd want before calling
-// wait_for_activity(), and then be sure that it actually returns immediately
-// if a SIGUSR1 (ie., wakeup()) happened, even if it were sent between your test
-// and the wait_for_activity() call.
+// A rather generic thread class with start/stop functionality.
+// NOTE: stop is somewhat racy (there's no guaranteed breakout from syscalls),
+// since signals don't stick. We'll need to figure out something more
+// intelligent later, probably based on sending a signal to an fd.
 
 class Thread {
 public:
 
 class Thread {
 public:
@@ -22,38 +15,21 @@ public:
        void stop();
 
 protected:
        void stop();
 
 protected:
-       // Recovers the this pointer, blocks SIGUSR1, and calls do_work().
+       // Recovers the this pointer, and calls do_work().
        static void *do_work_thunk(void *arg);
 
        virtual void do_work() = 0;
 
        static void *do_work_thunk(void *arg);
 
        virtual void do_work() = 0;
 
-       // Waits until there is activity of the given type on <fd> (or an error),
-       // or until a wakeup. Returns true if there was actually activity on
-       // the file descriptor.
-       //
-       // If fd is -1, wait until a wakeup or timeout.
-       // if timeout_ts is NULL, there is no timeout.
-       bool wait_for_activity(int fd, short events, const timespec *timeout_ts);
-
-       // Wait until a wakeup.
-       void wait_for_wakeup(const timespec *timeout_ts) { wait_for_activity(-1, 0, timeout_ts); }
-
-       // Make wait_for_activity() return.
-       void wakeup();
+       volatile bool should_stop;
 
 
-       bool should_stop();
-
-       // The signal set as it were before we blocked SIGUSR1.
-       sigset_t sigset_without_usr1_block;
+       // A pipe that you can poll on if you want to see when should_stop
+       // has been set to true; stop() will write a single byte to the pipe
+       // and then close the other end.
+       int stop_fd_read;
 
 private:
        pthread_t worker_thread;
 
 private:
        pthread_t worker_thread;
-
-       // Protects should_stop_status.
-       pthread_mutex_t should_stop_mutex;
-
-       // If this is set, the thread should return as soon as possible from do_work().
-       bool should_stop_status;
+       int stop_fd_write;
 };
 
 #endif  // !defined(_THREAD_H)
 };
 
 #endif  // !defined(_THREAD_H)
index 07bf63d344e8ba6e38d7d8d399acafad3016058c..4222515e71c58366cc372c9faf33ea8a5b6788f4 100644 (file)
@@ -12,7 +12,6 @@
 #include "serverpool.h"
 #include "state.pb.h"
 #include "udpinput.h"
 #include "serverpool.h"
 #include "state.pb.h"
 #include "udpinput.h"
-#include "util.h"
 #include "version.h"
 
 using namespace std;
 #include "version.h"
 
 using namespace std;
@@ -53,7 +52,15 @@ InputProto UDPInput::serialize() const
 
 void UDPInput::close_socket()
 {
 
 void UDPInput::close_socket()
 {
-       safe_close(sock);
+       int ret;
+       do {
+               ret = close(sock);
+       } while (ret == -1 && errno == EINTR);
+
+       if (ret == -1) {
+               log_perror("close()");
+       }
+
        sock = -1;
 }
        
        sock = -1;
 }
        
@@ -75,7 +82,7 @@ void UDPInput::add_destination(const string &stream_id)
 
 void UDPInput::do_work()
 {
 
 void UDPInput::do_work()
 {
-       while (!should_stop()) {
+       while (!should_stop) {
                if (sock == -1) {
                        int port_num = atoi(port.c_str());
                        sock = create_server_socket(port_num, UDP_SOCKET);
                if (sock == -1) {
                        int port_num = atoi(port.c_str());
                        sock = create_server_socket(port_num, UDP_SOCKET);
@@ -87,12 +94,21 @@ void UDPInput::do_work()
                        }
                }
 
                        }
                }
 
-               // Wait for a packet, or a wakeup.
-               bool activity = wait_for_activity(sock, POLLIN, NULL);
-               if (!activity) {
-                       // Most likely, should_stop was set.
+               // Since we are non-blocking, we need to wait for the right state first.
+               // Wait up to 50 ms, then check should_stop.
+               pollfd pfd;
+               pfd.fd = sock;
+               pfd.events = POLLIN;
+
+               int nfds = poll(&pfd, 1, 50);
+               if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
                        continue;
                }
                        continue;
                }
+               if (nfds == -1) {
+                       log_perror("poll");
+                       close_socket();
+                       continue;       
+               }
 
                char buf[4096];
                int ret;
 
                char buf[4096];
                int ret;
index 2bbffdc13c1aeafe0224014060b23043eca47fb1..d6d39f29ada05dc4437886d61178d190df189d02 100644 (file)
--- a/util.cpp
+++ b/util.cpp
@@ -33,7 +33,9 @@ int make_tempfile(const std::string &contents)
                ssize_t ret = write(fd, ptr, to_write);
                if (ret == -1) {
                        log_perror("write");
                ssize_t ret = write(fd, ptr, to_write);
                if (ret == -1) {
                        log_perror("write");
-                       safe_close(fd);
+                       if (close(fd) == -1) {
+                               log_perror("close");
+                       }
                        return -1;
                }
 
                        return -1;
                }
 
@@ -47,7 +49,17 @@ int make_tempfile(const std::string &contents)
 bool read_tempfile_and_close(int fd, std::string *contents)
 {
        bool ok = read_tempfile(fd, contents);
 bool read_tempfile_and_close(int fd, std::string *contents)
 {
        bool ok = read_tempfile(fd, contents);
-       safe_close(fd);  // Implicitly deletes the file.
+
+       int ret;
+       do {
+               ret = close(fd);  // Implicitly deletes the file.
+       } while (ret == -1 && errno == EINTR);
+       
+       if (ret == -1) {
+               log_perror("close");
+               // Can still continue.
+       }
+
        return ok;
 }
 
        return ok;
 }
 
@@ -84,17 +96,3 @@ bool read_tempfile(int fd, std::string *contents)
 
        return true;
 }
 
        return true;
 }
-
-int safe_close(int fd)
-{
-       int ret;
-       do {
-               ret = close(fd);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
-               log_perror("close()");
-       }
-
-       return ret;
-}
diff --git a/util.h b/util.h
index d1670750d522f36ec7032736530516b10d59b894..303de14d0098e26cbd8f9c535cd6e2438f92021e 100644 (file)
--- a/util.h
+++ b/util.h
@@ -15,8 +15,4 @@ bool read_tempfile_and_close(int fd, std::string *contents);
 // Same as read_tempfile_and_close(), without the close.
 bool read_tempfile(int fd, std::string *contents);
 
 // Same as read_tempfile_and_close(), without the close.
 bool read_tempfile(int fd, std::string *contents);
 
-// Close a file descriptor, taking care of EINTR on the way.
-// log_perror() if it fails; apart from that, behaves as close().
-int safe_close(int fd);
-
 #endif  // !defined(_UTIL_H
 #endif  // !defined(_UTIL_H