X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=server.cpp;h=d41e3c70730b896e44a9362bf150ae534ee9f892;hb=d9de2a370c08fd6738984e759a1ab4f91015af0c;hp=855e21c9767587fba2590fccadb47cdb5de852c5;hpb=a0ad2d9d955fcb5f0aa3cf4f89999c34e8408124;p=cubemap diff --git a/server.cpp b/server.cpp index 855e21c..d41e3c7 100644 --- a/server.cpp +++ b/server.cpp @@ -162,7 +162,7 @@ void Server::do_work() if (should_stop) { return; } - + for (int i = 0; i < nfds; ++i) { int fd = events[i].data.fd; assert(clients.count(fd) != 0); @@ -175,6 +175,11 @@ void Server::do_work() process_client(client); } + + for (unsigned i = 0; i < to_process.size(); ++i) { + process_client(to_process[i]); + } + to_process.clear(); } } @@ -201,13 +206,15 @@ void Server::add_client(int sock) // Start listening on data from this socket. epoll_event ev; - ev.events = EPOLLIN | EPOLLRDHUP; + ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP; ev.data.u64 = 0; // Keep Valgrind happy. ev.data.fd = sock; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) { perror("epoll_ctl(EPOLL_CTL_ADD)"); exit(1); } + + process_client(&clients[sock]); } void Server::add_client_from_serialized(const ClientProto &client) @@ -215,6 +222,7 @@ void Server::add_client_from_serialized(const ClientProto &client) MutexLock lock(&mutex); Stream *stream = find_stream(client.stream_id()); clients.insert(make_pair(client.sock(), Client(client, stream))); + Client *client_ptr = &clients[client.sock()]; // Start listening on data from this socket. epoll_event ev; @@ -231,6 +239,13 @@ void Server::add_client_from_serialized(const ClientProto &client) perror("epoll_ctl(EPOLL_CTL_ADD)"); exit(1); } + + if (client_ptr->state == Client::SENDING_DATA && + client_ptr->bytes_sent == client_ptr->stream->data_size) { + put_client_to_sleep(client_ptr); + } else { + process_client(client_ptr); + } } void Server::add_stream(const string &stream_id) @@ -402,7 +417,7 @@ sending_header_or_error_again: // This is postcondition #3. client->state = Client::SENDING_DATA; client->bytes_sent = client->stream->data_size; - sleeping_clients.push_back(client); + put_client_to_sleep(client); return; } case Client::SENDING_DATA: { @@ -410,6 +425,9 @@ sending_header_or_error_again: // but resync will be the mux's problem. const Stream *stream = client->stream; size_t bytes_to_send = stream->data_size - client->bytes_sent; + if (bytes_to_send == 0) { + return; + } if (bytes_to_send > BACKLOG_SIZE) { fprintf(stderr, "WARNING: fd %d lost %lld bytes, maybe too slow connection\n", client->sock, @@ -488,7 +506,7 @@ int Server::parse_request(Client *client) } client->stream_id = request_tokens[1]; - client->stream = streams[client->stream_id]; + client->stream = find_stream(client->stream_id); client->request.clear(); return 200; // OK! @@ -566,10 +584,11 @@ void Server::put_client_to_sleep(Client *client) void Server::wake_up_all_clients() { - vector to_process; - swap(sleeping_clients, to_process); - for (unsigned i = 0; i < to_process.size(); ++i) { - process_client(to_process[i]); + if (to_process.empty()) { + swap(sleeping_clients, to_process); + } else { + to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end()); + sleeping_clients.clear(); } }