X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=server.cpp;h=a0a32f8e704a716cad99563cd75b4bc47cfeb41b;hb=ba03a3ed19cc8b02d4bb198edfd1c8bee9f28e35;hp=74a20c83d2308c5a0664b2e15ad040970ab47257;hpb=1bc0cd3639cfef1403c74f1d82960df3e8692149;p=cubemap diff --git a/server.cpp b/server.cpp index 74a20c8..a0a32f8 100644 --- a/server.cpp +++ b/server.cpp @@ -1,9 +1,11 @@ -#include #include #include +#include #include +#include #include #include +#include #include #include #include @@ -24,6 +26,7 @@ #include "server.h" #include "state.pb.h" #include "stream.h" +#include "util.h" using namespace std; @@ -49,14 +52,7 @@ Server::~Server() delete stream_it->second; } - int ret; - do { - ret = close(epoll_fd); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close(epoll_fd)"); - } + safe_close(epoll_fd); } vector Server::get_client_stats() const @@ -74,15 +70,11 @@ vector Server::get_client_stats() const void Server::do_work() { - for ( ;; ) { - int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS); - if (nfds == -1 && errno == EINTR) { - if (should_stop) { - return; - } - continue; - } - if (nfds == -1) { + while (!should_stop()) { + // Wait until there's activity on at least one of the fds, + // or we are waken up due to new queued clients or data. + int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, -1, &sigset_without_usr1_block); + if (nfds == -1 && errno != EINTR) { log_perror("epoll_wait"); exit(1); } @@ -111,10 +103,6 @@ void Server::do_work() process_client(to_process[i]); } } - - if (should_stop) { - return; - } } } @@ -141,22 +129,26 @@ void Server::add_client_deferred(int sock) { MutexLock lock(&queued_data_mutex); queued_add_clients.push_back(sock); + wakeup(); } void Server::add_client(int sock) { - clients.insert(make_pair(sock, Client(sock))); + pair::iterator, bool> ret = + clients.insert(make_pair(sock, Client(sock))); + assert(ret.second == true); // Should not already exist. + Client *client_ptr = &ret.first->second; // Start listening on data from this socket. epoll_event ev; ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP; - ev.data.u64 = reinterpret_cast(&clients[sock]); + ev.data.u64 = reinterpret_cast(client_ptr); if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) { log_perror("epoll_ctl(EPOLL_CTL_ADD)"); exit(1); } - process_client(&clients[sock]); + process_client(client_ptr); } void Server::add_client_from_serialized(const ClientProto &client) @@ -169,8 +161,10 @@ void Server::add_client_from_serialized(const ClientProto &client) } else { stream = stream_it->second; } - clients.insert(make_pair(client.sock(), Client(client, stream))); - Client *client_ptr = &clients[client.sock()]; + pair::iterator, bool> ret = + clients.insert(make_pair(client.sock(), Client(client, stream))); + assert(ret.second == true); // Should not already exist. + Client *client_ptr = &ret.first->second; // Start listening on data from this socket. epoll_event ev; @@ -252,6 +246,7 @@ void Server::add_data_deferred(const string &stream_id, const char *data, size_t { MutexLock lock(&queued_data_mutex); queued_data[stream_id].append(string(data, data + bytes)); + wakeup(); } // See the .h file for postconditions after this function. @@ -551,14 +546,7 @@ void Server::close_client(Client *client) access_log->write(client->get_stats()); // Bye-bye! - int ret; - do { - ret = close(client->sock); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close"); - } + safe_close(client->sock); clients.erase(client->sock); }