Track stream start suitability separately for each data block added.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Thu, 23 Jul 2015 10:22:35 +0000 (12:22 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Thu, 23 Jul 2015 10:26:32 +0000 (12:26 +0200)
When having queued data, keep the separate stream start suitability flag
for each iovec instead of just storing the index of the last block.
This is a no-op in itself, but it is a prerequisite for tracking it in
the backlog as well, which we want to do to be able to have force_prebuffer
give out older data -- that requires tracking not only the last suitable
starting point, but multiple ones backwards in time.

Also add a TODO to update starting point(s) when the header changes.

server.cpp
stream.cpp
stream.h

index 99a3925..7116a73 100644 (file)
@@ -338,6 +338,7 @@ void Server::set_header(int stream_index, const string &http_header, const strin
        assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
        streams[stream_index]->http_header = http_header;
        streams[stream_index]->stream_header = stream_header;
+       // FIXME: We should reset last_suitable_starting_point at this point.
 }
        
 void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
index 7f4bce8..557502b 100644 (file)
@@ -27,8 +27,7 @@ Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes
          prebuffering_bytes(prebuffering_bytes),
          bytes_received(0),
          last_suitable_starting_point(-1),
-         pacing_rate(~0U),
-         queued_data_last_starting_point_index(-1)
+         pacing_rate(~0U)
 {
        if (data_fd == -1) {
                exit(1);
@@ -53,8 +52,7 @@ Stream::Stream(const StreamProto &serialized, int data_fd)
          backlog_size(serialized.backlog_size()),
          prebuffering_bytes(serialized.prebuffering_bytes()),
          bytes_received(serialized.bytes_received()),
-         pacing_rate(~0U),
-         queued_data_last_starting_point_index(-1)
+         pacing_rate(~0U)
 {
        if (data_fd == -1) {
                exit(1);
@@ -116,13 +114,36 @@ void Stream::set_backlog_size(size_t new_size)
 
        // Now cheat a bit by rewinding, and adding all the old data back.
        bytes_received -= existing_data.size();
-       iovec iov;
-       iov.iov_base = const_cast<char *>(existing_data.data());
-       iov.iov_len = existing_data.size();
 
-       vector<iovec> iovs;
-       iovs.push_back(iov);
-       add_data_raw(iovs);
+       size_t bytes_before_suitable_starting_point;
+       if (last_suitable_starting_point == -1) {
+               bytes_before_suitable_starting_point = existing_data.size();
+       } else if (size_t(last_suitable_starting_point) < backlog_size) {
+               bytes_before_suitable_starting_point = 0;
+       } else {
+               bytes_before_suitable_starting_point = last_suitable_starting_point - backlog_size;
+       }
+
+       vector<DataElement> data_elements;
+       if (bytes_before_suitable_starting_point > 0) {
+               // There's really no usable data here (except for ?backlog=1 users),
+               // but we need to get the accounting right anyway.
+               DataElement data_element;
+               data_element.data.iov_base = const_cast<char *>(existing_data.data());
+               data_element.data.iov_len = bytes_before_suitable_starting_point;
+               data_element.suitable_for_stream_start = NOT_SUITABLE_FOR_STREAM_START;
+               data_elements.push_back(data_element);
+       }
+       if (bytes_before_suitable_starting_point < existing_data.size()) {
+               DataElement data_element;
+               data_element.data.iov_base = const_cast<char *>(existing_data.data() + bytes_before_suitable_starting_point);
+               data_element.data.iov_len = existing_data.size() - bytes_before_suitable_starting_point;
+               data_element.suitable_for_stream_start = SUITABLE_FOR_STREAM_START;
+               data_elements.push_back(data_element);
+       }
+
+       last_suitable_starting_point = -1;
+       add_data_raw(data_elements);
 }
 
 void Stream::put_client_to_sleep(Client *client)
@@ -131,20 +152,20 @@ void Stream::put_client_to_sleep(Client *client)
 }
 
 // Return a new set of iovecs that contains only the first <bytes_wanted> bytes of <data>.
-vector<iovec> collect_iovecs(const vector<iovec> &data, size_t bytes_wanted)
+vector<iovec> collect_iovecs(const vector<Stream::DataElement> &data, size_t bytes_wanted)
 {
        vector<iovec> ret;
        size_t max_iovecs = std::min<size_t>(data.size(), IOV_MAX);
        for (size_t i = 0; i < max_iovecs && bytes_wanted > 0; ++i) {
-               if (data[i].iov_len <= bytes_wanted) {
+               if (data[i].data.iov_len <= bytes_wanted) {
                        // Consume the entire iovec.
-                       ret.push_back(data[i]);
-                       bytes_wanted -= data[i].iov_len;
+                       ret.push_back(data[i].data);
+                       bytes_wanted -= data[i].data.iov_len;
                } else {
                        // Take only parts of this iovec.
                        iovec iov;
-                       iov.iov_base = data[i].iov_base;
-                       iov.iov_len = bytes_wanted;     
+                       iov.iov_base = data[i].data.iov_base;
+                       iov.iov_len = bytes_wanted;
                        ret.push_back(iov);
                        bytes_wanted = 0;
                }
@@ -153,20 +174,21 @@ vector<iovec> collect_iovecs(const vector<iovec> &data, size_t bytes_wanted)
 }
 
 // Return a new set of iovecs that contains all of <data> except the first <bytes_wanted> bytes.
-vector<iovec> remove_iovecs(const vector<iovec> &data, size_t bytes_wanted)
+vector<Stream::DataElement> remove_iovecs(const vector<Stream::DataElement> &data, size_t bytes_wanted)
 {
-       vector<iovec> ret;
+       vector<Stream::DataElement> ret;
        size_t i;
        for (i = 0; i < data.size() && bytes_wanted > 0; ++i) {
-               if (data[i].iov_len <= bytes_wanted) {
+               if (data[i].data.iov_len <= bytes_wanted) {
                        // Consume the entire iovec.
-                       bytes_wanted -= data[i].iov_len;
+                       bytes_wanted -= data[i].data.iov_len;
                } else {
                        // Take only parts of this iovec.
-                       iovec iov;
-                       iov.iov_base = reinterpret_cast<char *>(data[i].iov_base) + bytes_wanted;
-                       iov.iov_len = data[i].iov_len - bytes_wanted;
-                       ret.push_back(iov);
+                       Stream::DataElement data_element;
+                       data_element.data.iov_base = reinterpret_cast<char *>(data[i].data.iov_base) + bytes_wanted;
+                       data_element.data.iov_len = data[i].data.iov_len - bytes_wanted;
+                       data_element.suitable_for_stream_start = NOT_SUITABLE_FOR_STREAM_START;
+                       ret.push_back(data_element);
                        bytes_wanted = 0;
                }
        }
@@ -176,9 +198,9 @@ vector<iovec> remove_iovecs(const vector<iovec> &data, size_t bytes_wanted)
        return ret;
 }
 
-void Stream::add_data_raw(const vector<iovec> &orig_data)
+void Stream::add_data_raw(const vector<DataElement> &orig_data)
 {
-       vector<iovec> data = orig_data;
+       vector<DataElement> data = orig_data;
        while (!data.empty()) {
                size_t pos = bytes_received % backlog_size;
 
@@ -207,9 +229,9 @@ void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitab
        MutexLock lock(&queued_data_mutex);
        assert(suitable_for_stream_start == SUITABLE_FOR_STREAM_START ||
               suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START);
-       if (suitable_for_stream_start == SUITABLE_FOR_STREAM_START) {
-               queued_data_last_starting_point_index = queued_data.size();
-       }
+
+       DataElement data_element;
+       data_element.suitable_for_stream_start = suitable_for_stream_start;
 
        if (encoding == Stream::STREAM_ENCODING_METACUBE) {
                // Add a Metacube block header before the data.
@@ -222,22 +244,20 @@ void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitab
                }
                hdr.csum = htons(metacube2_compute_crc(&hdr));
 
-               iovec iov;
-               iov.iov_base = new char[bytes + sizeof(hdr)];
-               iov.iov_len = bytes + sizeof(hdr);
+               data_element.data.iov_base = new char[bytes + sizeof(hdr)];
+               data_element.data.iov_len = bytes + sizeof(hdr);
 
-               memcpy(iov.iov_base, &hdr, sizeof(hdr));
-               memcpy(reinterpret_cast<char *>(iov.iov_base) + sizeof(hdr), data, bytes);
+               memcpy(data_element.data.iov_base, &hdr, sizeof(hdr));
+               memcpy(reinterpret_cast<char *>(data_element.data.iov_base) + sizeof(hdr), data, bytes);
 
-               queued_data.push_back(iov);
+               queued_data.push_back(data_element);
        } else if (encoding == Stream::STREAM_ENCODING_RAW) {
                // Just add the data itself.
-               iovec iov;
-               iov.iov_base = new char[bytes];
-               memcpy(iov.iov_base, data, bytes);
-               iov.iov_len = bytes;
+               data_element.data.iov_base = new char[bytes];
+               memcpy(data_element.data.iov_base, data, bytes);
+               data_element.data.iov_len = bytes;
 
-               queued_data.push_back(iov);
+               queued_data.push_back(data_element);
        } else {
                assert(false);
        }
@@ -245,8 +265,7 @@ void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitab
 
 void Stream::process_queued_data()
 {
-       std::vector<iovec> queued_data_copy;
-       int queued_data_last_starting_point_index_copy = -1;
+       std::vector<DataElement> queued_data_copy;
 
        // Hold the lock for as short as possible, since add_data_raw() can possibly
        // write to disk, which might disturb the input thread.
@@ -257,22 +276,21 @@ void Stream::process_queued_data()
                }
 
                swap(queued_data, queued_data_copy);
-               swap(queued_data_last_starting_point_index, queued_data_last_starting_point_index_copy);
        }
 
        // Update the last suitable starting point for the stream,
        // if the queued data contains such a starting point.
-       assert(queued_data_last_starting_point_index_copy < ssize_t(queued_data_copy.size()));
-       if (queued_data_last_starting_point_index_copy >= 0) {
-               last_suitable_starting_point = bytes_received;
-               for (int i = 0; i < queued_data_last_starting_point_index_copy; ++i) {
-                       last_suitable_starting_point += queued_data_copy[i].iov_len;
+       size_t byte_position = bytes_received;
+       for (size_t i = 0; i < queued_data_copy.size(); ++i) {
+               if (queued_data_copy[i].suitable_for_stream_start == SUITABLE_FOR_STREAM_START) {
+                       last_suitable_starting_point = byte_position;
                }
+               byte_position += queued_data_copy[i].data.iov_len;
        }
 
        add_data_raw(queued_data_copy);
        for (size_t i = 0; i < queued_data_copy.size(); ++i) {
-               char *data = reinterpret_cast<char *>(queued_data_copy[i].iov_base);
+               char *data = reinterpret_cast<char *>(queued_data_copy[i].data.iov_base);
                delete[] data;
        }
 
index 3a9474a..9a9982a 100644 (file)
--- a/stream.h
+++ b/stream.h
@@ -93,11 +93,11 @@ struct Stream {
 
        // Queued data, if any. Protected by <queued_data_mutex>.
        // The data pointers in the iovec are owned by us.
-       std::vector<iovec> queued_data;
-
-       // Index of the last element in queued_data that is suitable to start streaming at.
-       // -1 if none. Protected by <queued_data_mutex>.
-       int queued_data_last_starting_point_index;
+       struct DataElement {
+               iovec data;
+               StreamStartSuitability suitable_for_stream_start;
+       };
+       std::vector<DataElement> queued_data;
 
        // Put client to sleep, since there is no more data for it; we will on
        // longer listen on POLLOUT until we get more data. Also, it will be put
@@ -118,7 +118,7 @@ private:
        // Adds data directly to the stream file descriptor, without adding headers or
        // going through <queued_data>.
        // You should hold the owning Server's <mutex>.
-       void add_data_raw(const std::vector<iovec> &data);
+       void add_data_raw(const std::vector<DataElement> &data);
 };
 
 #endif  // !defined(_STREAM_H)