void Server::add_data(const string &stream_id, const char *data, size_t bytes)
{
+ if (bytes == 0) {
+ return;
+ }
+
MutexLock lock(&mutex);
assert(streams.count(stream_id) != 0);
Stream *stream = &streams[stream_id];
}
memcpy(stream->data + pos, data, bytes);
-
- // TODO: wake up clients
+ wake_up_all_clients();
}
void Server::process_client(Client *client)
close_client(client);
return;
}
- client->bytes_sent += ret;
-
- // TODO: put clients to sleep
+ client->bytes_sent += ret;
+ if (client->bytes_sent == stream.data_size) {
+ // We don't have any more data for this client, so put it to sleep.
+ put_client_to_sleep(client);
+ }
break;
}
default:
close(client->sock);
clients.erase(client->sock);
}
+
+void Server::put_client_to_sleep(Client *client)
+{
+ epoll_event ev;
+ ev.events = EPOLLRDHUP;
+ ev.data.fd = client->sock;
+
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
+ perror("epoll_ctl(EPOLL_CTL_MOD)");
+ exit(1);
+ }
+
+ sleeping_clients.push_back(client->sock);
+}
+
+void Server::wake_up_all_clients()
+{
+ for (unsigned i = 0; i < sleeping_clients.size(); ++i) {
+ epoll_event ev;
+ ev.events = EPOLLOUT | EPOLLRDHUP;
+ ev.data.fd = sleeping_clients[i];
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, sleeping_clients[i], &ev) == -1) {
+ perror("epoll_ctl(EPOLL_CTL_MOD)");
+ exit(1);
+ }
+ }
+ sleeping_clients.clear();
+}
void add_data(const std::string &stream_id, const char *data, size_t bytes);
private:
- void process_client(Client *client);
-
- // Close a given client socket, and clean up after it.
- void close_client(Client *client);
-
- // Parse the HTTP request, construct the header, and set the client into
- // the SENDING_HEADER state.
- void parse_request(Client *client);
-
pthread_mutex_t mutex;
// Map from stream ID to stream.
int epoll_fd;
epoll_event events[EPOLL_MAX_EVENTS];
+ // Clients that are in SENDING_DATA, but that we don't listen on,
+ // because we currently don't have any data for them.
+ // See put_client_to_sleep() and wake_up_all_clients().
+ std::vector<int> sleeping_clients;
+
// Recover the this pointer, and call do_work().
static void *do_work_thunk(void *arg);
// The actual worker thread.
void do_work();
+
+ void process_client(Client *client);
+
+ // Close a given client socket, and clean up after it.
+ void close_client(Client *client);
+
+ // Parse the HTTP request, construct the header, and set the client into
+ // the SENDING_HEADER state.
+ void parse_request(Client *client);
+
+ // Put client to sleep, since there is no more data for it; we will on
+ // longer listen on POLLOUT until we get more data. Also, it will be put
+ // in the list of clients to wake up when we do.
+ void put_client_to_sleep(Client *client);
+
+ // We have more data, so mark all clients that are sleeping as ready to go.
+ void wake_up_all_clients();
};
#endif // !defined(_SERVER_H)