#include "server.h"
#include "state.pb.h"
#include "stream.h"
+#include "util.h"
using namespace std;
delete stream_it->second;
}
- int ret;
- do {
- ret = close(epoll_fd);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close(epoll_fd)");
- }
+ safe_close(epoll_fd);
}
vector<ClientStats> Server::get_client_stats() const
void Server::do_work()
{
- for ( ;; ) {
- int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
- if (nfds == -1 && errno == EINTR) {
- if (should_stop) {
- return;
- }
- continue;
- }
- if (nfds == -1) {
+ while (!should_stop()) {
+ // Wait until there's activity on at least one of the fds,
+ // or we are waken up due to new queued clients or data.
+ int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, -1, &sigset_without_usr1_block);
+ if (nfds == -1 && errno != EINTR) {
log_perror("epoll_wait");
exit(1);
}
process_client(to_process[i]);
}
}
-
- if (should_stop) {
- return;
- }
}
}
{
MutexLock lock(&queued_data_mutex);
queued_add_clients.push_back(sock);
+ wakeup();
}
void Server::add_client(int sock)
{
MutexLock lock(&queued_data_mutex);
queued_data[stream_id].append(string(data, data + bytes));
+ wakeup();
}
// See the .h file for postconditions after this function.
access_log->write(client->get_stats());
// Bye-bye!
- int ret;
- do {
- ret = close(client->sock);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close");
- }
+ safe_close(client->sock);
clients.erase(client->sock);
}