From e0b47eba2f5ec1aca1d02adc9fb4ffc7293d5c0f Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sat, 20 Apr 2013 15:56:57 +0200 Subject: [PATCH] Add Metacube headers in add_data_deferred(), not add_data(). This fixes a problem where Metacube blocks are not deterministically added for each add_data() call (because they would only be added for each call to process_queued_data()), causing the different servers to go out of sync. Also moved most of this logic to Stream, where it seems more appropriate (even though the mutexes are a bit odd now). --- server.cpp | 13 +++++-------- server.h | 6 +++--- stream.cpp | 50 +++++++++++++++++++++++++++++--------------------- stream.h | 17 ++++++++++++----- 4 files changed, 49 insertions(+), 37 deletions(-) diff --git a/server.cpp b/server.cpp index 6d5cce3..2eb33ed 100644 --- a/server.cpp +++ b/server.cpp @@ -250,7 +250,7 @@ void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool) void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes) { MutexLock lock(&queued_data_mutex); - queued_data[stream_id].append(string(data, data + bytes)); + find_stream(stream_id)->add_data_deferred(data, bytes); } // See the .h file for postconditions after this function. @@ -571,12 +571,9 @@ void Server::process_queued_data() } queued_add_clients.clear(); - for (map::iterator queued_it = queued_data.begin(); - queued_it != queued_data.end(); - ++queued_it) { - Stream *stream = find_stream(queued_it->first); - stream->add_data(queued_it->second.data(), queued_it->second.size()); - stream->wake_up_all_clients(); + for (map::iterator stream_it = streams.begin(); + stream_it != streams.end(); + ++stream_it) { + stream_it->second->process_queued_data(); } - queued_data.clear(); } diff --git a/server.h b/server.h index a3e032d..edcdc83 100644 --- a/server.h +++ b/server.h @@ -60,8 +60,9 @@ public: void set_encoding(const std::string &stream_id, Stream::Encoding encoding); private: - // Mutex protecting queued_data only. Note that if you want to hold both this - // and below, you will need to take before this one. + // Mutex protecting queued_add_clients and streams[..]->queued_data. + // Note that if you want to hold both this and below, + // you will need to take before this one. mutable pthread_mutex_t queued_data_mutex; // Deferred commands that should be run from the do_work() thread as soon as possible. @@ -75,7 +76,6 @@ private: // // Protected by . std::vector queued_add_clients; - std::map queued_data; // All variables below this line are protected by the mutex. mutable pthread_mutex_t mutex; diff --git a/stream.cpp b/stream.cpp index 5ea2cd0..4be673c 100644 --- a/stream.cpp +++ b/stream.cpp @@ -119,26 +119,6 @@ void Stream::put_client_to_sleep(Client *client) sleeping_clients.push_back(client); } -void Stream::add_data(const char *data, ssize_t bytes) -{ - if (encoding == Stream::STREAM_ENCODING_RAW) { - add_data_raw(data, bytes); - } else if (encoding == STREAM_ENCODING_METACUBE) { - metacube_block_header hdr; - memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync)); - hdr.size = htonl(bytes); - hdr.flags = htonl(0); - - char *block = new char[bytes + sizeof(hdr)]; - memcpy(block, &hdr, sizeof(hdr)); - memcpy(block + sizeof(hdr), data, bytes); - add_data_raw(block, bytes + sizeof(hdr)); - delete[] block; - } else { - assert(false); - } -} - void Stream::add_data_raw(const char *data, ssize_t bytes) { size_t pos = bytes_received % backlog_size; @@ -180,8 +160,36 @@ void Stream::add_data_raw(const char *data, ssize_t bytes) } } -void Stream::wake_up_all_clients() +void Stream::add_data_deferred(const char *data, size_t bytes) +{ + if (encoding == Stream::STREAM_ENCODING_RAW) { + queued_data.append(string(data, data + bytes)); + } else if (encoding == STREAM_ENCODING_METACUBE) { + metacube_block_header hdr; + memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync)); + hdr.size = htonl(bytes); + hdr.flags = htonl(0); + + char *block = new char[bytes + sizeof(hdr)]; + memcpy(block, &hdr, sizeof(hdr)); + memcpy(block + sizeof(hdr), data, bytes); + queued_data.append(string(block, block + bytes + sizeof(hdr))); + delete[] block; + } else { + assert(false); + } +} + +void Stream::process_queued_data() { + if (queued_data.empty()) { + return; + } + + add_data_raw(queued_data.data(), queued_data.size()); + queued_data.clear(); + + // We have more data, so wake up all clients. if (to_process.empty()) { swap(sleeping_clients, to_process); } else { diff --git a/stream.h b/stream.h index 7e20aef..97451f0 100644 --- a/stream.h +++ b/stream.h @@ -69,21 +69,28 @@ struct Stream { // What pool to fetch marks from, or NULL. MarkPool *mark_pool; + // Queued data, if any. Protected by the owning Server's . + std::string queued_data; + // Put client to sleep, since there is no more data for it; we will on // longer listen on POLLOUT until we get more data. Also, it will be put // in the list of clients to wake up when we do. void put_client_to_sleep(Client *client); - // Add more input data to the stream. You should probably call wake_up_all_clients() - // after that. - void add_data(const char *data, ssize_t bytes); + // Add more data to , adding Metacube headers if needed. + // You should hold the owning Server's . + void add_data_deferred(const char *data, size_t bytes); - // We have more data, so mark all clients that are sleeping as ready to go. - void wake_up_all_clients(); + // Add queued data to the stream, if any. + // You should hold the owning Server's _and_ . + void process_queued_data(); private: Stream(const Stream& other); + // Adds data directly to the stream file descriptor, without adding headers or + // going through . You should hold the owning Server's + // . void add_data_raw(const char *data, ssize_t bytes); }; -- 2.39.2