void Server::add_data(const string &stream_id, const char *data, size_t bytes)
{
+ if (bytes == 0) {
+ return;
+ }
+
MutexLock lock(&mutex);
assert(streams.count(stream_id) != 0);
Stream *stream = &streams[stream_id];
}
memcpy(stream->data + pos, data, bytes);
-
- // TODO: wake up clients
+ wake_up_all_clients();
}
void Server::process_client(Client *client)
close_client(client);
return;
}
- client->bytes_sent += ret;
-
- // TODO: put clients to sleep
+ client->bytes_sent += ret;
+ if (client->bytes_sent == stream.data_size) {
+ // We don't have any more data for this client, so put it to sleep.
+ put_client_to_sleep(client);
+ }
break;
}
default:
close(client->sock);
clients.erase(client->sock);
}
+
+void Server::put_client_to_sleep(Client *client)
+{
+ epoll_event ev;
+ ev.events = EPOLLRDHUP;
+ ev.data.fd = client->sock;
+
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
+ perror("epoll_ctl(EPOLL_CTL_MOD)");
+ exit(1);
+ }
+
+ sleeping_clients.push_back(client->sock);
+}
+
+void Server::wake_up_all_clients()
+{
+ for (unsigned i = 0; i < sleeping_clients.size(); ++i) {
+ epoll_event ev;
+ ev.events = EPOLLOUT | EPOLLRDHUP;
+ ev.data.fd = sleeping_clients[i];
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, sleeping_clients[i], &ev) == -1) {
+ perror("epoll_ctl(EPOLL_CTL_MOD)");
+ exit(1);
+ }
+ }
+ sleeping_clients.clear();
+}