]> git.sesse.net Git - cubemap/commitdiff
Implement sleeping/waking clients.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 6 Apr 2013 16:08:51 +0000 (18:08 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 6 Apr 2013 16:08:51 +0000 (18:08 +0200)
server.cpp
server.h

index 9b91e0371254e67044a8138424334a21aea4b88c..b914b2ec4fbaa50175021038e42ed9161cd21bad 100644 (file)
@@ -107,6 +107,10 @@ void Server::set_header(const string &stream_id, const string &header)
        
 void Server::add_data(const string &stream_id, const char *data, size_t bytes)
 {
+       if (bytes == 0) {
+               return;
+       }
+
        MutexLock lock(&mutex);
        assert(streams.count(stream_id) != 0);
        Stream *stream = &streams[stream_id];
@@ -122,8 +126,7 @@ void Server::add_data(const string &stream_id, const char *data, size_t bytes)
        }
 
        memcpy(stream->data + pos, data, bytes);
-
-       // TODO: wake up clients
+       wake_up_all_clients();
 }
        
 void Server::process_client(Client *client)
@@ -231,10 +234,12 @@ void Server::process_client(Client *client)
                        close_client(client);
                        return;
                }
-               client->bytes_sent += ret;      
-
-               // TODO: put clients to sleep
+               client->bytes_sent += ret;
 
+               if (client->bytes_sent == stream.data_size) {
+                       // We don't have any more data for this client, so put it to sleep.
+                       put_client_to_sleep(client);
+               }
                break;
        }
        default:
@@ -276,3 +281,31 @@ void Server::close_client(Client *client)
        close(client->sock);
        clients.erase(client->sock);
 }
+       
+void Server::put_client_to_sleep(Client *client)
+{
+       epoll_event ev;
+       ev.events = EPOLLRDHUP;
+       ev.data.fd = client->sock;
+
+       if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
+               perror("epoll_ctl(EPOLL_CTL_MOD)");
+               exit(1);
+       }
+
+       sleeping_clients.push_back(client->sock);
+}
+
+void Server::wake_up_all_clients()
+{
+       for (unsigned i = 0; i < sleeping_clients.size(); ++i) {
+               epoll_event ev;
+               ev.events = EPOLLOUT | EPOLLRDHUP;
+               ev.data.fd = sleeping_clients[i];
+               if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, sleeping_clients[i], &ev) == -1) {
+                       perror("epoll_ctl(EPOLL_CTL_MOD)");
+                       exit(1);
+               }
+       }
+       sleeping_clients.clear();
+}
index f1cc7a0dbfc0d56116739ebccda95829d8cfe577..271732b55e5a848a232c73fd967cefa4fa26448c 100644 (file)
--- a/server.h
+++ b/server.h
@@ -63,15 +63,6 @@ public:
        void add_data(const std::string &stream_id, const char *data, size_t bytes);
 
 private:
-       void process_client(Client *client);
-
-       // Close a given client socket, and clean up after it.
-       void close_client(Client *client);
-
-       // Parse the HTTP request, construct the header, and set the client into
-       // the SENDING_HEADER state.
-       void parse_request(Client *client);
-
        pthread_mutex_t mutex;
 
        // Map from stream ID to stream.
@@ -84,11 +75,33 @@ private:
        int epoll_fd;
        epoll_event events[EPOLL_MAX_EVENTS];
 
+       // Clients that are in SENDING_DATA, but that we don't listen on,
+       // because we currently don't have any data for them.
+       // See put_client_to_sleep() and wake_up_all_clients().
+       std::vector<int> sleeping_clients;
+
        // Recover the this pointer, and call do_work().
        static void *do_work_thunk(void *arg);
 
        // The actual worker thread.
        void do_work();
+
+       void process_client(Client *client);
+
+       // Close a given client socket, and clean up after it.
+       void close_client(Client *client);
+
+       // Parse the HTTP request, construct the header, and set the client into
+       // the SENDING_HEADER state.
+       void parse_request(Client *client);
+
+       // Put client to sleep, since there is no more data for it; we will on
+       // longer listen on POLLOUT until we get more data. Also, it will be put
+       // in the list of clients to wake up when we do.
+       void put_client_to_sleep(Client *client);
+
+       // We have more data, so mark all clients that are sleeping as ready to go.
+       void wake_up_all_clients();
 };
 
 #endif  // !defined(_SERVER_H)