From 9b8129d05b5d1ec0caed09a40f170e967afc60b3 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sun, 8 Sep 2013 18:18:14 +0200 Subject: [PATCH] Reduce contention of queued_data_mutex. Seemingly holding queued_data_mutex over add_data_raw(), which does writev(), could be slow on systems where /tmp is not on tmpfs, causing the queued_data_mutex to be held for so long (up to a second has been observed) that the input thread couldn't keep up. To fix this, we move queued_data_mutex into a per-stream variable (not sure if it's ideal, but it was the simplest way to avoid ugliness), and then hold it for as short as possible in process_queued_data(). While we're at it, document that queued_data_last_starting_point has the same locking rules as queued_data. --- server.cpp | 15 ++++++++------- server.h | 6 +++--- stream.cpp | 37 ++++++++++++++++++++++++++----------- stream.h | 12 ++++++++---- 4 files changed, 45 insertions(+), 25 deletions(-) diff --git a/server.cpp b/server.cpp index 8790633..a5f6a32 100644 --- a/server.cpp +++ b/server.cpp @@ -35,7 +35,7 @@ extern AccessLogThread *access_log; Server::Server() { pthread_mutex_init(&mutex, NULL); - pthread_mutex_init(&queued_data_mutex, NULL); + pthread_mutex_init(&queued_clients_mutex, NULL); epoll_fd = epoll_create(1024); // Size argument is ignored. if (epoll_fd == -1) { @@ -140,7 +140,7 @@ CubemapStateProto Server::serialize() void Server::add_client_deferred(int sock) { - MutexLock lock(&queued_data_mutex); + MutexLock lock(&queued_clients_mutex); queued_add_clients.push_back(sock); } @@ -273,7 +273,6 @@ void Server::set_mark_pool(int stream_index, MarkPool *mark_pool) void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start) { - MutexLock lock(&queued_data_mutex); assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); streams[stream_index]->add_data_deferred(data, bytes, suitable_for_stream_start); } @@ -624,12 +623,14 @@ void Server::close_client(Client *client) void Server::process_queued_data() { - MutexLock lock(&queued_data_mutex); + { + MutexLock lock(&queued_clients_mutex); - for (size_t i = 0; i < queued_add_clients.size(); ++i) { - add_client(queued_add_clients[i]); + for (size_t i = 0; i < queued_add_clients.size(); ++i) { + add_client(queued_add_clients[i]); + } + queued_add_clients.clear(); } - queued_add_clients.clear(); for (size_t i = 0; i < streams.size(); ++i) { streams[i]->process_queued_data(); diff --git a/server.h b/server.h index f0eb622..4b3aab8 100644 --- a/server.h +++ b/server.h @@ -61,10 +61,10 @@ public: void set_encoding(int stream_index, Stream::Encoding encoding); private: - // Mutex protecting queued_add_clients and streams[..]->queued_data. + // Mutex protecting queued_add_clients. // Note that if you want to hold both this and below, // you will need to take before this one. - mutable pthread_mutex_t queued_data_mutex; + mutable pthread_mutex_t queued_clients_mutex; // Deferred commands that should be run from the do_work() thread as soon as possible. // We defer these for two reasons: @@ -75,7 +75,7 @@ private: // add_data(), since they want to do add_data() rather often, and // can be taken a lot of the time. // - // Protected by . + // Protected by . std::vector queued_add_clients; // All variables below this line are protected by the mutex. diff --git a/stream.cpp b/stream.cpp index 4840b06..b7a92c2 100644 --- a/stream.cpp +++ b/stream.cpp @@ -11,6 +11,7 @@ #include "log.h" #include "metacube2.h" +#include "mutexlock.h" #include "state.pb.h" #include "stream.h" #include "util.h" @@ -30,6 +31,8 @@ Stream::Stream(const string &url, size_t backlog_size, Encoding encoding) if (data_fd == -1) { exit(1); } + + pthread_mutex_init(&queued_data_mutex, NULL); } Stream::~Stream() @@ -73,6 +76,8 @@ Stream::Stream(const StreamProto &serialized, int data_fd) } else { last_suitable_starting_point = bytes_received; } + + pthread_mutex_init(&queued_data_mutex, NULL); } StreamProto Stream::serialize() @@ -212,6 +217,7 @@ void Stream::add_data_raw(const vector &orig_data) void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start) { + MutexLock lock(&queued_data_mutex); assert(suitable_for_stream_start == SUITABLE_FOR_STREAM_START || suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START); if (suitable_for_stream_start == SUITABLE_FOR_STREAM_START) { @@ -252,27 +258,36 @@ void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitab void Stream::process_queued_data() { - if (queued_data.empty()) { - return; + std::vector queued_data_copy; + int queued_data_last_starting_point_copy = -1; + + // Hold the lock for as short as possible, since add_data_raw() can possibly + // write to disk, which might disturb the input thread. + { + MutexLock lock(&queued_data_mutex); + if (queued_data.empty()) { + return; + } + + swap(queued_data, queued_data_copy); + swap(queued_data_last_starting_point, queued_data_last_starting_point_copy); } // Update the last suitable starting point for the stream, // if the queued data contains such a starting point. - assert(queued_data_last_starting_point < ssize_t(queued_data.size())); - if (queued_data_last_starting_point >= 0) { + assert(queued_data_last_starting_point_copy < ssize_t(queued_data_copy.size())); + if (queued_data_last_starting_point_copy >= 0) { last_suitable_starting_point = bytes_received; - for (int i = 0; i < queued_data_last_starting_point; ++i) { - last_suitable_starting_point += queued_data[i].iov_len; + for (int i = 0; i < queued_data_last_starting_point_copy; ++i) { + last_suitable_starting_point += queued_data_copy[i].iov_len; } } - add_data_raw(queued_data); - for (size_t i = 0; i < queued_data.size(); ++i) { - char *data = reinterpret_cast(queued_data[i].iov_base); + add_data_raw(queued_data_copy); + for (size_t i = 0; i < queued_data_copy.size(); ++i) { + char *data = reinterpret_cast(queued_data_copy[i].iov_base); delete[] data; } - queued_data.clear(); - queued_data_last_starting_point = -1; // We have more data, so wake up all clients. if (to_process.empty()) { diff --git a/stream.h b/stream.h index f2ba5d4..2cdba11 100644 --- a/stream.h +++ b/stream.h @@ -34,6 +34,11 @@ struct Stream { // Changes the backlog size, restructuring the data as needed. void set_backlog_size(size_t new_size); + // Mutex protecting and . + // Note that if you want to hold both this and the owning server's + // you will need to take before this one. + mutable pthread_mutex_t queued_data_mutex; + std::string url; // The HTTP response header, without the trailing double newline. @@ -79,12 +84,12 @@ struct Stream { // What pool to fetch marks from, or NULL. MarkPool *mark_pool; - // Queued data, if any. Protected by the owning Server's . + // Queued data, if any. Protected by . // The data pointers in the iovec are owned by us. std::vector queued_data; // Index of the last element in queued_data that is suitable to start streaming at. - // -1 if none. + // -1 if none. Protected by . int queued_data_last_starting_point; // Put client to sleep, since there is no more data for it; we will on @@ -94,11 +99,10 @@ struct Stream { // Add more data to , adding Metacube headers if needed. // Does not take ownership of . - // You should hold the owning Server's . void add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start); // Add queued data to the stream, if any. - // You should hold the owning Server's _and_ . + // You should hold the owning Server's . void process_queued_data(); private: -- 2.39.2