From: Steinar H. Gunderson Date: Thu, 23 Jul 2015 10:22:35 +0000 (+0200) Subject: Track stream start suitability separately for each data block added. X-Git-Tag: 1.2.0~11 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=00cf4a1ffcb987ef6d27fcf49811fd5ef572a985 Track stream start suitability separately for each data block added. When having queued data, keep the separate stream start suitability flag for each iovec instead of just storing the index of the last block. This is a no-op in itself, but it is a prerequisite for tracking it in the backlog as well, which we want to do to be able to have force_prebuffer give out older data -- that requires tracking not only the last suitable starting point, but multiple ones backwards in time. Also add a TODO to update starting point(s) when the header changes. --- diff --git a/server.cpp b/server.cpp index 99a3925..7116a73 100644 --- a/server.cpp +++ b/server.cpp @@ -338,6 +338,7 @@ void Server::set_header(int stream_index, const string &http_header, const strin assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); streams[stream_index]->http_header = http_header; streams[stream_index]->stream_header = stream_header; + // FIXME: We should reset last_suitable_starting_point at this point. } void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate) diff --git a/stream.cpp b/stream.cpp index 7f4bce8..557502b 100644 --- a/stream.cpp +++ b/stream.cpp @@ -27,8 +27,7 @@ Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes prebuffering_bytes(prebuffering_bytes), bytes_received(0), last_suitable_starting_point(-1), - pacing_rate(~0U), - queued_data_last_starting_point_index(-1) + pacing_rate(~0U) { if (data_fd == -1) { exit(1); @@ -53,8 +52,7 @@ Stream::Stream(const StreamProto &serialized, int data_fd) backlog_size(serialized.backlog_size()), prebuffering_bytes(serialized.prebuffering_bytes()), bytes_received(serialized.bytes_received()), - pacing_rate(~0U), - queued_data_last_starting_point_index(-1) + pacing_rate(~0U) { if (data_fd == -1) { exit(1); @@ -116,13 +114,36 @@ void Stream::set_backlog_size(size_t new_size) // Now cheat a bit by rewinding, and adding all the old data back. bytes_received -= existing_data.size(); - iovec iov; - iov.iov_base = const_cast(existing_data.data()); - iov.iov_len = existing_data.size(); - vector iovs; - iovs.push_back(iov); - add_data_raw(iovs); + size_t bytes_before_suitable_starting_point; + if (last_suitable_starting_point == -1) { + bytes_before_suitable_starting_point = existing_data.size(); + } else if (size_t(last_suitable_starting_point) < backlog_size) { + bytes_before_suitable_starting_point = 0; + } else { + bytes_before_suitable_starting_point = last_suitable_starting_point - backlog_size; + } + + vector data_elements; + if (bytes_before_suitable_starting_point > 0) { + // There's really no usable data here (except for ?backlog=1 users), + // but we need to get the accounting right anyway. + DataElement data_element; + data_element.data.iov_base = const_cast(existing_data.data()); + data_element.data.iov_len = bytes_before_suitable_starting_point; + data_element.suitable_for_stream_start = NOT_SUITABLE_FOR_STREAM_START; + data_elements.push_back(data_element); + } + if (bytes_before_suitable_starting_point < existing_data.size()) { + DataElement data_element; + data_element.data.iov_base = const_cast(existing_data.data() + bytes_before_suitable_starting_point); + data_element.data.iov_len = existing_data.size() - bytes_before_suitable_starting_point; + data_element.suitable_for_stream_start = SUITABLE_FOR_STREAM_START; + data_elements.push_back(data_element); + } + + last_suitable_starting_point = -1; + add_data_raw(data_elements); } void Stream::put_client_to_sleep(Client *client) @@ -131,20 +152,20 @@ void Stream::put_client_to_sleep(Client *client) } // Return a new set of iovecs that contains only the first bytes of . -vector collect_iovecs(const vector &data, size_t bytes_wanted) +vector collect_iovecs(const vector &data, size_t bytes_wanted) { vector ret; size_t max_iovecs = std::min(data.size(), IOV_MAX); for (size_t i = 0; i < max_iovecs && bytes_wanted > 0; ++i) { - if (data[i].iov_len <= bytes_wanted) { + if (data[i].data.iov_len <= bytes_wanted) { // Consume the entire iovec. - ret.push_back(data[i]); - bytes_wanted -= data[i].iov_len; + ret.push_back(data[i].data); + bytes_wanted -= data[i].data.iov_len; } else { // Take only parts of this iovec. iovec iov; - iov.iov_base = data[i].iov_base; - iov.iov_len = bytes_wanted; + iov.iov_base = data[i].data.iov_base; + iov.iov_len = bytes_wanted; ret.push_back(iov); bytes_wanted = 0; } @@ -153,20 +174,21 @@ vector collect_iovecs(const vector &data, size_t bytes_wanted) } // Return a new set of iovecs that contains all of except the first bytes. -vector remove_iovecs(const vector &data, size_t bytes_wanted) +vector remove_iovecs(const vector &data, size_t bytes_wanted) { - vector ret; + vector ret; size_t i; for (i = 0; i < data.size() && bytes_wanted > 0; ++i) { - if (data[i].iov_len <= bytes_wanted) { + if (data[i].data.iov_len <= bytes_wanted) { // Consume the entire iovec. - bytes_wanted -= data[i].iov_len; + bytes_wanted -= data[i].data.iov_len; } else { // Take only parts of this iovec. - iovec iov; - iov.iov_base = reinterpret_cast(data[i].iov_base) + bytes_wanted; - iov.iov_len = data[i].iov_len - bytes_wanted; - ret.push_back(iov); + Stream::DataElement data_element; + data_element.data.iov_base = reinterpret_cast(data[i].data.iov_base) + bytes_wanted; + data_element.data.iov_len = data[i].data.iov_len - bytes_wanted; + data_element.suitable_for_stream_start = NOT_SUITABLE_FOR_STREAM_START; + ret.push_back(data_element); bytes_wanted = 0; } } @@ -176,9 +198,9 @@ vector remove_iovecs(const vector &data, size_t bytes_wanted) return ret; } -void Stream::add_data_raw(const vector &orig_data) +void Stream::add_data_raw(const vector &orig_data) { - vector data = orig_data; + vector data = orig_data; while (!data.empty()) { size_t pos = bytes_received % backlog_size; @@ -207,9 +229,9 @@ void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitab MutexLock lock(&queued_data_mutex); assert(suitable_for_stream_start == SUITABLE_FOR_STREAM_START || suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START); - if (suitable_for_stream_start == SUITABLE_FOR_STREAM_START) { - queued_data_last_starting_point_index = queued_data.size(); - } + + DataElement data_element; + data_element.suitable_for_stream_start = suitable_for_stream_start; if (encoding == Stream::STREAM_ENCODING_METACUBE) { // Add a Metacube block header before the data. @@ -222,22 +244,20 @@ void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitab } hdr.csum = htons(metacube2_compute_crc(&hdr)); - iovec iov; - iov.iov_base = new char[bytes + sizeof(hdr)]; - iov.iov_len = bytes + sizeof(hdr); + data_element.data.iov_base = new char[bytes + sizeof(hdr)]; + data_element.data.iov_len = bytes + sizeof(hdr); - memcpy(iov.iov_base, &hdr, sizeof(hdr)); - memcpy(reinterpret_cast(iov.iov_base) + sizeof(hdr), data, bytes); + memcpy(data_element.data.iov_base, &hdr, sizeof(hdr)); + memcpy(reinterpret_cast(data_element.data.iov_base) + sizeof(hdr), data, bytes); - queued_data.push_back(iov); + queued_data.push_back(data_element); } else if (encoding == Stream::STREAM_ENCODING_RAW) { // Just add the data itself. - iovec iov; - iov.iov_base = new char[bytes]; - memcpy(iov.iov_base, data, bytes); - iov.iov_len = bytes; + data_element.data.iov_base = new char[bytes]; + memcpy(data_element.data.iov_base, data, bytes); + data_element.data.iov_len = bytes; - queued_data.push_back(iov); + queued_data.push_back(data_element); } else { assert(false); } @@ -245,8 +265,7 @@ void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitab void Stream::process_queued_data() { - std::vector queued_data_copy; - int queued_data_last_starting_point_index_copy = -1; + std::vector queued_data_copy; // Hold the lock for as short as possible, since add_data_raw() can possibly // write to disk, which might disturb the input thread. @@ -257,22 +276,21 @@ void Stream::process_queued_data() } swap(queued_data, queued_data_copy); - swap(queued_data_last_starting_point_index, queued_data_last_starting_point_index_copy); } // Update the last suitable starting point for the stream, // if the queued data contains such a starting point. - assert(queued_data_last_starting_point_index_copy < ssize_t(queued_data_copy.size())); - if (queued_data_last_starting_point_index_copy >= 0) { - last_suitable_starting_point = bytes_received; - for (int i = 0; i < queued_data_last_starting_point_index_copy; ++i) { - last_suitable_starting_point += queued_data_copy[i].iov_len; + size_t byte_position = bytes_received; + for (size_t i = 0; i < queued_data_copy.size(); ++i) { + if (queued_data_copy[i].suitable_for_stream_start == SUITABLE_FOR_STREAM_START) { + last_suitable_starting_point = byte_position; } + byte_position += queued_data_copy[i].data.iov_len; } add_data_raw(queued_data_copy); for (size_t i = 0; i < queued_data_copy.size(); ++i) { - char *data = reinterpret_cast(queued_data_copy[i].iov_base); + char *data = reinterpret_cast(queued_data_copy[i].data.iov_base); delete[] data; } diff --git a/stream.h b/stream.h index 3a9474a..9a9982a 100644 --- a/stream.h +++ b/stream.h @@ -93,11 +93,11 @@ struct Stream { // Queued data, if any. Protected by . // The data pointers in the iovec are owned by us. - std::vector queued_data; - - // Index of the last element in queued_data that is suitable to start streaming at. - // -1 if none. Protected by . - int queued_data_last_starting_point_index; + struct DataElement { + iovec data; + StreamStartSuitability suitable_for_stream_start; + }; + std::vector queued_data; // Put client to sleep, since there is no more data for it; we will on // longer listen on POLLOUT until we get more data. Also, it will be put @@ -118,7 +118,7 @@ private: // Adds data directly to the stream file descriptor, without adding headers or // going through . // You should hold the owning Server's . - void add_data_raw(const std::vector &data); + void add_data_raw(const std::vector &data); }; #endif // !defined(_STREAM_H)