From 50651c954803c1941e6ad1bb494712891c18f7d2 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Thu, 5 Apr 2018 17:58:22 +0200 Subject: [PATCH] Use C++11 std::mutex and std::lock_guard instead of our RAII wrapper. --- Makefile.in | 2 +- accesslog.cpp | 7 ++----- accesslog.h | 3 ++- httpinput.cpp | 17 +++++++---------- httpinput.h | 4 ++-- mutexlock.cpp | 13 ------------- mutexlock.h | 16 ---------------- server.cpp | 32 ++++++++++++++------------------ server.h | 14 +++++++------- stream.cpp | 9 ++------- stream.h | 3 ++- thread.cpp | 8 ++++---- thread.h | 4 +++- udpinput.cpp | 7 ++----- udpinput.h | 4 ++-- 15 files changed, 50 insertions(+), 93 deletions(-) delete mode 100644 mutexlock.cpp delete mode 100644 mutexlock.h diff --git a/Makefile.in b/Makefile.in index c702ef6..0e5fb2f 100644 --- a/Makefile.in +++ b/Makefile.in @@ -8,7 +8,7 @@ CXXFLAGS=-Wall @CXXFLAGS@ @protobuf_CFLAGS@ @libsystemd_CFLAGS@ @libtomcrypt_CFL LDFLAGS=@LDFLAGS@ LIBS=@LIBS@ @protobuf_LIBS@ @libsystemd_LIBS@ @libtomcrypt_LIBS@ -OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o acceptor.o stats.o accesslog.o thread.o util.o log.o metacube2.o sa_compare.o timespec.o state.pb.o tlse/tlse.o +OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o acceptor.o stats.o accesslog.o thread.o util.o log.o metacube2.o sa_compare.o timespec.o state.pb.o tlse/tlse.o all: cubemap diff --git a/accesslog.cpp b/accesslog.cpp index 4a54fdb..2d5eb05 100644 --- a/accesslog.cpp +++ b/accesslog.cpp @@ -7,25 +7,22 @@ #include "accesslog.h" #include "client.h" #include "log.h" -#include "mutexlock.h" #include "timespec.h" using namespace std; AccessLogThread::AccessLogThread() { - pthread_mutex_init(&mutex, nullptr); } AccessLogThread::AccessLogThread(const string &filename) : filename(filename) { - pthread_mutex_init(&mutex, nullptr); } void AccessLogThread::write(const ClientStats& client) { { - MutexLock lock(&mutex); + lock_guard lock(mu); pending_writes.push_back(client); } wakeup(); @@ -48,7 +45,7 @@ void AccessLogThread::do_work() // Empty the queue. vector writes; { - MutexLock lock(&mutex); + lock_guard lock(mu); swap(pending_writes, writes); } diff --git a/accesslog.h b/accesslog.h index 9eae365..b6ce61c 100644 --- a/accesslog.h +++ b/accesslog.h @@ -7,6 +7,7 @@ #include #include +#include #include #include @@ -33,7 +34,7 @@ private: FILE *logfp; std::string filename; - pthread_mutex_t mutex; + std::mutex mu; std::vector pending_writes; }; diff --git a/httpinput.cpp b/httpinput.cpp index 155e0b3..05ff37c 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -20,7 +20,6 @@ #include "httpinput.h" #include "log.h" #include "metacube2.h" -#include "mutexlock.h" #include "parse.h" #include "serverpool.h" #include "state.pb.h" @@ -51,7 +50,6 @@ HTTPInput::HTTPInput(const string &url, Input::Encoding encoding) url(url), encoding(encoding) { - pthread_mutex_init(&stats_mutex, nullptr); stats.url = url; stats.bytes_received = 0; stats.data_bytes_received = 0; @@ -80,7 +78,6 @@ HTTPInput::HTTPInput(const InputProto &serialized) string protocol, user; parse_url(url, &protocol, &user, &host, &port, &path); // Don't care if it fails. - pthread_mutex_init(&stats_mutex, nullptr); stats.url = url; stats.bytes_received = serialized.bytes_received(); stats.data_bytes_received = serialized.data_bytes_received(); @@ -107,7 +104,7 @@ void HTTPInput::close_socket() sock = -1; } - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.connect_time = -1; } @@ -396,7 +393,7 @@ void HTTPInput::do_work() request_bytes_sent = 0; } - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.connect_time = time(nullptr); clock_gettime(CLOCK_MONOTONIC_COARSE, &last_activity); } @@ -566,7 +563,7 @@ void HTTPInput::do_work() void HTTPInput::process_data(char *ptr, size_t bytes) { { - MutexLock mutex(&stats_mutex); + lock_guard lock(stats_mutex); stats.bytes_received += bytes; } @@ -645,14 +642,14 @@ void HTTPInput::process_data(char *ptr, size_t bytes) // TODO: Keep metadata when sending on to other Metacube users. if (flags & METACUBE_FLAGS_METADATA) { { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.metadata_bytes_received += size; } process_metacube_metadata_block(hdr, pending_data.data() + sizeof(hdr), size); } else { // Send this block on to the servers. { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.data_bytes_received += size; } char *inner_data = pending_data.data() + sizeof(metacube2_block_header); @@ -694,7 +691,7 @@ void HTTPInput::add_destination(int stream_index) InputStats HTTPInput::get_stats() const { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); return stats; } @@ -725,7 +722,7 @@ void HTTPInput::process_metacube_metadata_block(const metacube2_block_header &hd double elapsed = now.tv_sec - be64toh(pkt->tv_sec) + 1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec))); { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.latency_sec = elapsed; } } diff --git a/httpinput.h b/httpinput.h index ac52bfc..31819f9 100644 --- a/httpinput.h +++ b/httpinput.h @@ -1,8 +1,8 @@ #ifndef _HTTPINPUT_H #define _HTTPINPUT_H 1 -#include #include +#include #include #include @@ -93,7 +93,7 @@ private: int sock = -1; // Mutex protecting . - mutable pthread_mutex_t stats_mutex; + mutable std::mutex stats_mutex; // The current statistics for this connection. Protected by . InputStats stats; diff --git a/mutexlock.cpp b/mutexlock.cpp deleted file mode 100644 index a1dcc6a..0000000 --- a/mutexlock.cpp +++ /dev/null @@ -1,13 +0,0 @@ -#include "mutexlock.h" - -MutexLock::MutexLock(pthread_mutex_t *mutex) - : mutex(mutex) -{ - pthread_mutex_lock(this->mutex); -} - -MutexLock::~MutexLock() -{ - pthread_mutex_unlock(this->mutex); -} - diff --git a/mutexlock.h b/mutexlock.h deleted file mode 100644 index 0d1bc00..0000000 --- a/mutexlock.h +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef _MUTEXLOCK_H -#define _MUTEXLOCK_H 1 - -#include - -// Locks a pthread mutex, RAII-style. -class MutexLock { -public: - MutexLock(pthread_mutex_t *mutex); - ~MutexLock(); - -private: - pthread_mutex_t *mutex; -}; - -#endif // !defined(_MUTEXLOCK_H) diff --git a/server.cpp b/server.cpp index 2ec2fe0..852ecdd 100644 --- a/server.cpp +++ b/server.cpp @@ -23,7 +23,6 @@ #include "accesslog.h" #include "log.h" #include "metacube2.h" -#include "mutexlock.h" #include "parse.h" #include "server.h" #include "state.pb.h" @@ -57,9 +56,6 @@ inline bool is_earlier(timespec a, timespec b) Server::Server() { - pthread_mutex_init(&mutex, nullptr); - pthread_mutex_init(&queued_clients_mutex, nullptr); - epoll_fd = epoll_create(1024); // Size argument is ignored. if (epoll_fd == -1) { log_perror("epoll_fd"); @@ -76,7 +72,7 @@ vector Server::get_client_stats() const { vector ret; - MutexLock lock(&mutex); + lock_guard lock(mu); for (const auto &fd_and_client : clients) { ret.push_back(fd_and_client.second.get_stats()); } @@ -100,7 +96,7 @@ void Server::do_work() exit(1); } - MutexLock lock(&mutex); // We release the mutex between iterations. + lock_guard lock(mu); // We release the mutex between iterations. process_queued_data(); @@ -199,7 +195,7 @@ CubemapStateProto Server::serialize() void Server::add_client_deferred(int sock, Acceptor *acceptor) { - MutexLock lock(&queued_clients_mutex); + lock_guard lock(queued_clients_mutex); queued_add_clients.push_back(std::make_pair(sock, acceptor)); } @@ -254,7 +250,7 @@ void Server::add_client(int sock, Acceptor *acceptor) void Server::add_client_from_serialized(const ClientProto &client) { - MutexLock lock(&mutex); + lock_guard lock(mu); Stream *stream; int stream_index = lookup_stream_by_url(client.url()); if (stream_index == -1) { @@ -313,7 +309,7 @@ int Server::lookup_stream_by_url(const string &url) const int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding) { - MutexLock lock(&mutex); + lock_guard lock(mu); stream_url_map.insert(make_pair(url, streams.size())); streams.emplace_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding)); return streams.size() - 1; @@ -321,7 +317,7 @@ int Server::add_stream(const string &url, size_t backlog_size, size_t prebufferi int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd) { - MutexLock lock(&mutex); + lock_guard lock(mu); stream_url_map.insert(make_pair(stream.url(), streams.size())); streams.emplace_back(new Stream(stream, data_fd)); return streams.size() - 1; @@ -329,35 +325,35 @@ int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd) void Server::set_backlog_size(int stream_index, size_t new_size) { - MutexLock lock(&mutex); + lock_guard lock(mu); assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); streams[stream_index]->set_backlog_size(new_size); } void Server::set_prebuffering_bytes(int stream_index, size_t new_amount) { - MutexLock lock(&mutex); + lock_guard lock(mu); assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); streams[stream_index]->prebuffering_bytes = new_amount; } void Server::set_encoding(int stream_index, Stream::Encoding encoding) { - MutexLock lock(&mutex); + lock_guard lock(mu); assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); streams[stream_index]->encoding = encoding; } void Server::set_src_encoding(int stream_index, Stream::Encoding encoding) { - MutexLock lock(&mutex); + lock_guard lock(mu); assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); streams[stream_index]->src_encoding = encoding; } void Server::set_header(int stream_index, const string &http_header, const string &stream_header) { - MutexLock lock(&mutex); + lock_guard lock(mu); assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); streams[stream_index]->http_header = http_header; @@ -376,7 +372,7 @@ void Server::set_header(int stream_index, const string &http_header, const strin void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate) { - MutexLock lock(&mutex); + lock_guard lock(mu); assert(clients.empty()); assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); streams[stream_index]->pacing_rate = pacing_rate; @@ -384,7 +380,7 @@ void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate) void Server::add_gen204(const std::string &url, const std::string &allow_origin) { - MutexLock lock(&mutex); + lock_guard lock(mu); assert(clients.empty()); ping_url_map[url] = allow_origin; } @@ -978,7 +974,7 @@ void Server::change_epoll_events(Client *client, uint32_t events) void Server::process_queued_data() { { - MutexLock lock(&queued_clients_mutex); + lock_guard lock(queued_clients_mutex); for (const pair &id_and_acceptor : queued_add_clients) { add_client(id_and_acceptor.first, id_and_acceptor.second); diff --git a/server.h b/server.h index dfcac0b..68e72c4 100644 --- a/server.h +++ b/server.h @@ -1,7 +1,6 @@ #ifndef _SERVER_H #define _SERVER_H 1 -#include #include #include #include @@ -9,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -71,24 +71,24 @@ public: private: // Mutex protecting queued_add_clients. - // Note that if you want to hold both this and below, - // you will need to take before this one. - mutable pthread_mutex_t queued_clients_mutex; + // Note that if you want to hold both this and below, + // you will need to take before this one. + mutable std::mutex queued_clients_mutex; // Deferred commands that should be run from the do_work() thread as soon as possible. // We defer these for two reasons: // // - We only want to fiddle with epoll from one thread at any given time, // and doing add_client() from the acceptor thread would violate that. - // - We don't want the input thread(s) hanging on when doing - // add_data(), since they want to do add_data() rather often, and + // - We don't want the input thread(s) hanging on when doing + // add_data(), since they want to do add_data() rather often, and // can be taken a lot of the time. // // Protected by . std::vector> queued_add_clients; // All variables below this line are protected by the mutex. - mutable pthread_mutex_t mutex; + mutable std::mutex mu; // All streams. std::vector> streams; diff --git a/stream.cpp b/stream.cpp index d7a78a6..3a4ccc3 100644 --- a/stream.cpp +++ b/stream.cpp @@ -13,7 +13,6 @@ #include "log.h" #include "metacube2.h" -#include "mutexlock.h" #include "state.pb.h" #include "stream.h" #include "util.h" @@ -31,8 +30,6 @@ Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes if (data_fd == -1) { exit(1); } - - pthread_mutex_init(&queued_data_mutex, nullptr); } Stream::~Stream() @@ -65,8 +62,6 @@ Stream::Stream(const StreamProto &serialized, int data_fd) } suitable_starting_points.push_back(point); } - - pthread_mutex_init(&queued_data_mutex, nullptr); } StreamProto Stream::serialize() @@ -232,7 +227,7 @@ void Stream::add_data_deferred(const char *data, size_t bytes, uint16_t metacube return; } - MutexLock lock(&queued_data_mutex); + lock_guard lock(queued_data_mutex); DataElement data_element; data_element.metacube_flags = metacube_flags; @@ -271,7 +266,7 @@ void Stream::process_queued_data() // Hold the lock for as short as possible, since add_data_raw() can possibly // write to disk, which might disturb the input thread. { - MutexLock lock(&queued_data_mutex); + lock_guard lock(queued_data_mutex); if (queued_data.empty()) { return; } diff --git a/stream.h b/stream.h index 6c757a1..17df7f3 100644 --- a/stream.h +++ b/stream.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -32,7 +33,7 @@ struct Stream { // Mutex protecting and . // Note that if you want to hold both this and the owning server's // you will need to take before this one. - mutable pthread_mutex_t queued_data_mutex; + mutable std::mutex queued_data_mutex; std::string url; diff --git a/thread.cpp b/thread.cpp index 473c71a..d85be70 100644 --- a/thread.cpp +++ b/thread.cpp @@ -7,14 +7,14 @@ #include #include "log.h" -#include "mutexlock.h" #include "thread.h" +using namespace std; + Thread::~Thread() {} void Thread::run() { - pthread_mutex_init(&should_stop_mutex, nullptr); should_stop_status = false; pthread_create(&worker_thread, nullptr, &Thread::do_work_thunk, this); } @@ -22,7 +22,7 @@ void Thread::run() void Thread::stop() { { - MutexLock lock(&should_stop_mutex); + lock_guard lock(should_stop_mutex); should_stop_status = true; } wakeup(); @@ -92,6 +92,6 @@ void Thread::wakeup() bool Thread::should_stop() { - MutexLock lock(&should_stop_mutex); + lock_guard lock(should_stop_mutex); return should_stop_status; } diff --git a/thread.h b/thread.h index 09e2754..46b6d39 100644 --- a/thread.h +++ b/thread.h @@ -4,6 +4,8 @@ #include #include +#include + struct timespec; // A thread class with start/stop and signal functionality. @@ -51,7 +53,7 @@ private: pthread_t worker_thread; // Protects should_stop_status. - pthread_mutex_t should_stop_mutex; + std::mutex should_stop_mutex; // If this is set, the thread should return as soon as possible from do_work(). bool should_stop_status; diff --git a/udpinput.cpp b/udpinput.cpp index 93eaaa7..0035640 100644 --- a/udpinput.cpp +++ b/udpinput.cpp @@ -11,7 +11,6 @@ #include "acceptor.h" #include "log.h" -#include "mutexlock.h" #include "serverpool.h" #include "state.pb.h" #include "stream.h" @@ -113,7 +112,6 @@ UDPInput::UDPInput(const string &url) construct_header(); - pthread_mutex_init(&stats_mutex, nullptr); stats.url = url; stats.connect_time = time(nullptr); } @@ -129,7 +127,6 @@ UDPInput::UDPInput(const InputProto &serialized) construct_header(); - pthread_mutex_init(&stats_mutex, nullptr); stats.url = url; stats.bytes_received = serialized.bytes_received(); stats.data_bytes_received = serialized.data_bytes_received(); @@ -220,7 +217,7 @@ void UDPInput::do_work() } { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.bytes_received += ret; stats.data_bytes_received += ret; } @@ -233,6 +230,6 @@ void UDPInput::do_work() InputStats UDPInput::get_stats() const { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); return stats; } diff --git a/udpinput.h b/udpinput.h index b3f504a..4de9788 100644 --- a/udpinput.h +++ b/udpinput.h @@ -1,7 +1,7 @@ #ifndef _UDPINPUT_H #define _UDPINPUT_H 1 -#include +#include #include #include @@ -47,7 +47,7 @@ private: char packet_buf[65536]; // Mutex protecting . - mutable pthread_mutex_t stats_mutex; + mutable std::mutex stats_mutex; // The current statistics for this connection. Protected by . InputStats stats; -- 2.39.2