+ bytes_received += ret;
+
+ // Remove the data that was actually written from the set of iovecs.
+ data = remove_iovecs(data, ret);
+ }
+}
+
+void Stream::remove_obsolete_starting_points()
+{
+ // We could do a binary search here (std::lower_bound), but it seems
+ // overkill for removing what's probably only a few points.
+ while (!suitable_starting_points.empty() &&
+ bytes_received - suitable_starting_points[0] > backlog_size) {
+ suitable_starting_points.pop_front();
+ }
+}
+
+void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
+{
+ MutexLock lock(&queued_data_mutex);
+ assert(suitable_for_stream_start == SUITABLE_FOR_STREAM_START ||
+ suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START);
+
+ DataElement data_element;
+ data_element.suitable_for_stream_start = suitable_for_stream_start;
+
+ if (encoding == Stream::STREAM_ENCODING_METACUBE) {
+ // Add a Metacube block header before the data.
+ metacube2_block_header hdr;
+ memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
+ hdr.size = htonl(bytes);
+ hdr.flags = htons(0);
+ if (suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START) {
+ hdr.flags |= htons(METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START);
+ }
+ hdr.csum = htons(metacube2_compute_crc(&hdr));
+
+ data_element.data.iov_base = new char[bytes + sizeof(hdr)];
+ data_element.data.iov_len = bytes + sizeof(hdr);
+
+ memcpy(data_element.data.iov_base, &hdr, sizeof(hdr));
+ memcpy(reinterpret_cast<char *>(data_element.data.iov_base) + sizeof(hdr), data, bytes);
+
+ queued_data.push_back(data_element);
+ } else if (encoding == Stream::STREAM_ENCODING_RAW) {
+ // Just add the data itself.
+ data_element.data.iov_base = new char[bytes];
+ memcpy(data_element.data.iov_base, data, bytes);
+ data_element.data.iov_len = bytes;
+
+ queued_data.push_back(data_element);
+ } else {
+ assert(false);