Rename header_or_error to header_or_short_response, as it will soon be able to contai...

diff --git a/server.cpp b/server.cpp
index 99a3925e13f0bca65531f82902012408e0b5556c..615e8fb31d48b137df3cdcb3acf8caa4cbf00c68 100644
--- a/server.cpp
+++ b/server.cpp
@@ -140,7 +140,7 @@ void Server::do_work()
                }
                timeout_time.tv_sec -= REQUEST_READ_TIMEOUT_SEC;
                while (!clients_ordered_by_connect_time.empty()) {
-                       pair<timespec, int> &connect_time_and_fd = clients_ordered_by_connect_time.front();
+                       const pair<timespec, int> &connect_time_and_fd = clients_ordered_by_connect_time.front();
 
                        // See if we have reached the end of clients to process.
                        if (is_earlier(timeout_time, connect_time_and_fd.first)) {
@@ -149,7 +149,7 @@ void Server::do_work()
 
                        // If this client doesn't exist anymore, just ignore it
                        // (it was deleted earlier).
-                       std::map<int, Client>::iterator client_it = clients.find(connect_time_and_fd.second);
+                       map<int, Client>::iterator client_it = clients.find(connect_time_and_fd.second);
                        if (client_it == clients.end()) {
                                clients_ordered_by_connect_time.pop();
                                continue;
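
The loop in the two hunks above is a timeout sweep over a queue of clients ordered by connect time, with lazy deletion: the queue is only ever popped from the front, entries whose fd is no longer in the client map are skipped, and the sweep stops at the first client younger than the cutoff. A minimal standalone sketch of that pattern (the Client stand-in, is_earlier() and the actual timeout action are assumptions of the sketch, not shown in the hunks):

#include <time.h>

#include <map>
#include <queue>
#include <utility>

struct Client { int sock; };  // Stand-in for the real Client.

// Assumed to match the is_earlier() used in server.cpp: strict "a < b" on timespecs.
static bool is_earlier(const timespec &a, const timespec &b)
{
        if (a.tv_sec != b.tv_sec)
                return a.tv_sec < b.tv_sec;
        return a.tv_nsec < b.tv_nsec;
}

void sweep_request_timeouts(std::queue<std::pair<timespec, int> > *clients_ordered_by_connect_time,
                            std::map<int, Client> *clients,
                            const timespec &timeout_time)
{
        while (!clients_ordered_by_connect_time->empty()) {
                const std::pair<timespec, int> &connect_time_and_fd =
                        clients_ordered_by_connect_time->front();

                // Clients are ordered by connect time, so once we hit one that is
                // newer than the cutoff, everything behind it is newer, too.
                if (is_earlier(timeout_time, connect_time_and_fd.first)) {
                        break;
                }

                // Lazy deletion: the client may already be gone from the map;
                // its stale queue entry is simply dropped.
                std::map<int, Client>::iterator client_it =
                        clients->find(connect_time_and_fd.second);
                if (client_it == clients->end()) {
                        clients_ordered_by_connect_time->pop();
                        continue;
                }

                // ... time out the client here (not part of the hunks above) ...
                clients_ordered_by_connect_time->pop();
        }
}
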
@@ -286,7 +286,7 @@ void Server::add_client_from_serialized(const ClientProto &client)
        }
 }
 
-int Server::lookup_stream_by_url(const std::string &url) const
+int Server::lookup_stream_by_url(const string &url) const
 {
        map<string, int>::const_iterator url_it = url_map.find(url);
        if (url_it == url_map.end()) {
@@ -337,6 +337,17 @@ void Server::set_header(int stream_index, const string &http_header, const strin
        MutexLock lock(&mutex);
        assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
        streams[stream_index]->http_header = http_header;
+
+       if (stream_header != streams[stream_index]->stream_header) {
+               // We cannot start at any of the older starting points anymore,
+               // since they'd get the wrong header for the stream (not to mention
+               // that a changed header probably means the stream restarted,
+               // which means any client starting on the old one would probably
+               // stop playing properly at the change point). Next block
+               // should be a suitable starting point (if not, something is
+               // pretty strange), so it will fill up again soon enough.
+               streams[stream_index]->suitable_starting_points.clear();
+       }
        streams[stream_index]->stream_header = stream_header;
 }
        
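
The new block in the hunk above drops all remembered starting points whenever the stream header changes, since those offsets only make sense together with the header that was in effect when they were recorded. A minimal sketch of the same idea, with a cut-down Stream struct assumed for illustration:

#include <stddef.h>

#include <deque>
#include <string>

// Cut-down stand-in for the real Stream; field names follow the diff.
struct Stream {
        std::string http_header;                      // HTTP response headers.
        std::string stream_header;                    // Container/codec header sent to every client.
        std::deque<size_t> suitable_starting_points;  // Byte offsets where a new client can start.
};

void set_header(Stream *stream, const std::string &http_header, const std::string &stream_header)
{
        stream->http_header = http_header;

        if (stream_header != stream->stream_header) {
                // The remembered starting points belong to the old header; a client
                // starting at one of them would be sent a header that no longer
                // matches the data. Drop them and let new keyframe blocks refill the list.
                stream->suitable_starting_points.clear();
        }
        stream->stream_header = stream_header;
}
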
@@ -412,17 +423,17 @@ read_request_again:
                }
 
                // We've changed states, so fall through.
-               assert(client->state == Client::SENDING_ERROR ||
+               assert(client->state == Client::SENDING_SHORT_RESPONSE ||
                       client->state == Client::SENDING_HEADER);
        }
-       case Client::SENDING_ERROR:
+       case Client::SENDING_SHORT_RESPONSE:
        case Client::SENDING_HEADER: {
-sending_header_or_error_again:
+sending_header_or_short_response_again:
                int ret;
                do {
                        ret = write(client->sock,
-                                   client->header_or_error.data() + client->header_or_error_bytes_sent,
-                                   client->header_or_error.size() - client->header_or_error_bytes_sent);
+                                   client->header_or_short_response.data() + client->header_or_short_response_bytes_sent,
+                                   client->header_or_short_response.size() - client->header_or_short_response_bytes_sent);
                } while (ret == -1 && errno == EINTR);
 
                if (ret == -1 && errno == EAGAIN) {
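
The sending code above is the standard non-blocking partial-write pattern: retry the write() on EINTR, stop and wait for EPOLLOUT on EAGAIN, and otherwise advance a bytes-sent counter until the whole buffer is out. A standalone sketch of that pattern (the epoll re-arming and client teardown that the real code does are left to the caller):

#include <errno.h>
#include <unistd.h>

#include <string>

// Writes as much of <buf> as the socket will take, starting at *bytes_sent.
// Returns true once the whole buffer has been written; returns false and
// leaves errno set if the socket would block (EAGAIN) or a real error occurred.
bool write_remaining(int sock, const std::string &buf, size_t *bytes_sent)
{
        while (*bytes_sent < buf.size()) {
                ssize_t ret;
                do {
                        ret = write(sock, buf.data() + *bytes_sent, buf.size() - *bytes_sent);
                } while (ret == -1 && errno == EINTR);  // Interrupted by a signal; just retry.

                if (ret == -1) {
                        // EAGAIN: the socket buffer is full, so wait for EPOLLOUT.
                        // Anything else: a real error, so the caller should close the client.
                        return false;
                }
                *bytes_sent += ret;
        }
        return true;
}
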
@@ -440,56 +451,80 @@ sending_header_or_error_again:
                        return;
                }
                
-               client->header_or_error_bytes_sent += ret;
-               assert(client->header_or_error_bytes_sent <= client->header_or_error.size());
+               client->header_or_short_response_bytes_sent += ret;
+               assert(client->header_or_short_response_bytes_sent <= client->header_or_short_response.size());
 
-               if (client->header_or_error_bytes_sent < client->header_or_error.size()) {
+               if (client->header_or_short_response_bytes_sent < client->header_or_short_response.size()) {
                        // We haven't sent all yet. Fine; go another round.
-                       goto sending_header_or_error_again;
+                       goto sending_header_or_short_response_again;
                }
 
                // We're done sending the header or error! Clear it to release some memory.
-               client->header_or_error.clear();
+               client->header_or_short_response.clear();
 
-               if (client->state == Client::SENDING_ERROR) {
+               if (client->state == Client::SENDING_SHORT_RESPONSE) {
                        // We're done sending the error, so now close.  
                        // This is postcondition #1.
                        close_client(client);
                        return;
                }
 
-               // Start sending from the first keyframe we get. In other
-               // words, we won't send any of the backlog, but we'll start
-               // sending immediately as we get the next keyframe block.
-               // This is postcondition #3.
+               Stream *stream = client->stream;
                if (client->stream_pos == size_t(-2)) {
-                       client->stream_pos = std::min<size_t>(
-                           client->stream->bytes_received - client->stream->backlog_size,
+                       // Start sending from the beginning of the backlog.
+                       client->stream_pos = min<size_t>(
+                           stream->bytes_received - stream->backlog_size,
                            0);
                        client->state = Client::SENDING_DATA;
-               } else {
-                       // client->stream_pos should be -1, but it might not be,
-                       // if we have clients from an older version.
-                       client->stream_pos = client->stream->bytes_received;
+                       goto sending_data;
+               } else if (stream->prebuffering_bytes == 0) {
+                       // Start sending from the first keyframe we get. In other
+                       // words, we won't send any of the backlog, but we'll start
+                       // sending immediately as we get the next keyframe block.
+                       // Note that this is functionally identical to the next if branch,
+                       // except that we save a binary search.
+                       client->stream_pos = stream->bytes_received;
                        client->state = Client::WAITING_FOR_KEYFRAME;
+               } else {
+                       // We're not going to send anything to the client before we have
+                       // N bytes. However, this wait might be boring; we can just as well
+                       // use it to send older data if we have it. We use lower_bound()
+                       // so that we are conservative and never add extra latency over just
+                       // waiting (assuming CBR or nearly so); otherwise, we could want e.g.
+                       // 100 kB prebuffer but end up sending a 10 MB GOP.
+                       deque<size_t>::const_iterator starting_point_it =
+                               lower_bound(stream->suitable_starting_points.begin(),
+                                           stream->suitable_starting_points.end(),
+                                           stream->bytes_received - stream->prebuffering_bytes);
+                       if (starting_point_it == stream->suitable_starting_points.end()) {
+                               // None found. Just put us at the end, and then wait for the
+                               // first keyframe to appear.
+                               client->stream_pos = stream->bytes_received;
+                               client->state = Client::WAITING_FOR_KEYFRAME;
+                       } else {
+                               client->stream_pos = *starting_point_it;
+                               client->state = Client::PREBUFFERING;
+                               goto prebuffering;
+                       }
                }
-               client->stream->put_client_to_sleep(client);
-               return;
+               // Fall through.
        }
        case Client::WAITING_FOR_KEYFRAME: {
                Stream *stream = client->stream;
-               if (ssize_t(client->stream_pos) > stream->last_suitable_starting_point) {
+               if (stream->suitable_starting_points.empty() ||
+                   client->stream_pos > stream->suitable_starting_points.back()) {
                        // We haven't received a keyframe since this stream started waiting,
                        // so keep on waiting for one.
                        // This is postcondition #3.
                        stream->put_client_to_sleep(client);
                        return;
                }
-               client->stream_pos = stream->last_suitable_starting_point;
+               client->stream_pos = stream->suitable_starting_points.back();
                client->state = Client::PREBUFFERING;
                // Fall through.
        }
        case Client::PREBUFFERING: {
+prebuffering:
                Stream *stream = client->stream;
                size_t bytes_to_send = stream->bytes_received - client->stream_pos;
                assert(bytes_to_send <= stream->backlog_size);
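
The most interesting part of the hunk above is the prebuffering branch: among the remembered keyframe offsets (kept sorted in suitable_starting_points), lower_bound() picks the first one at or after bytes_received - prebuffering_bytes, so the client is handed at most prebuffering_bytes of old data and never waits longer than it would have with no backlog at all. A standalone sketch of that selection:

#include <stddef.h>

#include <algorithm>
#include <deque>

// Returns the byte offset a new client should start at, or size_t(-1)
// (a sentinel used only in this sketch) if there is no usable starting
// point yet and the client should wait for the next keyframe block.
size_t choose_starting_point(const std::deque<size_t> &suitable_starting_points,
                             size_t bytes_received, size_t prebuffering_bytes)
{
        if (prebuffering_bytes == 0) {
                // No prebuffering configured: just wait for the next keyframe.
                return size_t(-1);
        }

        // First starting point with at most prebuffering_bytes between it and
        // the head of the stream. Being conservative here means we never ship
        // a 10 MB GOP just because the stream asked for a 100 kB prebuffer.
        std::deque<size_t>::const_iterator it =
                std::lower_bound(suitable_starting_points.begin(),
                                 suitable_starting_points.end(),
                                 bytes_received - prebuffering_bytes);
        if (it == suitable_starting_points.end()) {
                // All known starting points are too far back; wait for a new keyframe.
                return size_t(-1);
        }
        return *it;
}

Note that if prebuffering_bytes exceeds bytes_received, the subtraction wraps around to a huge value, lower_bound() finds nothing, and both the sketch and the code above degrade to waiting for the next keyframe.
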
@@ -503,6 +538,7 @@ sending_header_or_error_again:
                // Fall through.
        }
        case Client::SENDING_DATA: {
+sending_data:
                skip_lost_data(client);
                Stream *stream = client->stream;
 
@@ -634,11 +670,11 @@ void Server::construct_header(Client *client)
 {
        Stream *stream = client->stream;
        if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
-               client->header_or_error = stream->http_header +
+               client->header_or_short_response = stream->http_header +
                        "\r\n" +
                        stream->stream_header;
        } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
-               client->header_or_error = stream->http_header +
+               client->header_or_short_response = stream->http_header +
                        "Content-encoding: metacube\r\n" +
                        "\r\n";
                if (!stream->stream_header.empty()) {
@@ -647,10 +683,10 @@ void Server::construct_header(Client *client)
                        hdr.size = htonl(stream->stream_header.size());
                        hdr.flags = htons(METACUBE_FLAGS_HEADER);
                        hdr.csum = htons(metacube2_compute_crc(&hdr));
-                       client->header_or_error.append(
+                       client->header_or_short_response.append(
                                string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
                }
-               client->header_or_error.append(stream->stream_header);
+               client->header_or_short_response.append(stream->stream_header);
        } else {
                assert(false);
        }
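
When the stream is served as Metacube, the stream header is framed as a Metacube2 block of its own before being appended to the response, with the size, flags and checksum fields stored in network byte order. A sketch of that framing, assuming the metacube2_block_header layout and metacube2_compute_crc() helper from metacube2.h in this repository:

#include <arpa/inet.h>  // htonl(), htons()
#include <stdint.h>
#include <string.h>

#include <string>

#include "metacube2.h"  // metacube2_block_header, METACUBE2_SYNC, METACUBE_FLAGS_*, metacube2_compute_crc()

// Appends <payload> to <out> wrapped in a single Metacube2 block.
// For the stream header, <flags> would be METACUBE_FLAGS_HEADER, which marks
// the block as a header rather than ordinary stream data.
void append_metacube2_block(std::string *out, const std::string &payload, uint16_t flags)
{
        metacube2_block_header hdr;
        memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));  // 8-byte sync marker.
        hdr.size = htonl(payload.size());                    // Payload size, excluding the header itself.
        hdr.flags = htons(flags);
        hdr.csum = htons(metacube2_compute_crc(&hdr));       // Checksum over size and flags.
        out->append(reinterpret_cast<char *>(&hdr), sizeof(hdr));
        out->append(payload);
}

construct_header() above does the same thing inline for stream->stream_header, with METACUBE_FLAGS_HEADER set on the block.
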
@@ -673,10 +709,10 @@ void Server::construct_error(Client *client, int error_code)
        char error[256];
        snprintf(error, 256, "HTTP/1.0 %d Error\r\nContent-type: text/plain\r\n\r\nSomething went wrong. Sorry.\r\n",
                error_code);
-       client->header_or_error = error;
+       client->header_or_short_response = error;
 
        // Switch states.
-       client->state = Client::SENDING_ERROR;
+       client->state = Client::SENDING_SHORT_RESPONSE;
 
        epoll_event ev;
        ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
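
construct_error() builds a complete, self-contained HTTP response into header_or_short_response and flips the client to SENDING_SHORT_RESPONSE; the epoll_event being set up at the end of the hunk is then used to re-arm the socket for writability so the send path above gets to run. A sketch of that kind of re-arm, assuming edge-triggered epoll, EPOLL_CTL_MOD, and a client pointer carried in the event data (everything past the visible lines is an assumption of the sketch):

#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>

// Re-arms <sock> for writability under edge-triggered epoll, so the next
// EPOLLOUT wakeup drives the SENDING_SHORT_RESPONSE/SENDING_HEADER states.
void switch_to_writing(int epoll_fd, int sock, void *client_ptr)
{
        epoll_event ev;
        ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;  // Writable, edge-triggered, notice peer close.
        ev.data.ptr = client_ptr;                     // How we find the Client again on wakeup.

        if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, sock, &ev) == -1) {
                perror("epoll_ctl(EPOLL_CTL_MOD)");
                exit(1);  // Treated as unrecoverable in this sketch.
        }
}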