]> git.sesse.net Git - cubemap/blobdiff - server.cpp
Rewrite the entire internal signal handling/wakeup.
[cubemap] / server.cpp
index b5f58d4c56711968db64a8d6c63e7d9eb08d0e25..a0a32f8e704a716cad99563cd75b4bc47cfeb41b 100644 (file)
@@ -26,6 +26,7 @@
 #include "server.h"
 #include "state.pb.h"
 #include "stream.h"
+#include "util.h"
 
 using namespace std;
 
@@ -51,14 +52,7 @@ Server::~Server()
                delete stream_it->second;
        }
 
-       int ret;
-       do {
-               ret = close(epoll_fd);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
-               log_perror("close(epoll_fd)");
-       }
+       safe_close(epoll_fd);
 }
 
 vector<ClientStats> Server::get_client_stats() const
@@ -76,15 +70,11 @@ vector<ClientStats> Server::get_client_stats() const
 
 void Server::do_work()
 {
-       for ( ;; ) {
-               int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
-               if (nfds == -1 && errno == EINTR) {
-                       if (should_stop) {
-                               return;
-                       }
-                       continue;
-               }
-               if (nfds == -1) {
+       while (!should_stop()) {
+               // Wait until there's activity on at least one of the fds,
+               // or we are waken up due to new queued clients or data.
+               int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, -1, &sigset_without_usr1_block);
+               if (nfds == -1 && errno != EINTR) {
                        log_perror("epoll_wait");
                        exit(1);
                }
@@ -113,10 +103,6 @@ void Server::do_work()
                                process_client(to_process[i]);
                        }
                }
-
-               if (should_stop) {
-                       return;
-               }
        }
 }
 
@@ -143,6 +129,7 @@ void Server::add_client_deferred(int sock)
 {
        MutexLock lock(&queued_data_mutex);
        queued_add_clients.push_back(sock);
+       wakeup();
 }
 
 void Server::add_client(int sock)
@@ -259,6 +246,7 @@ void Server::add_data_deferred(const string &stream_id, const char *data, size_t
 {
        MutexLock lock(&queued_data_mutex);
        queued_data[stream_id].append(string(data, data + bytes));
+       wakeup();
 }
 
 // See the .h file for postconditions after this function.     
@@ -558,14 +546,7 @@ void Server::close_client(Client *client)
        access_log->write(client->get_stats());
 
        // Bye-bye!
-       int ret;
-       do {
-               ret = close(client->sock);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
-               log_perror("close");
-       }
+       safe_close(client->sock);
 
        clients.erase(client->sock);
 }