X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=b5f58d4c56711968db64a8d6c63e7d9eb08d0e25;hp=e4ea647c64d05555505f3f18358f4e9b642d5370;hb=71fc5575037bead8b6e927a1fffd199e4fc4514b;hpb=8f44468bfe4a1d1607b0ab7044c3071605ae1fa7 diff --git a/server.cpp b/server.cpp index e4ea647..b5f58d4 100644 --- a/server.cpp +++ b/server.cpp @@ -1,9 +1,11 @@ -#include #include #include +#include #include +#include #include #include +#include #include #include #include @@ -145,18 +147,21 @@ void Server::add_client_deferred(int sock) void Server::add_client(int sock) { - clients.insert(make_pair(sock, Client(sock))); + pair::iterator, bool> ret = + clients.insert(make_pair(sock, Client(sock))); + assert(ret.second == true); // Should not already exist. + Client *client_ptr = &ret.first->second; // Start listening on data from this socket. epoll_event ev; ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP; - ev.data.u64 = reinterpret_cast(&clients[sock]); + ev.data.u64 = reinterpret_cast(client_ptr); if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) { log_perror("epoll_ctl(EPOLL_CTL_ADD)"); exit(1); } - process_client(&clients[sock]); + process_client(client_ptr); } void Server::add_client_from_serialized(const ClientProto &client) @@ -169,8 +174,10 @@ void Server::add_client_from_serialized(const ClientProto &client) } else { stream = stream_it->second; } - clients.insert(make_pair(client.sock(), Client(client, stream))); - Client *client_ptr = &clients[client.sock()]; + pair::iterator, bool> ret = + clients.insert(make_pair(client.sock(), Client(client, stream))); + assert(ret.second == true); // Should not already exist. + Client *client_ptr = &ret.first->second; // Start listening on data from this socket. epoll_event ev; @@ -202,10 +209,10 @@ void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::En streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding))); } -void Server::add_stream_from_serialized(const StreamProto &stream) +void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd) { MutexLock lock(&mutex); - streams.insert(make_pair(stream.stream_id(), new Stream(stream))); + streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd))); } void Server::set_backlog_size(const string &stream_id, size_t new_size) @@ -376,13 +383,17 @@ sending_data_again: return; } if (bytes_to_send > stream->backlog_size) { - log(WARNING, "[%s] Client lost %lld bytes, maybe too slow connection", - client->remote_addr.c_str(), - (long long int)(bytes_to_send - stream->backlog_size)); + size_t bytes_lost = bytes_to_send - stream->backlog_size; client->stream_pos = stream->bytes_received - stream->backlog_size; - client->bytes_lost += bytes_to_send - stream->backlog_size; + client->bytes_lost += bytes_lost; ++client->num_loss_events; bytes_to_send = stream->backlog_size; + + double loss_fraction = double(client->bytes_lost) / double(client->bytes_lost + client->bytes_sent); + log(WARNING, "[%s] Client lost %lld bytes (total loss: %.2f%%), maybe too slow connection", + client->remote_addr.c_str(), + (long long int)(bytes_lost), + 100.0 * loss_fraction); } // See if we need to split across the circular buffer. @@ -470,16 +481,18 @@ void Server::construct_header(Client *client) "\r\n" + stream->stream_header; } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) { - metacube_block_header hdr; - memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync)); - hdr.size = htonl(stream->stream_header.size()); - hdr.flags = htonl(METACUBE_FLAGS_HEADER); - client->header_or_error = stream->http_header + "Content-encoding: metacube\r\n" + - "\r\n" + - string(reinterpret_cast(&hdr), sizeof(hdr)) + - stream->stream_header; + "\r\n"; + if (!stream->stream_header.empty()) { + metacube_block_header hdr; + memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync)); + hdr.size = htonl(stream->stream_header.size()); + hdr.flags = htonl(METACUBE_FLAGS_HEADER); + client->header_or_error.append( + string(reinterpret_cast(&hdr), sizeof(hdr))); + } + client->header_or_error.append(stream->stream_header); } else { assert(false); }