]> git.sesse.net Git - cubemap/blobdiff - server.cpp
Make the list of sleeping clients be per-stream instead of global, so we do not wake...
[cubemap] / server.cpp
index d41e3c70730b896e44a9362bf150ae534ee9f892..d8c68fc4f5117df388d9d9c852f351ce92f8d330 100644 (file)
@@ -91,6 +91,21 @@ StreamProto Stream::serialize() const
        return serialized;
 }
 
+void Stream::put_client_to_sleep(Client *client)
+{
+       sleeping_clients.push_back(client);
+}
+
+void Stream::wake_up_all_clients()
+{
+       if (to_process.empty()) {
+               swap(sleeping_clients, to_process);
+       } else {
+               to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
+               sleeping_clients.clear();
+       }
+}
+
 Server::Server()
 {
        pthread_mutex_init(&mutex, NULL);
@@ -176,10 +191,15 @@ void Server::do_work()
                        process_client(client);
                }
 
-               for (unsigned i = 0; i < to_process.size(); ++i) {
-                       process_client(to_process[i]);
+               for (map<string, Stream *>::iterator stream_it = streams.begin();
+                    stream_it != streams.end();
+                    ++stream_it) {
+                       Stream *stream = stream_it->second;
+                       for (size_t i = 0; i < stream->to_process.size(); ++i) {
+                               process_client(stream->to_process[i]);
+                       }
+                       stream->to_process.clear();
                }
-               to_process.clear();
        }
 }
 
@@ -242,7 +262,7 @@ void Server::add_client_from_serialized(const ClientProto &client)
 
        if (client_ptr->state == Client::SENDING_DATA && 
            client_ptr->bytes_sent == client_ptr->stream->data_size) {
-               put_client_to_sleep(client_ptr);
+               client_ptr->stream->put_client_to_sleep(client_ptr);
        } else {
                process_client(client_ptr);
        }
@@ -298,7 +318,7 @@ void Server::add_data(const string &stream_id, const char *data, size_t bytes)
        }
 
        memcpy(stream->data + pos, data, bytes);
-       wake_up_all_clients();
+       stream->wake_up_all_clients();
 }
 
 // See the .h file for postconditions after this function.     
@@ -417,13 +437,13 @@ sending_header_or_error_again:
                // This is postcondition #3.
                client->state = Client::SENDING_DATA;
                client->bytes_sent = client->stream->data_size;
-               put_client_to_sleep(client);
+               client->stream->put_client_to_sleep(client);
                return;
        }
        case Client::SENDING_DATA: {
                // See if there's some data we've lost. Ideally, we should drop to a block boundary,
                // but resync will be the mux's problem.
-               const Stream *stream = client->stream;
+               Stream *stream = client->stream;
                size_t bytes_to_send = stream->data_size - client->bytes_sent;
                if (bytes_to_send == 0) {
                        return;
@@ -475,7 +495,7 @@ sending_header_or_error_again:
                if (client->bytes_sent == stream->data_size) {
                        // We don't have any more data for this client, so put it to sleep.
                        // This is postcondition #3.
-                       put_client_to_sleep(client);
+                       stream->put_client_to_sleep(client);
                } else {
                        // XXX: Do we need to go another round here to explicitly
                        // get the EAGAIN?
@@ -560,9 +580,14 @@ void Server::close_client(Client *client)
        }
 
        // This client could be sleeping, so we'll need to fix that. (Argh, O(n).)
-       vector<Client *>::iterator new_end =
-               remove(sleeping_clients.begin(), sleeping_clients.end(), client);
-       sleeping_clients.erase(new_end, sleeping_clients.end());
+       if (client->stream != NULL) {
+               vector<Client *>::iterator new_end =
+                       remove(client->stream->sleeping_clients.begin(),
+                              client->stream->sleeping_clients.end(),
+                              client);
+               client->stream->sleeping_clients.erase(
+                       new_end, client->stream->sleeping_clients.end());
+       }
        
        // Bye-bye!
        int ret;
@@ -577,21 +602,6 @@ void Server::close_client(Client *client)
        clients.erase(client->sock);
 }
        
-void Server::put_client_to_sleep(Client *client)
-{
-       sleeping_clients.push_back(client);
-}
-
-void Server::wake_up_all_clients()
-{
-       if (to_process.empty()) {
-               swap(sleeping_clients, to_process);
-       } else {
-               to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
-               sleeping_clients.clear();
-       }
-}
-       
 Stream *Server::find_stream(const string &stream_id)
 {
        map<string, Stream *>::iterator it = streams.find(stream_id);