X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=789d8eabd14c88c67524b632b7dbe1debbe410fd;hp=0cbc453674b75722b19e5f2c6979142cd7551042;hb=a1906d9fe3e565bbd874df72e8552fa7daf9fb9a;hpb=9bb20b7dd0bcea9de1daf4cac29263d74924ce5a diff --git a/server.cpp b/server.cpp index 0cbc453..789d8ea 100644 --- a/server.cpp +++ b/server.cpp @@ -80,9 +80,7 @@ void Server::do_work() process_queued_data(); for (int i = 0; i < nfds; ++i) { - int fd = events[i].data.fd; - assert(clients.count(fd) != 0); - Client *client = &clients[fd]; + Client *client = reinterpret_cast(events[i].data.u64); if (events[i].events & (EPOLLERR | EPOLLRDHUP | EPOLLHUP)) { close_client(client); @@ -140,8 +138,7 @@ void Server::add_client(int sock) // Start listening on data from this socket. epoll_event ev; ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP; - ev.data.u64 = 0; // Keep Valgrind happy. - ev.data.fd = sock; + ev.data.u64 = reinterpret_cast(&clients[sock]); if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) { perror("epoll_ctl(EPOLL_CTL_ADD)"); exit(1); @@ -153,7 +150,13 @@ void Server::add_client(int sock) void Server::add_client_from_serialized(const ClientProto &client) { MutexLock lock(&mutex); - Stream *stream = find_stream(client.stream_id()); + Stream *stream; + map::iterator stream_it = streams.find(client.stream_id()); + if (stream_it == streams.end()) { + stream = NULL; + } else { + stream = stream_it->second; + } clients.insert(make_pair(client.sock(), Client(client, stream))); Client *client_ptr = &clients[client.sock()]; @@ -167,14 +170,14 @@ void Server::add_client_from_serialized(const ClientProto &client) ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP; } ev.data.u64 = 0; // Keep Valgrind happy. - ev.data.fd = client.sock(); + ev.data.u64 = reinterpret_cast(client_ptr); if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) { perror("epoll_ctl(EPOLL_CTL_ADD)"); exit(1); } if (client_ptr->state == Client::SENDING_DATA && - client_ptr->bytes_sent == client_ptr->stream->bytes_received) { + client_ptr->stream_pos == client_ptr->stream->bytes_received) { client_ptr->stream->put_client_to_sleep(client_ptr); } else { process_client(client_ptr); @@ -212,7 +215,7 @@ void Server::set_header(const string &stream_id, const string &header) ++client_it) { Client *client = &client_it->second; if (client->state == Client::SENDING_DATA && - client->bytes_sent == 0) { + client->stream_pos == 0) { construct_header(client); } } @@ -339,7 +342,7 @@ sending_header_or_error_again: // but we'll start sending immediately as we get data. // This is postcondition #3. client->state = Client::SENDING_DATA; - client->bytes_sent = client->stream->bytes_received; + client->stream_pos = client->stream->bytes_received; client->stream->put_client_to_sleep(client); return; } @@ -348,7 +351,7 @@ sending_data_again: // See if there's some data we've lost. Ideally, we should drop to a block boundary, // but resync will be the mux's problem. Stream *stream = client->stream; - size_t bytes_to_send = stream->bytes_received - client->bytes_sent; + size_t bytes_to_send = stream->bytes_received - client->stream_pos; if (bytes_to_send == 0) { return; } @@ -356,20 +359,22 @@ sending_data_again: fprintf(stderr, "WARNING: fd %d lost %lld bytes, maybe too slow connection\n", client->sock, (long long int)(bytes_to_send - stream->backlog_size)); - client->bytes_sent = stream->bytes_received - stream->backlog_size; + client->stream_pos = stream->bytes_received - stream->backlog_size; + client->bytes_lost += bytes_to_send - stream->backlog_size; + ++client->num_loss_events; bytes_to_send = stream->backlog_size; } // See if we need to split across the circular buffer. bool more_data = false; - if ((client->bytes_sent % stream->backlog_size) + bytes_to_send > stream->backlog_size) { - bytes_to_send = stream->backlog_size - (client->bytes_sent % stream->backlog_size); + if ((client->stream_pos % stream->backlog_size) + bytes_to_send > stream->backlog_size) { + bytes_to_send = stream->backlog_size - (client->stream_pos % stream->backlog_size); more_data = true; } ssize_t ret; do { - loff_t offset = client->bytes_sent % stream->backlog_size; + loff_t offset = client->stream_pos % stream->backlog_size; ret = sendfile(client->sock, stream->data_fd, &offset, bytes_to_send); } while (ret == -1 && errno == EINTR); @@ -385,13 +390,14 @@ sending_data_again: close_client(client); return; } + client->stream_pos += ret; client->bytes_sent += ret; - if (client->bytes_sent == stream->bytes_received) { + if (client->stream_pos == stream->bytes_received) { // We don't have any more data for this client, so put it to sleep. // This is postcondition #3. stream->put_client_to_sleep(client); - } else if (more_data) { + } else if (more_data && ret == bytes_to_send) { goto sending_data_again; } break; @@ -445,8 +451,7 @@ void Server::construct_header(Client *client) epoll_event ev; ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP; - ev.data.u64 = 0; // Keep Valgrind happy. - ev.data.fd = client->sock; + ev.data.u64 = reinterpret_cast(client); if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) { perror("epoll_ctl(EPOLL_CTL_MOD)"); @@ -466,8 +471,7 @@ void Server::construct_error(Client *client, int error_code) epoll_event ev; ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP; - ev.data.u64 = 0; // Keep Valgrind happy. - ev.data.fd = client->sock; + ev.data.u64 = reinterpret_cast(client); if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) { perror("epoll_ctl(EPOLL_CTL_MOD)");