X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=615e8fb31d48b137df3cdcb3acf8caa4cbf00c68;hp=2f7a3327079af265a0faf2469b9b8b29c2f9a308;hb=38a2bb28fd8dcb5bb1e0cb56028936a35f20f503;hpb=207ca0494024641f27537ad3cf047814d8092678 diff --git a/server.cpp b/server.cpp index 2f7a332..615e8fb 100644 --- a/server.cpp +++ b/server.cpp @@ -140,7 +140,7 @@ void Server::do_work() } timeout_time.tv_sec -= REQUEST_READ_TIMEOUT_SEC; while (!clients_ordered_by_connect_time.empty()) { - pair &connect_time_and_fd = clients_ordered_by_connect_time.front(); + const pair &connect_time_and_fd = clients_ordered_by_connect_time.front(); // See if we have reached the end of clients to process. if (is_earlier(timeout_time, connect_time_and_fd.first)) { @@ -149,7 +149,7 @@ void Server::do_work() // If this client doesn't exist anymore, just ignore it // (it was deleted earlier). - std::map::iterator client_it = clients.find(connect_time_and_fd.second); + map::iterator client_it = clients.find(connect_time_and_fd.second); if (client_it == clients.end()) { clients_ordered_by_connect_time.pop(); continue; @@ -286,7 +286,7 @@ void Server::add_client_from_serialized(const ClientProto &client) } } -int Server::lookup_stream_by_url(const std::string &url) const +int Server::lookup_stream_by_url(const string &url) const { map::const_iterator url_it = url_map.find(url); if (url_it == url_map.end()) { @@ -317,6 +317,13 @@ void Server::set_backlog_size(int stream_index, size_t new_size) assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); streams[stream_index]->set_backlog_size(new_size); } + +void Server::set_prebuffering_bytes(int stream_index, size_t new_amount) +{ + MutexLock lock(&mutex); + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->prebuffering_bytes = new_amount; +} void Server::set_encoding(int stream_index, Stream::Encoding encoding) { @@ -330,6 +337,17 @@ void Server::set_header(int stream_index, const string &http_header, const strin MutexLock lock(&mutex); assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); streams[stream_index]->http_header = http_header; + + if (stream_header != streams[stream_index]->stream_header) { + // We cannot start at any of the older starting points anymore, + // since they'd get the wrong header for the stream (not to mention + // that a changed header probably means the stream restarted, + // which means any client starting on the old one would probably + // stop playing properly at the change point). Next block + // should be a suitable starting point (if not, something is + // pretty strange), so it will fill up again soon enough. + streams[stream_index]->suitable_starting_points.clear(); + } streams[stream_index]->stream_header = stream_header; } @@ -405,17 +423,17 @@ read_request_again: } // We've changed states, so fall through. - assert(client->state == Client::SENDING_ERROR || + assert(client->state == Client::SENDING_SHORT_RESPONSE || client->state == Client::SENDING_HEADER); } - case Client::SENDING_ERROR: + case Client::SENDING_SHORT_RESPONSE: case Client::SENDING_HEADER: { -sending_header_or_error_again: +sending_header_or_short_response_again: int ret; do { ret = write(client->sock, - client->header_or_error.data() + client->header_or_error_bytes_sent, - client->header_or_error.size() - client->header_or_error_bytes_sent); + client->header_or_short_response.data() + client->header_or_short_response_bytes_sent, + client->header_or_short_response.size() - client->header_or_short_response_bytes_sent); } while (ret == -1 && errno == EINTR); if (ret == -1 && errno == EAGAIN) { @@ -433,61 +451,86 @@ sending_header_or_error_again: return; } - client->header_or_error_bytes_sent += ret; - assert(client->header_or_error_bytes_sent <= client->header_or_error.size()); + client->header_or_short_response_bytes_sent += ret; + assert(client->header_or_short_response_bytes_sent <= client->header_or_short_response.size()); - if (client->header_or_error_bytes_sent < client->header_or_error.size()) { + if (client->header_or_short_response_bytes_sent < client->header_or_short_response.size()) { // We haven't sent all yet. Fine; go another round. - goto sending_header_or_error_again; + goto sending_header_or_short_response_again; } // We're done sending the header or error! Clear it to release some memory. - client->header_or_error.clear(); + client->header_or_short_response.clear(); - if (client->state == Client::SENDING_ERROR) { + if (client->state == Client::SENDING_SHORT_RESPONSE) { // We're done sending the error, so now close. // This is postcondition #1. close_client(client); return; } - // Start sending from the first keyframe we get. In other - // words, we won't send any of the backlog, but we'll start - // sending immediately as we get the next keyframe block. - // This is postcondition #3. + Stream *stream = client->stream; if (client->stream_pos == size_t(-2)) { - client->stream_pos = std::min( - client->stream->bytes_received - client->stream->backlog_size, + // Start sending from the beginning of the backlog. + client->stream_pos = min( + stream->bytes_received - stream->backlog_size, 0); client->state = Client::SENDING_DATA; - } else { - // client->stream_pos should be -1, but it might not be, - // if we have clients from an older version. - client->stream_pos = client->stream->bytes_received; + goto sending_data; + } else if (stream->prebuffering_bytes == 0) { + // Start sending from the first keyframe we get. In other + // words, we won't send any of the backlog, but we'll start + // sending immediately as we get the next keyframe block. + // Note that this is functionally identical to the next if branch, + // except that we save a binary search. + client->stream_pos = stream->bytes_received; client->state = Client::WAITING_FOR_KEYFRAME; + } else { + // We're not going to send anything to the client before we have + // N bytes. However, this wait might be boring; we can just as well + // use it to send older data if we have it. We use lower_bound() + // so that we are conservative and never add extra latency over just + // waiting (assuming CBR or nearly so); otherwise, we could want e.g. + // 100 kB prebuffer but end up sending a 10 MB GOP. + deque::const_iterator starting_point_it = + lower_bound(stream->suitable_starting_points.begin(), + stream->suitable_starting_points.end(), + stream->bytes_received - stream->prebuffering_bytes); + if (starting_point_it == stream->suitable_starting_points.end()) { + // None found. Just put us at the end, and then wait for the + // first keyframe to appear. + client->stream_pos = stream->bytes_received; + client->state = Client::WAITING_FOR_KEYFRAME; + } else { + client->stream_pos = *starting_point_it; + client->state = Client::PREBUFFERING; + goto prebuffering; + } } - client->stream->put_client_to_sleep(client); - return; + // Fall through. } case Client::WAITING_FOR_KEYFRAME: { Stream *stream = client->stream; - if (ssize_t(client->stream_pos) > stream->last_suitable_starting_point) { + if (stream->suitable_starting_points.empty() || + client->stream_pos > stream->suitable_starting_points.back()) { // We haven't received a keyframe since this stream started waiting, // so keep on waiting for one. // This is postcondition #3. stream->put_client_to_sleep(client); return; } - client->stream_pos = stream->last_suitable_starting_point; + client->stream_pos = stream->suitable_starting_points.back(); client->state = Client::PREBUFFERING; // Fall through. } case Client::PREBUFFERING: { +prebuffering: Stream *stream = client->stream; size_t bytes_to_send = stream->bytes_received - client->stream_pos; assert(bytes_to_send <= stream->backlog_size); if (bytes_to_send < stream->prebuffering_bytes) { // We don't have enough bytes buffered to start this client yet. + // This is postcondition #3. stream->put_client_to_sleep(client); return; } @@ -495,6 +538,7 @@ sending_header_or_error_again: // Fall through. } case Client::SENDING_DATA: { +sending_data: skip_lost_data(client); Stream *stream = client->stream; @@ -626,11 +670,11 @@ void Server::construct_header(Client *client) { Stream *stream = client->stream; if (stream->encoding == Stream::STREAM_ENCODING_RAW) { - client->header_or_error = stream->http_header + + client->header_or_short_response = stream->http_header + "\r\n" + stream->stream_header; } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) { - client->header_or_error = stream->http_header + + client->header_or_short_response = stream->http_header + "Content-encoding: metacube\r\n" + "\r\n"; if (!stream->stream_header.empty()) { @@ -639,10 +683,10 @@ void Server::construct_header(Client *client) hdr.size = htonl(stream->stream_header.size()); hdr.flags = htons(METACUBE_FLAGS_HEADER); hdr.csum = htons(metacube2_compute_crc(&hdr)); - client->header_or_error.append( + client->header_or_short_response.append( string(reinterpret_cast(&hdr), sizeof(hdr))); } - client->header_or_error.append(stream->stream_header); + client->header_or_short_response.append(stream->stream_header); } else { assert(false); } @@ -665,10 +709,10 @@ void Server::construct_error(Client *client, int error_code) char error[256]; snprintf(error, 256, "HTTP/1.0 %d Error\r\nContent-type: text/plain\r\n\r\nSomething went wrong. Sorry.\r\n", error_code); - client->header_or_error = error; + client->header_or_short_response = error; // Switch states. - client->state = Client::SENDING_ERROR; + client->state = Client::SENDING_SHORT_RESPONSE; epoll_event ev; ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;