Add Metacube headers in add_data_deferred(), not add_data().
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 20 Apr 2013 13:56:57 +0000 (15:56 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 20 Apr 2013 13:56:57 +0000 (15:56 +0200)
This fixes a problem where Metacube blocks are not deterministically added
for each add_data() call (because they would only be added for each call to
process_queued_data()), causing the different servers to go out of sync.

Also moved most of this logic to Stream, where it seems more appropriate
(even though the mutexes are a bit odd now).

server.cpp
server.h
stream.cpp
stream.h

index 6d5cce3..2eb33ed 100644 (file)
@@ -250,7 +250,7 @@ void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
 void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
 {
        MutexLock lock(&queued_data_mutex);
-       queued_data[stream_id].append(string(data, data + bytes));
+       find_stream(stream_id)->add_data_deferred(data, bytes);
 }
 
 // See the .h file for postconditions after this function.     
@@ -571,12 +571,9 @@ void Server::process_queued_data()
        }
        queued_add_clients.clear();     
        
-       for (map<string, string>::iterator queued_it = queued_data.begin();
-            queued_it != queued_data.end();
-            ++queued_it) {
-               Stream *stream = find_stream(queued_it->first);
-               stream->add_data(queued_it->second.data(), queued_it->second.size());
-               stream->wake_up_all_clients();
+       for (map<string, Stream *>::iterator stream_it = streams.begin();
+            stream_it != streams.end();
+            ++stream_it) {
+               stream_it->second->process_queued_data();
        }
-       queued_data.clear();
 }
index a3e032d..edcdc83 100644 (file)
--- a/server.h
+++ b/server.h
@@ -60,8 +60,9 @@ public:
        void set_encoding(const std::string &stream_id, Stream::Encoding encoding);
 
 private:
-       // Mutex protecting queued_data only. Note that if you want to hold both this
-       // and <mutex> below, you will need to take <mutex> before this one.
+       // Mutex protecting queued_add_clients and streams[..]->queued_data.
+       // Note that if you want to hold both this and <mutex> below,
+       // you will need to take <mutex> before this one.
        mutable pthread_mutex_t queued_data_mutex;
 
        // Deferred commands that should be run from the do_work() thread as soon as possible.
@@ -75,7 +76,6 @@ private:
        //      
        // Protected by <queued_data_mutex>.
        std::vector<int> queued_add_clients;
-       std::map<std::string, std::string> queued_data;
 
        // All variables below this line are protected by the mutex.
        mutable pthread_mutex_t mutex;
index 5ea2cd0..4be673c 100644 (file)
@@ -119,26 +119,6 @@ void Stream::put_client_to_sleep(Client *client)
        sleeping_clients.push_back(client);
 }
 
-void Stream::add_data(const char *data, ssize_t bytes)
-{
-       if (encoding == Stream::STREAM_ENCODING_RAW) {
-               add_data_raw(data, bytes);
-       } else if (encoding == STREAM_ENCODING_METACUBE) {
-               metacube_block_header hdr;
-               memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
-               hdr.size = htonl(bytes);
-               hdr.flags = htonl(0);
-
-               char *block = new char[bytes + sizeof(hdr)];
-               memcpy(block, &hdr, sizeof(hdr));
-               memcpy(block + sizeof(hdr), data, bytes);
-               add_data_raw(block, bytes + sizeof(hdr));
-               delete[] block;
-       } else {
-               assert(false);
-       }
-}
-
 void Stream::add_data_raw(const char *data, ssize_t bytes)
 {
        size_t pos = bytes_received % backlog_size;
@@ -180,8 +160,36 @@ void Stream::add_data_raw(const char *data, ssize_t bytes)
        }
 }
 
-void Stream::wake_up_all_clients()
+void Stream::add_data_deferred(const char *data, size_t bytes)
+{
+       if (encoding == Stream::STREAM_ENCODING_RAW) {
+               queued_data.append(string(data, data + bytes));
+       } else if (encoding == STREAM_ENCODING_METACUBE) {
+               metacube_block_header hdr;
+               memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
+               hdr.size = htonl(bytes);
+               hdr.flags = htonl(0);
+
+               char *block = new char[bytes + sizeof(hdr)];
+               memcpy(block, &hdr, sizeof(hdr));
+               memcpy(block + sizeof(hdr), data, bytes);
+               queued_data.append(string(block, block + bytes + sizeof(hdr)));
+               delete[] block;
+       } else {
+               assert(false);
+       }
+}
+
+void Stream::process_queued_data()
 {
+       if (queued_data.empty()) {
+               return;
+       }
+
+       add_data_raw(queued_data.data(), queued_data.size());
+       queued_data.clear();
+
+       // We have more data, so wake up all clients.
        if (to_process.empty()) {
                swap(sleeping_clients, to_process);
        } else {
index 7e20aef..97451f0 100644 (file)
--- a/stream.h
+++ b/stream.h
@@ -69,21 +69,28 @@ struct Stream {
        // What pool to fetch marks from, or NULL.
        MarkPool *mark_pool;
 
+       // Queued data, if any. Protected by the owning Server's <queued_data_mutex>.
+       std::string queued_data;
+
        // Put client to sleep, since there is no more data for it; we will on
        // longer listen on POLLOUT until we get more data. Also, it will be put
        // in the list of clients to wake up when we do.
        void put_client_to_sleep(Client *client);
 
-       // Add more input data to the stream. You should probably call wake_up_all_clients()
-       // after that.
-       void add_data(const char *data, ssize_t bytes);
+       // Add more data to <queued_data>, adding Metacube headers if needed.
+       // You should hold the owning Server's <queued_data_mutex>.
+       void add_data_deferred(const char *data, size_t bytes);
 
-       // We have more data, so mark all clients that are sleeping as ready to go.
-       void wake_up_all_clients();
+       // Add queued data to the stream, if any.
+       // You should hold the owning Server's <mutex> _and_ <queued_data_mutex>.
+       void process_queued_data();
 
 private:
        Stream(const Stream& other);
 
+       // Adds data directly to the stream file descriptor, without adding headers or
+       // going through <queued_data>. You should hold the owning Server's
+       // <mutex>.
        void add_data_raw(const char *data, ssize_t bytes);
 };