X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=60836a74ca52774855f80c2a3ce623545dbc534f;hp=d8c68fc4f5117df388d9d9c852f351ce92f8d330;hb=e8740ea38fa1b54672a83744549fcd1463403d98;hpb=4fcea4a90506f45311daf49a58b02a3723e6a5ff diff --git a/server.cpp b/server.cpp index d8c68fc..60836a7 100644 --- a/server.cpp +++ b/server.cpp @@ -109,6 +109,7 @@ void Stream::wake_up_all_clients() Server::Server() { pthread_mutex_init(&mutex, NULL); + pthread_mutex_init(&queued_data_mutex, NULL); epoll_fd = epoll_create(1024); // Size argument is ignored. if (epoll_fd == -1) { @@ -178,6 +179,8 @@ void Server::do_work() return; } + process_queued_data(); + for (int i = 0; i < nfds; ++i) { int fd = events[i].data.fd; assert(clients.count(fd) != 0); @@ -203,8 +206,11 @@ void Server::do_work() } } -CubemapStateProto Server::serialize() const +CubemapStateProto Server::serialize() { + // We don't serialize anything queued, so empty the queues. + process_queued_data(); + CubemapStateProto serialized; for (map::const_iterator client_it = clients.begin(); client_it != clients.end(); @@ -219,9 +225,14 @@ CubemapStateProto Server::serialize() const return serialized; } +void Server::add_client_deferred(int sock) +{ + MutexLock lock(&queued_data_mutex); + queued_add_clients.push_back(sock); +} + void Server::add_client(int sock) { - MutexLock lock(&mutex); clients.insert(make_pair(sock, Client(sock))); // Start listening on data from this socket. @@ -297,14 +308,15 @@ void Server::set_header(const string &stream_id, const string &header) } } } - -void Server::add_data(const string &stream_id, const char *data, size_t bytes) + +void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes) { - if (bytes == 0) { - return; - } + MutexLock lock(&queued_data_mutex); + queued_data[stream_id].append(string(data, data + bytes)); +} - MutexLock lock(&mutex); +void Server::add_data(const string &stream_id, const char *data, size_t bytes) +{ Stream *stream = find_stream(stream_id); size_t pos = stream->data_size % BACKLOG_SIZE; stream->data_size += bytes; @@ -608,3 +620,20 @@ Stream *Server::find_stream(const string &stream_id) assert(it != streams.end()); return it->second; } + +void Server::process_queued_data() +{ + MutexLock lock(&queued_data_mutex); + + for (size_t i = 0; i < queued_add_clients.size(); ++i) { + add_client(queued_add_clients[i]); + } + queued_add_clients.clear(); + + for (map::iterator queued_it = queued_data.begin(); + queued_it != queued_data.end(); + ++queued_it) { + add_data(queued_it->first, queued_it->second.data(), queued_it->second.size()); + } + queued_data.clear(); +}