]> git.sesse.net Git - cubemap/blobdiff - server.cpp
Fix merge snafu in last commit. (Oops.)
[cubemap] / server.cpp
index 8282a562863deea72c1294cb55f8e0a0100f6a8d..2eb33ed9edcac0ff793f779a7fec21db4cc6b058 100644 (file)
@@ -1,9 +1,11 @@
-#include <arpa/inet.h>
 #include <assert.h>
 #include <errno.h>
+#include <netinet/in.h>
 #include <pthread.h>
+#include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <string.h>
 #include <sys/epoll.h>
 #include <sys/sendfile.h>
 #include <sys/socket.h>
@@ -24,6 +26,7 @@
 #include "server.h"
 #include "state.pb.h"
 #include "stream.h"
+#include "util.h"
 
 using namespace std;
 
@@ -49,14 +52,7 @@ Server::~Server()
                delete stream_it->second;
        }
 
-       int ret;
-       do {
-               ret = close(epoll_fd);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
-               log_perror("close(epoll_fd)");
-       }
+       safe_close(epoll_fd);
 }
 
 vector<ClientStats> Server::get_client_stats() const
@@ -74,15 +70,17 @@ vector<ClientStats> Server::get_client_stats() const
 
 void Server::do_work()
 {
-       for ( ;; ) {
-               int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
-               if (nfds == -1 && errno == EINTR) {
-                       if (should_stop) {
-                               return;
-                       }
-                       continue;
-               }
-               if (nfds == -1) {
+       while (!should_stop()) {
+               // Wait until there's activity on at least one of the fds,
+               // or 20 ms (about one frame at 50 fps) has elapsed.
+               //
+               // We could in theory wait forever and rely on wakeup()
+               // from add_client_deferred() and add_data_deferred(),
+               // but wakeup is a pretty expensive operation, and the
+               // two threads might end up fighting over a lock, so it's
+               // seemingly (much) more efficient to just have a timeout here.
+               int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS, &sigset_without_usr1_block);
+               if (nfds == -1 && errno != EINTR) {
                        log_perror("epoll_wait");
                        exit(1);
                }
@@ -111,10 +109,6 @@ void Server::do_work()
                                process_client(to_process[i]);
                        }
                }
-
-               if (should_stop) {
-                       return;
-               }
        }
 }
 
@@ -145,18 +139,21 @@ void Server::add_client_deferred(int sock)
 
 void Server::add_client(int sock)
 {
-       clients.insert(make_pair(sock, Client(sock)));
+       pair<map<int, Client>::iterator, bool> ret =
+               clients.insert(make_pair(sock, Client(sock)));
+       assert(ret.second == true);  // Should not already exist.
+       Client *client_ptr = &ret.first->second;
 
        // Start listening on data from this socket.
        epoll_event ev;
        ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
-       ev.data.u64 = reinterpret_cast<uint64_t>(&clients[sock]);
+       ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
                log_perror("epoll_ctl(EPOLL_CTL_ADD)");
                exit(1);
        }
 
-       process_client(&clients[sock]);
+       process_client(client_ptr);
 }
 
 void Server::add_client_from_serialized(const ClientProto &client)
@@ -169,8 +166,10 @@ void Server::add_client_from_serialized(const ClientProto &client)
        } else {
                stream = stream_it->second;
        }
-       clients.insert(make_pair(client.sock(), Client(client, stream)));
-       Client *client_ptr = &clients[client.sock()];
+       pair<map<int, Client>::iterator, bool> ret =
+               clients.insert(make_pair(client.sock(), Client(client, stream)));
+       assert(ret.second == true);  // Should not already exist.
+       Client *client_ptr = &ret.first->second;
 
        // Start listening on data from this socket.
        epoll_event ev;
@@ -202,10 +201,10 @@ void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::En
        streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
 }
 
-void Server::add_stream_from_serialized(const StreamProto &stream)
+void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
 {
        MutexLock lock(&mutex);
-       streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
+       streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd)));
 }
        
 void Server::set_backlog_size(const string &stream_id, size_t new_size)
@@ -251,7 +250,7 @@ void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
 void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
 {
        MutexLock lock(&queued_data_mutex);
-       queued_data[stream_id].append(string(data, data + bytes));
+       find_stream(stream_id)->add_data_deferred(data, bytes);
 }
 
 // See the .h file for postconditions after this function.     
@@ -376,13 +375,17 @@ sending_data_again:
                        return;
                }
                if (bytes_to_send > stream->backlog_size) {
-                       log(WARNING, "[%s] Client lost %lld bytes, maybe too slow connection",
-                               client->remote_addr.c_str(),
-                               (long long int)(bytes_to_send - stream->backlog_size));
+                       size_t bytes_lost = bytes_to_send - stream->backlog_size;
                        client->stream_pos = stream->bytes_received - stream->backlog_size;
-                       client->bytes_lost += bytes_to_send - stream->backlog_size;
+                       client->bytes_lost += bytes_lost;
                        ++client->num_loss_events;
                        bytes_to_send = stream->backlog_size;
+
+                       double loss_fraction = double(client->bytes_lost) / double(client->bytes_lost + client->bytes_sent);
+                       log(WARNING, "[%s] Client lost %lld bytes (total loss: %.2f%%), maybe too slow connection",
+                               client->remote_addr.c_str(),
+                               (long long int)(bytes_lost),
+                               100.0 * loss_fraction);
                }
 
                // See if we need to split across the circular buffer.
@@ -547,14 +550,7 @@ void Server::close_client(Client *client)
        access_log->write(client->get_stats());
 
        // Bye-bye!
-       int ret;
-       do {
-               ret = close(client->sock);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
-               log_perror("close");
-       }
+       safe_close(client->sock);
 
        clients.erase(client->sock);
 }
@@ -575,12 +571,9 @@ void Server::process_queued_data()
        }
        queued_add_clients.clear();     
        
-       for (map<string, string>::iterator queued_it = queued_data.begin();
-            queued_it != queued_data.end();
-            ++queued_it) {
-               Stream *stream = find_stream(queued_it->first);
-               stream->add_data(queued_it->second.data(), queued_it->second.size());
-               stream->wake_up_all_clients();
+       for (map<string, Stream *>::iterator stream_it = streams.begin();
+            stream_it != streams.end();
+            ++stream_it) {
+               stream_it->second->process_queued_data();
        }
-       queued_data.clear();
 }