]> git.sesse.net Git - cubemap/blobdiff - stream.cpp
Compute the checksum on outgoing Metacube packets as well.
[cubemap] / stream.cpp
index e8d65db1a8cfda924226ad1838d087ad01a176a9..42edee95aae01176e94c3a63ab47704a95b19a36 100644 (file)
@@ -9,7 +9,7 @@
 #include <vector>
 
 #include "log.h"
-#include "metacube.h"
+#include "metacube2.h"
 #include "state.pb.h"
 #include "stream.h"
 #include "util.h"
@@ -22,7 +22,9 @@ Stream::Stream(const string &url, size_t backlog_size, Encoding encoding)
          data_fd(make_tempfile("")),
           backlog_size(backlog_size),
          bytes_received(0),
-         mark_pool(NULL)
+         last_suitable_starting_point(-1),
+         mark_pool(NULL),
+         queued_data_last_starting_point(-1)
 {
        if (data_fd == -1) {
                exit(1);
@@ -44,7 +46,8 @@ Stream::Stream(const StreamProto &serialized, int data_fd)
          data_fd(data_fd),
          backlog_size(serialized.backlog_size()),
          bytes_received(serialized.bytes_received()),
-         mark_pool(NULL)
+         mark_pool(NULL),
+         queued_data_last_starting_point(-1)
 {
        if (data_fd == -1) {
                exit(1);
@@ -62,6 +65,13 @@ Stream::Stream(const StreamProto &serialized, int data_fd)
                        stream_header = header.substr(split, string::npos);
                }
        }
+
+       // Older versions did not set last_suitable_starting_point.
+       if (serialized.has_last_suitable_starting_point()) {
+               last_suitable_starting_point = serialized.last_suitable_starting_point();
+       } else {
+               last_suitable_starting_point = bytes_received;
+       }
 }
 
 StreamProto Stream::serialize()
@@ -72,6 +82,7 @@ StreamProto Stream::serialize()
        serialized.add_data_fds(data_fd);
        serialized.set_backlog_size(backlog_size);
        serialized.set_bytes_received(bytes_received);
+       serialized.set_last_suitable_starting_point(last_suitable_starting_point);
        serialized.set_url(url);
        data_fd = -1;
        return serialized;
@@ -198,14 +209,24 @@ void Stream::add_data_raw(const vector<iovec> &orig_data)
        }
 }
 
-void Stream::add_data_deferred(const char *data, size_t bytes)
+void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
 {
+       assert(suitable_for_stream_start == SUITABLE_FOR_STREAM_START ||
+              suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START);
+       if (suitable_for_stream_start == SUITABLE_FOR_STREAM_START) {
+               queued_data_last_starting_point = queued_data.size();
+       }
+
        if (encoding == Stream::STREAM_ENCODING_METACUBE) {
                // Add a Metacube block header before the data.
-               metacube_block_header hdr;
-               memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
+               metacube2_block_header hdr;
+               memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
                hdr.size = htonl(bytes);
-               hdr.flags = htonl(0);
+               hdr.flags = htons(0);
+               if (suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START) {
+                       hdr.flags |= htons(METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START);
+               }
+               hdr.csum = htons(metacube2_compute_crc(&hdr));
 
                iovec iov;
                iov.iov_base = new char[bytes + sizeof(hdr)];
@@ -234,12 +255,23 @@ void Stream::process_queued_data()
                return;
        }
 
+       // Update the last suitable starting point for the stream,
+       // if the queued data contains such a starting point.
+       assert(queued_data_last_starting_point < ssize_t(queued_data.size()));
+       if (queued_data_last_starting_point >= 0) {
+               last_suitable_starting_point = bytes_received;
+               for (int i = 0; i < queued_data_last_starting_point; ++i) {
+                       last_suitable_starting_point += queued_data[i].iov_len;
+               }
+       }
+
        add_data_raw(queued_data);
        for (size_t i = 0; i < queued_data.size(); ++i) {
                char *data = reinterpret_cast<char *>(queued_data[i].iov_base);
                delete[] data;
        }
        queued_data.clear();
+       queued_data_last_starting_point = -1;
 
        // We have more data, so wake up all clients.
        if (to_process.empty()) {