// Start listening on data from this socket.
epoll_event ev;
if (client.state() == Client::READING_REQUEST) {
- ev.events = EPOLLIN | EPOLLRDHUP;
+ ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
} else {
// If we don't have more data for this client, we'll be putting it into
// the sleeping array again soon.
- ev.events = EPOLLOUT | EPOLLRDHUP;
+ ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
}
ev.data.u64 = 0; // Keep Valgrind happy.
ev.data.fd = client.sock();
memcpy(stream->data + pos, data, bytes);
wake_up_all_clients();
}
-
+
+// See the .h file for postconditions after this function.
void Server::process_client(Client *client)
{
switch (client->state) {
case Client::READING_REQUEST: {
+read_request_again:
// Try to read more of the request.
char buf[1024];
int ret;
do {
ret = read(client->sock, buf, sizeof(buf));
} while (ret == -1 && errno == EINTR);
+
+ if (ret == -1 && errno == EAGAIN) {
+ // No more data right now. Nothing to do.
+ // This is postcondition #2.
+ return;
+ }
if (ret == -1) {
perror("read");
close_client(client);
return;
}
if (ret == 0) {
- // No data? This really means that we were triggered for something else than
- // POLLIN (which suggests a logic error in epoll).
- fprintf(stderr, "WARNING: fd %d returned unexpectedly 0 bytes!\n", client->sock);
+ // OK, the socket is closed.
close_client(client);
return;
}
"\r\n\r\n", 4));
if (ptr == NULL) {
// OK, we don't have the entire header yet. Fine; we'll get it later.
- return;
+ // See if there's more data for us.
+ goto read_request_again;
}
if (ptr != client->request.data() + client->request.size() - 4) {
} else {
construct_error(client, error_code);
}
- break;
+
+ // We've changed states, so fall through.
+ assert(client->state == Client::SENDING_ERROR ||
+ client->state == Client::SENDING_HEADER);
}
case Client::SENDING_ERROR:
case Client::SENDING_HEADER: {
+sending_header_or_error_again:
int ret;
do {
ret = write(client->sock,
client->header_or_error.data() + client->header_or_error_bytes_sent,
client->header_or_error.size() - client->header_or_error_bytes_sent);
} while (ret == -1 && errno == EINTR);
+
+ if (ret == -1 && errno == EAGAIN) {
+ // We're out of socket space, so now we're at the “low edge” of epoll's
+ // edge triggering. epoll will tell us when there is more room, so for now,
+ // just return.
+ // This is postcondition #4.
+ return;
+ }
+
if (ret == -1) {
+ // Error! Postcondition #1.
perror("write");
close_client(client);
return;
assert(client->header_or_error_bytes_sent <= client->header_or_error.size());
if (client->header_or_error_bytes_sent < client->header_or_error.size()) {
- // We haven't sent all yet. Fine; we'll do that later.
- return;
+ // We haven't sent all yet. Fine; go another round.
+ goto sending_header_or_error_again;
}
// We're done sending the header or error! Clear it to release some memory.
client->header_or_error.clear();
if (client->state == Client::SENDING_ERROR) {
+ // We're done sending the error, so now close.
+ // This is postcondition #1.
close_client(client);
- } else {
- // Start sending from the end. In other words, we won't send any of the backlog,
- // but we'll start sending immediately as we get data.
- client->state = Client::SENDING_DATA;
- client->bytes_sent = find_stream(client->stream_id)->data_size;
+ return;
}
- break;
+
+ // Start sending from the end. In other words, we won't send any of the backlog,
+ // but we'll start sending immediately as we get data.
+ // This is postcondition #3.
+ client->state = Client::SENDING_DATA;
+ client->bytes_sent = find_stream(client->stream_id)->data_size;
+ sleeping_clients.push_back(client);
+ return;
}
case Client::SENDING_DATA: {
// See if there's some data we've lost. Ideally, we should drop to a block boundary,
bytes_to_send);
} while (ret == -1 && errno == EINTR);
}
+ if (ret == -1 && errno == EAGAIN) {
+ // We're out of socket space, so return; epoll will wake us up
+ // when there is more room.
+ // This is postcondition #4.
+ return;
+ }
if (ret == -1) {
+ // Error, close; postcondition #1.
perror("write/writev");
close_client(client);
return;
if (client->bytes_sent == stream.data_size) {
// We don't have any more data for this client, so put it to sleep.
+ // This is postcondition #3.
put_client_to_sleep(client);
+ } else {
+ // XXX: Do we need to go another round here to explicitly
+ // get the EAGAIN?
}
break;
}
client->state = Client::SENDING_HEADER;
epoll_event ev;
- ev.events = EPOLLOUT | EPOLLRDHUP;
+ ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
ev.data.u64 = 0; // Keep Valgrind happy.
ev.data.fd = client->sock;
client->state = Client::SENDING_ERROR;
epoll_event ev;
- ev.events = EPOLLOUT | EPOLLRDHUP;
+ ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
ev.data.u64 = 0; // Keep Valgrind happy.
ev.data.fd = client->sock;
}
// This client could be sleeping, so we'll need to fix that. (Argh, O(n).)
- vector<int>::iterator new_end =
- remove(sleeping_clients.begin(), sleeping_clients.end(), client->sock);
+ vector<Client *>::iterator new_end =
+ remove(sleeping_clients.begin(), sleeping_clients.end(), client);
sleeping_clients.erase(new_end, sleeping_clients.end());
// Bye-bye!
void Server::put_client_to_sleep(Client *client)
{
- epoll_event ev;
- ev.events = EPOLLRDHUP;
- ev.data.u64 = 0; // Keep Valgrind happy.
- ev.data.fd = client->sock;
-
- if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
- perror("epoll_ctl(EPOLL_CTL_MOD)");
- exit(1);
- }
-
- sleeping_clients.push_back(client->sock);
+ sleeping_clients.push_back(client);
}
void Server::wake_up_all_clients()
{
- for (unsigned i = 0; i < sleeping_clients.size(); ++i) {
- epoll_event ev;
- ev.events = EPOLLOUT | EPOLLRDHUP;
- ev.data.u64 = 0; // Keep Valgrind happy.
- ev.data.fd = sleeping_clients[i];
- if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, sleeping_clients[i], &ev) == -1) {
- perror("epoll_ctl(EPOLL_CTL_MOD)");
- exit(1);
- }
+ vector<Client *> to_process;
+ swap(sleeping_clients, to_process);
+ for (unsigned i = 0; i < to_process.size(); ++i) {
+ process_client(to_process[i]);
}
- sleeping_clients.clear();
}
Stream *Server::find_stream(const string &stream_id)