Make the list of sleeping clients be per-stream instead of global, so we do not wake...
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 7 Apr 2013 22:21:40 +0000 (00:21 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 7 Apr 2013 22:21:40 +0000 (00:21 +0200)
server.cpp
server.h

index d41e3c70730b896e44a9362bf150ae534ee9f892..d8c68fc4f5117df388d9d9c852f351ce92f8d330 100644 (file)
@@ -91,6 +91,21 @@ StreamProto Stream::serialize() const
        return serialized;
 }
 
+void Stream::put_client_to_sleep(Client *client)
+{
+       sleeping_clients.push_back(client);
+}
+
+void Stream::wake_up_all_clients()
+{
+       if (to_process.empty()) {
+               swap(sleeping_clients, to_process);
+       } else {
+               to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
+               sleeping_clients.clear();
+       }
+}
+
 Server::Server()
 {
        pthread_mutex_init(&mutex, NULL);
@@ -176,10 +191,15 @@ void Server::do_work()
                        process_client(client);
                }
 
-               for (unsigned i = 0; i < to_process.size(); ++i) {
-                       process_client(to_process[i]);
+               for (map<string, Stream *>::iterator stream_it = streams.begin();
+                    stream_it != streams.end();
+                    ++stream_it) {
+                       Stream *stream = stream_it->second;
+                       for (size_t i = 0; i < stream->to_process.size(); ++i) {
+                               process_client(stream->to_process[i]);
+                       }
+                       stream->to_process.clear();
                }
-               to_process.clear();
        }
 }
 
@@ -242,7 +262,7 @@ void Server::add_client_from_serialized(const ClientProto &client)
 
        if (client_ptr->state == Client::SENDING_DATA && 
            client_ptr->bytes_sent == client_ptr->stream->data_size) {
-               put_client_to_sleep(client_ptr);
+               client_ptr->stream->put_client_to_sleep(client_ptr);
        } else {
                process_client(client_ptr);
        }
@@ -298,7 +318,7 @@ void Server::add_data(const string &stream_id, const char *data, size_t bytes)
        }
 
        memcpy(stream->data + pos, data, bytes);
-       wake_up_all_clients();
+       stream->wake_up_all_clients();
 }
 
 // See the .h file for postconditions after this function.     
@@ -417,13 +437,13 @@ sending_header_or_error_again:
                // This is postcondition #3.
                client->state = Client::SENDING_DATA;
                client->bytes_sent = client->stream->data_size;
-               put_client_to_sleep(client);
+               client->stream->put_client_to_sleep(client);
                return;
        }
        case Client::SENDING_DATA: {
                // See if there's some data we've lost. Ideally, we should drop to a block boundary,
                // but resync will be the mux's problem.
-               const Stream *stream = client->stream;
+               Stream *stream = client->stream;
                size_t bytes_to_send = stream->data_size - client->bytes_sent;
                if (bytes_to_send == 0) {
                        return;
@@ -475,7 +495,7 @@ sending_header_or_error_again:
                if (client->bytes_sent == stream->data_size) {
                        // We don't have any more data for this client, so put it to sleep.
                        // This is postcondition #3.
-                       put_client_to_sleep(client);
+                       stream->put_client_to_sleep(client);
                } else {
                        // XXX: Do we need to go another round here to explicitly
                        // get the EAGAIN?
@@ -560,9 +580,14 @@ void Server::close_client(Client *client)
        }
 
        // This client could be sleeping, so we'll need to fix that. (Argh, O(n).)
-       vector<Client *>::iterator new_end =
-               remove(sleeping_clients.begin(), sleeping_clients.end(), client);
-       sleeping_clients.erase(new_end, sleeping_clients.end());
+       if (client->stream != NULL) {
+               vector<Client *>::iterator new_end =
+                       remove(client->stream->sleeping_clients.begin(),
+                              client->stream->sleeping_clients.end(),
+                              client);
+               client->stream->sleeping_clients.erase(
+                       new_end, client->stream->sleeping_clients.end());
+       }
        
        // Bye-bye!
        int ret;
@@ -577,21 +602,6 @@ void Server::close_client(Client *client)
        clients.erase(client->sock);
 }
        
-void Server::put_client_to_sleep(Client *client)
-{
-       sleeping_clients.push_back(client);
-}
-
-void Server::wake_up_all_clients()
-{
-       if (to_process.empty()) {
-               swap(sleeping_clients, to_process);
-       } else {
-               to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
-               sleeping_clients.clear();
-       }
-}
-       
 Stream *Server::find_stream(const string &stream_id)
 {
        map<string, Stream *>::iterator it = streams.find(stream_id);
index 6e0dd9dac2fd98f48d2a7fc24def92b5eee064cd..d87e7dfab0d8dd9a08423b7b2c47b998e2b277a2 100644 (file)
--- a/server.h
+++ b/server.h
@@ -73,6 +73,23 @@ struct Stream {
        // How many bytes <data> contains. Can very well be larger than BACKLOG_SIZE,
        // since the buffer wraps.
        size_t data_size;
+       
+       // Clients that are in SENDING_DATA, but that we don't listen on,
+       // because we currently don't have any data for them.
+       // See put_client_to_sleep() and wake_up_all_clients().
+       std::vector<Client *> sleeping_clients;
+
+       // Clients that we recently got data for (when they were in
+       // <sleeping_clients>).
+       std::vector<Client *> to_process;
+
+       // Put client to sleep, since there is no more data for it; we will on
+       // longer listen on POLLOUT until we get more data. Also, it will be put
+       // in the list of clients to wake up when we do.
+       void put_client_to_sleep(Client *client);
+
+       // We have more data, so mark all clients that are sleeping as ready to go.
+       void wake_up_all_clients();
 
 private:
        Stream(const Stream& other);
@@ -119,15 +136,6 @@ private:
        int epoll_fd;
        epoll_event events[EPOLL_MAX_EVENTS];
 
-       // Clients that are in SENDING_DATA, but that we don't listen on,
-       // because we currently don't have any data for them.
-       // See put_client_to_sleep() and wake_up_all_clients().
-       std::vector<Client *> sleeping_clients;
-
-       // Clients that we recently got data for (when they were in
-       // <sleeping_clients>).
-       std::vector<Client *> to_process;
-
        // Recover the this pointer, and call do_work().
        static void *do_work_thunk(void *arg);
 
@@ -163,14 +171,6 @@ private:
        // the SENDING_ERROR state.
        void construct_error(Client *client, int error_code);
 
-       // Put client to sleep, since there is no more data for it; we will on
-       // longer listen on POLLOUT until we get more data. Also, it will be put
-       // in the list of clients to wake up when we do.
-       void put_client_to_sleep(Client *client);
-
-       // We have more data, so mark all clients that are sleeping as ready to go.
-       void wake_up_all_clients();
-
        // TODO: This function should probably die.
        Stream *find_stream(const std::string &stream_id);
 };