LDFLAGS=@LDFLAGS@
LIBS=@LIBS@ @protobuf_LIBS@ @libsystemd_LIBS@ @libtomcrypt_LIBS@
-OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o acceptor.o stats.o accesslog.o thread.o util.o log.o metacube2.o sa_compare.o timespec.o state.pb.o tlse/tlse.o
+OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o acceptor.o stats.o accesslog.o thread.o util.o log.o metacube2.o sa_compare.o timespec.o state.pb.o tlse/tlse.o
all: cubemap
#include "accesslog.h"
#include "client.h"
#include "log.h"
-#include "mutexlock.h"
#include "timespec.h"
using namespace std;
AccessLogThread::AccessLogThread()
{
- pthread_mutex_init(&mutex, nullptr);
}
AccessLogThread::AccessLogThread(const string &filename)
: filename(filename) {
- pthread_mutex_init(&mutex, nullptr);
}
void AccessLogThread::write(const ClientStats& client)
{
{
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
pending_writes.push_back(client);
}
wakeup();
// Empty the queue.
vector<ClientStats> writes;
{
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
swap(pending_writes, writes);
}
#include <pthread.h>
#include <stdio.h>
+#include <mutex>
#include <string>
#include <vector>
FILE *logfp;
std::string filename;
- pthread_mutex_t mutex;
+ std::mutex mu;
std::vector<ClientStats> pending_writes;
};
#include "httpinput.h"
#include "log.h"
#include "metacube2.h"
-#include "mutexlock.h"
#include "parse.h"
#include "serverpool.h"
#include "state.pb.h"
url(url),
encoding(encoding)
{
- pthread_mutex_init(&stats_mutex, nullptr);
stats.url = url;
stats.bytes_received = 0;
stats.data_bytes_received = 0;
string protocol, user;
parse_url(url, &protocol, &user, &host, &port, &path); // Don't care if it fails.
- pthread_mutex_init(&stats_mutex, nullptr);
stats.url = url;
stats.bytes_received = serialized.bytes_received();
stats.data_bytes_received = serialized.data_bytes_received();
sock = -1;
}
- MutexLock lock(&stats_mutex);
+ lock_guard<mutex> lock(stats_mutex);
stats.connect_time = -1;
}
request_bytes_sent = 0;
}
- MutexLock lock(&stats_mutex);
+ lock_guard<mutex> lock(stats_mutex);
stats.connect_time = time(nullptr);
clock_gettime(CLOCK_MONOTONIC_COARSE, &last_activity);
}
void HTTPInput::process_data(char *ptr, size_t bytes)
{
{
- MutexLock mutex(&stats_mutex);
+ lock_guard<mutex> lock(stats_mutex);
stats.bytes_received += bytes;
}
// TODO: Keep metadata when sending on to other Metacube users.
if (flags & METACUBE_FLAGS_METADATA) {
{
- MutexLock lock(&stats_mutex);
+ lock_guard<mutex> lock(stats_mutex);
stats.metadata_bytes_received += size;
}
process_metacube_metadata_block(hdr, pending_data.data() + sizeof(hdr), size);
} else {
// Send this block on to the servers.
{
- MutexLock lock(&stats_mutex);
+ lock_guard<mutex> lock(stats_mutex);
stats.data_bytes_received += size;
}
char *inner_data = pending_data.data() + sizeof(metacube2_block_header);
InputStats HTTPInput::get_stats() const
{
- MutexLock lock(&stats_mutex);
+ lock_guard<mutex> lock(stats_mutex);
return stats;
}
double elapsed = now.tv_sec - be64toh(pkt->tv_sec) +
1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec)));
{
- MutexLock lock(&stats_mutex);
+ lock_guard<mutex> lock(stats_mutex);
stats.latency_sec = elapsed;
}
}
#ifndef _HTTPINPUT_H
#define _HTTPINPUT_H 1
-#include <pthread.h>
#include <stddef.h>
+#include <mutex>
#include <string>
#include <vector>
int sock = -1;
// Mutex protecting <stats>.
- mutable pthread_mutex_t stats_mutex;
+ mutable std::mutex stats_mutex;
// The current statistics for this connection. Protected by <stats_mutex>.
InputStats stats;
+++ /dev/null
-#include "mutexlock.h"
-
-MutexLock::MutexLock(pthread_mutex_t *mutex)
- : mutex(mutex)
-{
- pthread_mutex_lock(this->mutex);
-}
-
-MutexLock::~MutexLock()
-{
- pthread_mutex_unlock(this->mutex);
-}
-
+++ /dev/null
-#ifndef _MUTEXLOCK_H
-#define _MUTEXLOCK_H 1
-
-#include <pthread.h>
-
-// Locks a pthread mutex, RAII-style.
-class MutexLock {
-public:
- MutexLock(pthread_mutex_t *mutex);
- ~MutexLock();
-
-private:
- pthread_mutex_t *mutex;
-};
-
-#endif // !defined(_MUTEXLOCK_H)
#include "accesslog.h"
#include "log.h"
#include "metacube2.h"
-#include "mutexlock.h"
#include "parse.h"
#include "server.h"
#include "state.pb.h"
Server::Server()
{
- pthread_mutex_init(&mutex, nullptr);
- pthread_mutex_init(&queued_clients_mutex, nullptr);
-
epoll_fd = epoll_create(1024); // Size argument is ignored.
if (epoll_fd == -1) {
log_perror("epoll_fd");
{
vector<ClientStats> ret;
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
for (const auto &fd_and_client : clients) {
ret.push_back(fd_and_client.second.get_stats());
}
exit(1);
}
- MutexLock lock(&mutex); // We release the mutex between iterations.
+ lock_guard<mutex> lock(mu); // We release the mutex between iterations.
process_queued_data();
void Server::add_client_deferred(int sock, Acceptor *acceptor)
{
- MutexLock lock(&queued_clients_mutex);
+ lock_guard<mutex> lock(queued_clients_mutex);
queued_add_clients.push_back(std::make_pair(sock, acceptor));
}
void Server::add_client_from_serialized(const ClientProto &client)
{
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
Stream *stream;
int stream_index = lookup_stream_by_url(client.url());
if (stream_index == -1) {
int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding)
{
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
stream_url_map.insert(make_pair(url, streams.size()));
streams.emplace_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding));
return streams.size() - 1;
int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
{
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
stream_url_map.insert(make_pair(stream.url(), streams.size()));
streams.emplace_back(new Stream(stream, data_fd));
return streams.size() - 1;
void Server::set_backlog_size(int stream_index, size_t new_size)
{
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
streams[stream_index]->set_backlog_size(new_size);
}
void Server::set_prebuffering_bytes(int stream_index, size_t new_amount)
{
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
streams[stream_index]->prebuffering_bytes = new_amount;
}
void Server::set_encoding(int stream_index, Stream::Encoding encoding)
{
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
streams[stream_index]->encoding = encoding;
}
void Server::set_src_encoding(int stream_index, Stream::Encoding encoding)
{
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
streams[stream_index]->src_encoding = encoding;
}
void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
{
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
streams[stream_index]->http_header = http_header;
void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
{
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
assert(clients.empty());
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
streams[stream_index]->pacing_rate = pacing_rate;
void Server::add_gen204(const std::string &url, const std::string &allow_origin)
{
- MutexLock lock(&mutex);
+ lock_guard<mutex> lock(mu);
assert(clients.empty());
ping_url_map[url] = allow_origin;
}
void Server::process_queued_data()
{
{
- MutexLock lock(&queued_clients_mutex);
+ lock_guard<mutex> lock(queued_clients_mutex);
for (const pair<int, Acceptor *> &id_and_acceptor : queued_add_clients) {
add_client(id_and_acceptor.first, id_and_acceptor.second);
#ifndef _SERVER_H
#define _SERVER_H 1
-#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <time.h>
#include <map>
#include <memory>
+#include <mutex>
#include <queue>
#include <string>
#include <vector>
private:
// Mutex protecting queued_add_clients.
- // Note that if you want to hold both this and <mutex> below,
- // you will need to take <mutex> before this one.
- mutable pthread_mutex_t queued_clients_mutex;
+ // Note that if you want to hold both this and <mu> below,
+ // you will need to take <mu> before this one.
+ mutable std::mutex queued_clients_mutex;
// Deferred commands that should be run from the do_work() thread as soon as possible.
// We defer these for two reasons:
//
// - We only want to fiddle with epoll from one thread at any given time,
// and doing add_client() from the acceptor thread would violate that.
- // - We don't want the input thread(s) hanging on <mutex> when doing
- // add_data(), since they want to do add_data() rather often, and <mutex>
+ // - We don't want the input thread(s) hanging on <mu> when doing
+ // add_data(), since they want to do add_data() rather often, and <mu>
// can be taken a lot of the time.
//
// Protected by <queued_clients_mutex>.
std::vector<std::pair<int, Acceptor *>> queued_add_clients;
// All variables below this line are protected by the mutex.
- mutable pthread_mutex_t mutex;
+ mutable std::mutex mu;
// All streams.
std::vector<std::unique_ptr<Stream>> streams;
#include "log.h"
#include "metacube2.h"
-#include "mutexlock.h"
#include "state.pb.h"
#include "stream.h"
#include "util.h"
if (data_fd == -1) {
exit(1);
}
-
- pthread_mutex_init(&queued_data_mutex, nullptr);
}
Stream::~Stream()
}
suitable_starting_points.push_back(point);
}
-
- pthread_mutex_init(&queued_data_mutex, nullptr);
}
StreamProto Stream::serialize()
return;
}
- MutexLock lock(&queued_data_mutex);
+ lock_guard<mutex> lock(queued_data_mutex);
DataElement data_element;
data_element.metacube_flags = metacube_flags;
// Hold the lock for as short as possible, since add_data_raw() can possibly
// write to disk, which might disturb the input thread.
{
- MutexLock lock(&queued_data_mutex);
+ lock_guard<mutex> lock(queued_data_mutex);
if (queued_data.empty()) {
return;
}
#include <sys/types.h>
#include <sys/uio.h>
#include <deque>
+#include <mutex>
#include <string>
#include <vector>
// Mutex protecting <queued_data> and <queued_data_last_starting_point>.
// Note that if you want to hold both this and the owning server's
// <mutex> you will need to take <mutex> before this one.
- mutable pthread_mutex_t queued_data_mutex;
+ mutable std::mutex queued_data_mutex;
std::string url;
#include <unistd.h>
#include "log.h"
-#include "mutexlock.h"
#include "thread.h"
+using namespace std;
+
Thread::~Thread() {}
void Thread::run()
{
- pthread_mutex_init(&should_stop_mutex, nullptr);
should_stop_status = false;
pthread_create(&worker_thread, nullptr, &Thread::do_work_thunk, this);
}
void Thread::stop()
{
{
- MutexLock lock(&should_stop_mutex);
+ lock_guard<mutex> lock(should_stop_mutex);
should_stop_status = true;
}
wakeup();
bool Thread::should_stop()
{
- MutexLock lock(&should_stop_mutex);
+ lock_guard<mutex> lock(should_stop_mutex);
return should_stop_status;
}
#include <signal.h>
#include <pthread.h>
+#include <mutex>
+
struct timespec;
// A thread class with start/stop and signal functionality.
pthread_t worker_thread;
// Protects should_stop_status.
- pthread_mutex_t should_stop_mutex;
+ std::mutex should_stop_mutex;
// If this is set, the thread should return as soon as possible from do_work().
bool should_stop_status;
#include "acceptor.h"
#include "log.h"
-#include "mutexlock.h"
#include "serverpool.h"
#include "state.pb.h"
#include "stream.h"
construct_header();
- pthread_mutex_init(&stats_mutex, nullptr);
stats.url = url;
stats.connect_time = time(nullptr);
}
construct_header();
- pthread_mutex_init(&stats_mutex, nullptr);
stats.url = url;
stats.bytes_received = serialized.bytes_received();
stats.data_bytes_received = serialized.data_bytes_received();
}
{
- MutexLock lock(&stats_mutex);
+ lock_guard<mutex> lock(stats_mutex);
stats.bytes_received += ret;
stats.data_bytes_received += ret;
}
InputStats UDPInput::get_stats() const
{
- MutexLock lock(&stats_mutex);
+ lock_guard<mutex> lock(stats_mutex);
return stats;
}
#ifndef _UDPINPUT_H
#define _UDPINPUT_H 1
-#include <pthread.h>
+#include <mutex>
#include <string>
#include <vector>
char packet_buf[65536];
// Mutex protecting <stats>.
- mutable pthread_mutex_t stats_mutex;
+ mutable std::mutex stats_mutex;
// The current statistics for this connection. Protected by <stats_mutex>.
InputStats stats;