]> git.sesse.net Git - cubemap/blobdiff - server.cpp
Fix merge snafu in last commit. (Oops.)
[cubemap] / server.cpp
index a000d9c6d08f0f3d799c07f3bfec1902f68fc141..2eb33ed9edcac0ff793f779a7fec21db4cc6b058 100644 (file)
@@ -26,6 +26,7 @@
 #include "server.h"
 #include "state.pb.h"
 #include "stream.h"
+#include "util.h"
 
 using namespace std;
 
@@ -51,14 +52,7 @@ Server::~Server()
                delete stream_it->second;
        }
 
-       int ret;
-       do {
-               ret = close(epoll_fd);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
-               log_perror("close(epoll_fd)");
-       }
+       safe_close(epoll_fd);
 }
 
 vector<ClientStats> Server::get_client_stats() const
@@ -76,15 +70,17 @@ vector<ClientStats> Server::get_client_stats() const
 
 void Server::do_work()
 {
-       for ( ;; ) {
-               int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
-               if (nfds == -1 && errno == EINTR) {
-                       if (should_stop) {
-                               return;
-                       }
-                       continue;
-               }
-               if (nfds == -1) {
+       while (!should_stop()) {
+               // Wait until there's activity on at least one of the fds,
+               // or 20 ms (about one frame at 50 fps) has elapsed.
+               //
+               // We could in theory wait forever and rely on wakeup()
+               // from add_client_deferred() and add_data_deferred(),
+               // but wakeup is a pretty expensive operation, and the
+               // two threads might end up fighting over a lock, so it's
+               // seemingly (much) more efficient to just have a timeout here.
+               int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS, &sigset_without_usr1_block);
+               if (nfds == -1 && errno != EINTR) {
                        log_perror("epoll_wait");
                        exit(1);
                }
@@ -113,10 +109,6 @@ void Server::do_work()
                                process_client(to_process[i]);
                        }
                }
-
-               if (should_stop) {
-                       return;
-               }
        }
 }
 
@@ -147,18 +139,21 @@ void Server::add_client_deferred(int sock)
 
 void Server::add_client(int sock)
 {
-       clients.insert(make_pair(sock, Client(sock)));
+       pair<map<int, Client>::iterator, bool> ret =
+               clients.insert(make_pair(sock, Client(sock)));
+       assert(ret.second == true);  // Should not already exist.
+       Client *client_ptr = &ret.first->second;
 
        // Start listening on data from this socket.
        epoll_event ev;
        ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
-       ev.data.u64 = reinterpret_cast<uint64_t>(&clients[sock]);
+       ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
                log_perror("epoll_ctl(EPOLL_CTL_ADD)");
                exit(1);
        }
 
-       process_client(&clients[sock]);
+       process_client(client_ptr);
 }
 
 void Server::add_client_from_serialized(const ClientProto &client)
@@ -171,8 +166,10 @@ void Server::add_client_from_serialized(const ClientProto &client)
        } else {
                stream = stream_it->second;
        }
-       clients.insert(make_pair(client.sock(), Client(client, stream)));
-       Client *client_ptr = &clients[client.sock()];
+       pair<map<int, Client>::iterator, bool> ret =
+               clients.insert(make_pair(client.sock(), Client(client, stream)));
+       assert(ret.second == true);  // Should not already exist.
+       Client *client_ptr = &ret.first->second;
 
        // Start listening on data from this socket.
        epoll_event ev;
@@ -253,7 +250,7 @@ void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
 void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
 {
        MutexLock lock(&queued_data_mutex);
-       queued_data[stream_id].append(string(data, data + bytes));
+       find_stream(stream_id)->add_data_deferred(data, bytes);
 }
 
 // See the .h file for postconditions after this function.     
@@ -553,14 +550,7 @@ void Server::close_client(Client *client)
        access_log->write(client->get_stats());
 
        // Bye-bye!
-       int ret;
-       do {
-               ret = close(client->sock);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
-               log_perror("close");
-       }
+       safe_close(client->sock);
 
        clients.erase(client->sock);
 }
@@ -581,12 +571,9 @@ void Server::process_queued_data()
        }
        queued_add_clients.clear();     
        
-       for (map<string, string>::iterator queued_it = queued_data.begin();
-            queued_it != queued_data.end();
-            ++queued_it) {
-               Stream *stream = find_stream(queued_it->first);
-               stream->add_data(queued_it->second.data(), queued_it->second.size());
-               stream->wake_up_all_clients();
+       for (map<string, Stream *>::iterator stream_it = streams.begin();
+            stream_it != streams.end();
+            ++stream_it) {
+               stream_it->second->process_queued_data();
        }
-       queued_data.clear();
 }