From: Steinar H. Gunderson Date: Thu, 15 Aug 2013 23:37:43 +0000 (+0200) Subject: Add preliminary support for input stream statistics. X-Git-Tag: 1.0.0~24 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=bd694fdd3dd1417399aecead2c8b91fc4fe95ce8 Add preliminary support for input stream statistics. --- diff --git a/Makefile b/Makefile index 218285e..f08d8c0 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ PROTOC=protoc CXXFLAGS=-Wall -O2 -g -pthread LDLIBS=-lprotobuf -pthread -OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o +OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o all: cubemap diff --git a/config.cpp b/config.cpp index f17302a..cb16cbc 100644 --- a/config.cpp +++ b/config.cpp @@ -381,7 +381,14 @@ bool parse_config(const string &filename, Config *config) bool has_stats_file = fetch_config_string(lines, "stats_file", &config->stats_file); bool has_stats_interval = fetch_config_int(lines, "stats_interval", &config->stats_interval); if (has_stats_interval && !has_stats_file) { - log(WARNING, "'stats_interval' given, but no 'stats_file'. No statistics will be written."); + log(WARNING, "'stats_interval' given, but no 'stats_file'. No client statistics will be written."); + } + + config->input_stats_interval = 60; + bool has_input_stats_file = fetch_config_string(lines, "input_stats_file", &config->input_stats_file); + bool has_input_stats_interval = fetch_config_int(lines, "input_stats_interval", &config->input_stats_interval); + if (has_input_stats_interval && !has_input_stats_file) { + log(WARNING, "'input_stats_interval' given, but no 'input_stats_file'. 
No input statistics will be written."); } fetch_config_string(lines, "access_log", &config->access_log_file); @@ -391,6 +398,8 @@ bool parse_config(const string &filename, Config *config) if (line.keyword == "num_servers" || line.keyword == "stats_file" || line.keyword == "stats_interval" || + line.keyword == "input_stats_file" || + line.keyword == "input_stats_interval" || line.keyword == "access_log") { // Already taken care of, above. } else if (line.keyword == "port") { diff --git a/config.h b/config.h index cf3fdfb..e7959c5 100644 --- a/config.h +++ b/config.h @@ -48,6 +48,9 @@ struct Config { std::string stats_file; // Empty means no stats file. int stats_interval; + std::string input_stats_file; // Empty means no input stats file. + int input_stats_interval; + std::string access_log_file; // Empty means no accses_log file. }; diff --git a/cubemap.config.sample b/cubemap.config.sample index 83fc568..c6ab182 100644 --- a/cubemap.config.sample +++ b/cubemap.config.sample @@ -16,6 +16,9 @@ port 9094 stats_file cubemap.stats stats_interval 60 +input_stats_file cubemap-input.stats +input_stats_interval 60 + # Logging of clients as they disconnect (and as such as no longer visible in the stats file). # You can only have zero or one of these. 
access_log access.log diff --git a/httpinput.cpp b/httpinput.cpp index 3bee5d6..688f321 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -18,6 +18,7 @@ #include "httpinput.h" #include "log.h" #include "metacube.h" +#include "mutexlock.h" #include "parse.h" #include "serverpool.h" #include "state.pb.h" @@ -34,6 +35,10 @@ HTTPInput::HTTPInput(const string &url) has_metacube_header(false), sock(-1) { + pthread_mutex_init(&stats_mutex, NULL); + stats.url = url; + stats.bytes_received = 0; + stats.data_bytes_received = 0; } HTTPInput::HTTPInput(const InputProto &serialized) @@ -46,6 +51,8 @@ HTTPInput::HTTPInput(const InputProto &serialized) has_metacube_header(serialized.has_metacube_header()), sock(serialized.sock()) { + pthread_mutex_init(&stats_mutex, NULL); + pending_data.resize(serialized.pending_data().size()); memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size()); @@ -58,6 +65,11 @@ HTTPInput::HTTPInput(const InputProto &serialized) memcmp(http_header.data() + http_header.size() - 4, "\r\n\r\n", 4) == 0) { http_header.resize(http_header.size() - 2); } + + pthread_mutex_init(&stats_mutex, NULL); + stats.url = url; + stats.bytes_received = serialized.bytes_received(); + stats.data_bytes_received = serialized.data_bytes_received(); } void HTTPInput::close_socket() @@ -79,6 +91,8 @@ InputProto HTTPInput::serialize() const serialized.set_pending_data(string(pending_data.begin(), pending_data.end())); serialized.set_has_metacube_header(has_metacube_header); serialized.set_sock(sock); + serialized.set_bytes_received(stats.bytes_received); + serialized.set_data_bytes_received(stats.data_bytes_received); return serialized; } @@ -420,6 +434,10 @@ void HTTPInput::do_work() void HTTPInput::process_data(char *ptr, size_t bytes) { pending_data.insert(pending_data.end(), ptr, ptr + bytes); + { + MutexLock mutex(&stats_mutex); + stats.bytes_received += bytes; + } for ( ;; ) { // If we don't have enough data (yet) for even the Metacube 
header, just return. @@ -468,7 +486,11 @@ void HTTPInput::process_data(char *ptr, size_t bytes) return; } - // Send this block on to the data. + // Send this block on to the servers. + { + MutexLock lock(&stats_mutex); + stats.data_bytes_received += size; + } char *inner_data = pending_data.data() + sizeof(metacube_block_header); if (flags & METACUBE_FLAGS_HEADER) { string header(inner_data, inner_data + size); @@ -506,3 +528,8 @@ void HTTPInput::drop_pending_data(size_t num_bytes) pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes); } +InputStats HTTPInput::get_stats() const +{ + MutexLock lock(&stats_mutex); + return stats; +} diff --git a/httpinput.h b/httpinput.h index fc39b30..bf7d953 100644 --- a/httpinput.h +++ b/httpinput.h @@ -26,6 +26,8 @@ public: stream_indices.push_back(stream_index); } + virtual InputStats get_stats() const; + private: // Actually does the download. virtual void do_work(); @@ -81,7 +83,13 @@ private: bool has_metacube_header; // The socket we are downloading on (or -1). - int sock; + int sock; + + // Mutex protecting . + mutable pthread_mutex_t stats_mutex; + + // The current statistics for this connection. Protected by . + InputStats stats; }; #endif // !defined(_HTTPINPUT_H) diff --git a/input.h b/input.h index 76e1b66..f139755 100644 --- a/input.h +++ b/input.h @@ -16,6 +16,26 @@ bool parse_url(const std::string &url, std::string *protocol, std::string *host, Input *create_input(const std::string &url); Input *create_input(const InputProto &serialized); +// Digested statistics for writing to logs etc. +struct InputStats { + std::string url; + + // The number of bytes we have received so far, including any Metacube headers. + // + // Not reset across connections. + size_t bytes_received; + + // The number of data bytes we have received so far (or more precisely, + // number of data bytes we have sent on to the stream). This excludes Metacube + // headers and corrupted data we've skipped. 
+ // + // Not reset across connections. + size_t data_bytes_received; + + // TODO: Number of loss events and connection time might both be useful, + // similar to for clients. Also, per-connection byte counters. +}; + class Input : public Thread { public: virtual ~Input(); @@ -23,6 +43,9 @@ public: virtual std::string get_url() const = 0; virtual void close_socket() = 0; virtual void add_destination(int stream_index) = 0; + + // Note: May be called from a different thread, so must be thread-safe. + virtual InputStats get_stats() const = 0; }; #endif // !defined(_INPUT_H) diff --git a/input_stats.cpp b/input_stats.cpp new file mode 100644 index 0000000..ce38414 --- /dev/null +++ b/input_stats.cpp @@ -0,0 +1,84 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "client.h" +#include "log.h" +#include "input.h" +#include "input_stats.h" +#include "util.h" + +using namespace std; + +InputStatsThread::InputStatsThread(const string &stats_file, int stats_interval, const vector &inputs) + : stats_file(stats_file), + stats_interval(stats_interval), + inputs(inputs) +{ +} + +void InputStatsThread::do_work() +{ + while (!should_stop()) { + int fd; + FILE *fp; + vector client_stats; + + // Open a new, temporary file. 
+ char *filename = strdup((stats_file + ".new.XXXXXX").c_str()); + fd = mkostemp(filename, O_WRONLY); + if (fd == -1) { + log_perror(filename); + free(filename); + goto sleep; + } + + fp = fdopen(fd, "w"); + if (fp == NULL) { + log_perror("fdopen"); + safe_close(fd); + if (unlink(filename) == -1) { + log_perror(filename); + } + free(filename); + goto sleep; + } + + for (size_t i = 0; i < inputs.size(); ++i) { + InputStats stats = inputs[i]->get_stats(); + fprintf(fp, "%s %llu %llu\n", stats.url.c_str(), + (long long unsigned)(stats.bytes_received), + (long long unsigned)(stats.data_bytes_received)); + } + if (fclose(fp) == EOF) { + log_perror("fclose"); + if (unlink(filename) == -1) { + log_perror(filename); + } + free(filename); + goto sleep; + } + + if (rename(filename, stats_file.c_str()) == -1) { + log_perror("rename"); + if (unlink(filename) == -1) { + log_perror(filename); + } + } + free(filename); + +sleep: + // Wait until we are asked to quit, stats_interval timeout, + // or a spurious signal. (The latter will cause us to write stats + // too often, but that's okay.) + timespec timeout_ts; + timeout_ts.tv_sec = stats_interval; + timeout_ts.tv_nsec = 0; + wait_for_wakeup(&timeout_ts); + } +} diff --git a/input_stats.h b/input_stats.h new file mode 100644 index 0000000..1149b7c --- /dev/null +++ b/input_stats.h @@ -0,0 +1,25 @@ +#ifndef _INPUT_STATS_H +#define _INPUT_STATS_H 1 + +#include "thread.h" +#include +#include + +// A thread that regularly writes out input statistics, ie. a list of all inputs +// with some information about each. Very similar to StatsThread, but for inputs instead +// of clients. + +class InputStatsThread : public Thread { +public: + // Does not take ownership of the inputs. 
+ InputStatsThread(const std::string &stats_file, int stats_interval, const std::vector &inputs); + +private: + virtual void do_work(); + + std::string stats_file; + int stats_interval; + std::vector inputs; +}; + +#endif // !defined(_INPUT_STATS_H) diff --git a/main.cpp b/main.cpp index 8d60826..c40b78f 100644 --- a/main.cpp +++ b/main.cpp @@ -20,6 +20,7 @@ #include "accesslog.h" #include "config.h" #include "input.h" +#include "input_stats.h" #include "log.h" #include "markpool.h" #include "serverpool.h" @@ -448,6 +449,18 @@ start: stats_thread->run(); } + InputStatsThread *input_stats_thread = NULL; + if (!config.input_stats_file.empty()) { + vector inputs_no_refcount; + for (multimap::iterator input_it = inputs.begin(); + input_it != inputs.end(); ++input_it) { + inputs_no_refcount.push_back(input_it->second.input); + } + + input_stats_thread = new InputStatsThread(config.input_stats_file, config.input_stats_interval, inputs_no_refcount); + input_stats_thread->run(); + } + struct timeval server_start; gettimeofday(&server_start, NULL); if (state_fd != -1) { @@ -466,6 +479,10 @@ start: // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec. gettimeofday(&serialize_start, NULL); + if (input_stats_thread != NULL) { + input_stats_thread->stop(); + delete input_stats_thread; + } if (stats_thread != NULL) { stats_thread->stop(); delete stats_thread; diff --git a/state.proto b/state.proto index 93c56e3..d40acc9 100644 --- a/state.proto +++ b/state.proto @@ -43,6 +43,8 @@ message InputProto { optional bytes pending_data = 7; optional bool has_metacube_header = 8; optional int32 sock = 9; + optional int64 bytes_received = 11; + optional int64 data_bytes_received = 12; }; // Corresponds to class Acceptor. 
diff --git a/udpinput.cpp b/udpinput.cpp index 048866a..e444bbc 100644 --- a/udpinput.cpp +++ b/udpinput.cpp @@ -9,6 +9,7 @@ #include "acceptor.h" #include "log.h" +#include "mutexlock.h" #include "serverpool.h" #include "state.pb.h" #include "udpinput.h" @@ -29,6 +30,11 @@ UDPInput::UDPInput(const string &url) assert(ok); construct_header(); + + pthread_mutex_init(&stats_mutex, NULL); + stats.url = url; + stats.bytes_received = 0; + stats.data_bytes_received = 0; } UDPInput::UDPInput(const InputProto &serialized) @@ -41,6 +47,11 @@ UDPInput::UDPInput(const InputProto &serialized) assert(ok); construct_header(); + + pthread_mutex_init(&stats_mutex, NULL); + stats.url = url; + stats.bytes_received = serialized.bytes_received(); + stats.data_bytes_received = serialized.data_bytes_received(); } InputProto UDPInput::serialize() const @@ -48,6 +59,8 @@ InputProto UDPInput::serialize() const InputProto serialized; serialized.set_url(url); serialized.set_sock(sock); + serialized.set_bytes_received(stats.bytes_received); + serialized.set_data_bytes_received(stats.data_bytes_received); return serialized; } @@ -104,9 +117,21 @@ void UDPInput::do_work() close_socket(); continue; } + + { + MutexLock lock(&stats_mutex); + stats.bytes_received += ret; + stats.data_bytes_received += ret; + } for (size_t i = 0; i < stream_indices.size(); ++i) { servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START); } } } + +InputStats UDPInput::get_stats() const +{ + MutexLock lock(&stats_mutex); + return stats; +} diff --git a/udpinput.h b/udpinput.h index 0ee5dc9..caff008 100644 --- a/udpinput.h +++ b/udpinput.h @@ -21,6 +21,8 @@ public: virtual void add_destination(int stream_index); + virtual InputStats get_stats() const; + private: // Actually gets the packets. virtual void do_work(); @@ -42,6 +44,12 @@ private: // Temporary buffer, sized for the maximum size of an UDP packet. char packet_buf[65536]; + + // Mutex protecting . 
+ mutable pthread_mutex_t stats_mutex; + + // The current statistics for this connection. Protected by stats_mutex. + InputStats stats; }; #endif // !defined(_UDPINPUT_H)