From e8740ea38fa1b54672a83744549fcd1463403d98 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Mon, 8 Apr 2013 20:28:03 +0200 Subject: [PATCH] Make most operations on Server deferred, so that we a) do not get bugs with epoll going out-of-sync with the client state, and b) do not get performance issues stemming from Input not managing to push data often enough to the servers. --- server.cpp | 45 +++++++++++++++++++++++++++++++++++++-------- server.h | 39 +++++++++++++++++++++++++++++++++------ serverpool.cpp | 4 ++-- 3 files changed, 72 insertions(+), 16 deletions(-) diff --git a/server.cpp b/server.cpp index d8c68fc..60836a7 100644 --- a/server.cpp +++ b/server.cpp @@ -109,6 +109,7 @@ void Stream::wake_up_all_clients() Server::Server() { pthread_mutex_init(&mutex, NULL); + pthread_mutex_init(&queued_data_mutex, NULL); epoll_fd = epoll_create(1024); // Size argument is ignored. if (epoll_fd == -1) { @@ -178,6 +179,8 @@ void Server::do_work() return; } + process_queued_data(); + for (int i = 0; i < nfds; ++i) { int fd = events[i].data.fd; assert(clients.count(fd) != 0); @@ -203,8 +206,11 @@ void Server::do_work() } } -CubemapStateProto Server::serialize() const +CubemapStateProto Server::serialize() { + // We don't serialize anything queued, so empty the queues. + process_queued_data(); + CubemapStateProto serialized; for (map::const_iterator client_it = clients.begin(); client_it != clients.end(); @@ -219,9 +225,14 @@ CubemapStateProto Server::serialize() const return serialized; } +void Server::add_client_deferred(int sock) +{ + MutexLock lock(&queued_data_mutex); + queued_add_clients.push_back(sock); +} + void Server::add_client(int sock) { - MutexLock lock(&mutex); clients.insert(make_pair(sock, Client(sock))); // Start listening on data from this socket. @@ -297,14 +308,15 @@ void Server::set_header(const string &stream_id, const string &header) } } } - -void Server::add_data(const string &stream_id, const char *data, size_t bytes) + +void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes) { - if (bytes == 0) { - return; - } + MutexLock lock(&queued_data_mutex); + queued_data[stream_id].append(string(data, data + bytes)); +} - MutexLock lock(&mutex); +void Server::add_data(const string &stream_id, const char *data, size_t bytes) +{ Stream *stream = find_stream(stream_id); size_t pos = stream->data_size % BACKLOG_SIZE; stream->data_size += bytes; @@ -608,3 +620,20 @@ Stream *Server::find_stream(const string &stream_id) assert(it != streams.end()); return it->second; } + +void Server::process_queued_data() +{ + MutexLock lock(&queued_data_mutex); + + for (size_t i = 0; i < queued_add_clients.size(); ++i) { + add_client(queued_add_clients[i]); + } + queued_add_clients.clear(); + + for (map::iterator queued_it = queued_data.begin(); + queued_it != queued_data.end(); + ++queued_it) { + add_data(queued_it->first, queued_it->second.data(), queued_it->second.size()); + } + queued_data.clear(); +} diff --git a/server.h b/server.h index d87e7df..ce9365d 100644 --- a/server.h +++ b/server.h @@ -105,21 +105,43 @@ public: // Stop the thread. void stop(); + + void set_header(const std::string &stream_id, const std::string &header); - CubemapStateProto serialize() const; + // These will be deferred until the next time an iteration in do_work() happens, + // and the order between them are undefined. + // XXX: header should ideally be ordered with respect to data. + void add_client_deferred(int sock); + void add_data_deferred(const std::string &stream_id, const char *data, size_t bytes); - void add_client(int sock); + // These should not be called while running, since that would violate + // threading assumptions (ie., that epoll is only called from one thread + // at the same time). + CubemapStateProto serialize(); void add_client_from_serialized(const ClientProto &client); - void add_stream(const std::string &stream_id); void add_stream_from_serialized(const StreamProto &stream); - void set_header(const std::string &stream_id, const std::string &header); - void add_data(const std::string &stream_id, const char *data, size_t bytes); - private: pthread_t worker_thread; + // Mutex protecting queued_data only. Note that if you want to hold both this + // and below, you will need to take before this one. + pthread_mutex_t queued_data_mutex; + + // Deferred commands that should be run from the do_work() thread as soon as possible. + // We defer these for two reasons: + // + // - We only want to fiddle with epoll from one thread at any given time, + // and doing add_client() from the acceptor thread would violate that. + // - We don't want the input thread(s) hanging on when doing + // add_data(), since they want to do add_data() rather often, and + // can be taken a lot of the time. + // + // Protected by . + std::vector queued_add_clients; + std::map queued_data; + // All variables below this line are protected by the mutex. pthread_mutex_t mutex; @@ -173,6 +195,11 @@ private: // TODO: This function should probably die. Stream *find_stream(const std::string &stream_id); + + void process_queued_data(); + + void add_client(int sock); + void add_data(const std::string &stream_id, const char *data, size_t bytes); }; #endif // !defined(_SERVER_H) diff --git a/serverpool.cpp b/serverpool.cpp index f8fece6..a203594 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -16,7 +16,7 @@ ServerPool::~ServerPool() void ServerPool::add_client(int sock) { - servers[clients_added++ % num_servers].add_client(sock); + servers[clients_added++ % num_servers].add_client_deferred(sock); } void ServerPool::add_client_from_serialized(const ClientProto &client) @@ -48,7 +48,7 @@ void ServerPool::set_header(const std::string &stream_id, const std::string &hea void ServerPool::add_data(const std::string &stream_id, const char *data, size_t bytes) { for (int i = 0; i < num_servers; ++i) { - servers[i].add_data(stream_id, data, bytes); + servers[i].add_data_deferred(stream_id, data, bytes); } } -- 2.39.2