Add preliminary support for input stream statistics.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Thu, 15 Aug 2013 23:37:43 +0000 (01:37 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Thu, 15 Aug 2013 23:37:43 +0000 (01:37 +0200)
13 files changed:
Makefile
config.cpp
config.h
cubemap.config.sample
httpinput.cpp
httpinput.h
input.h
input_stats.cpp [new file with mode: 0644]
input_stats.h [new file with mode: 0644]
main.cpp
state.proto
udpinput.cpp
udpinput.h

index 218285e..f08d8c0 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ PROTOC=protoc
 CXXFLAGS=-Wall -O2 -g -pthread
 LDLIBS=-lprotobuf -pthread
 
-OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o
+OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o
 
 all: cubemap
 
index f17302a..cb16cbc 100644 (file)
@@ -381,7 +381,14 @@ bool parse_config(const string &filename, Config *config)
        bool has_stats_file = fetch_config_string(lines, "stats_file", &config->stats_file);
        bool has_stats_interval = fetch_config_int(lines, "stats_interval", &config->stats_interval);
        if (has_stats_interval && !has_stats_file) {
-               log(WARNING, "'stats_interval' given, but no 'stats_file'. No statistics will be written.");
+               log(WARNING, "'stats_interval' given, but no 'stats_file'. No client statistics will be written.");
+       }
+
+       config->input_stats_interval = 60;
+       bool has_input_stats_file = fetch_config_string(lines, "input_stats_file", &config->input_stats_file);
+       bool has_input_stats_interval = fetch_config_int(lines, "input_stats_interval", &config->input_stats_interval);
+       if (has_input_stats_interval && !has_input_stats_file) {
+               log(WARNING, "'input_stats_interval' given, but no 'input_stats_file'. No input statistics will be written.");
        }
        
        fetch_config_string(lines, "access_log", &config->access_log_file);
@@ -391,6 +398,8 @@ bool parse_config(const string &filename, Config *config)
                if (line.keyword == "num_servers" ||
                    line.keyword == "stats_file" ||
                    line.keyword == "stats_interval" ||
+                   line.keyword == "input_stats_file" ||
+                   line.keyword == "input_stats_interval" ||
                    line.keyword == "access_log") {
                        // Already taken care of, above.
                } else if (line.keyword == "port") {
index cf3fdfb..e7959c5 100644 (file)
--- a/config.h
+++ b/config.h
@@ -48,6 +48,9 @@ struct Config {
        std::string stats_file;  // Empty means no stats file.
        int stats_interval;
 
+       std::string input_stats_file;  // Empty means no input stats file.
+       int input_stats_interval;
+
        std::string access_log_file;  // Empty means no accses_log file.
 };
 
index 83fc568..c6ab182 100644 (file)
@@ -16,6 +16,9 @@ port 9094
 stats_file cubemap.stats
 stats_interval 60
 
+input_stats_file cubemap-input.stats
+input_stats_interval 60
+
 # Logging of clients as they disconnect (and as such as no longer visible in the stats file).
 # You can only have zero or one of these.
 access_log access.log
index 3bee5d6..688f321 100644 (file)
@@ -18,6 +18,7 @@
 #include "httpinput.h"
 #include "log.h"
 #include "metacube.h"
+#include "mutexlock.h"
 #include "parse.h"
 #include "serverpool.h"
 #include "state.pb.h"
@@ -34,6 +35,10 @@ HTTPInput::HTTPInput(const string &url)
          has_metacube_header(false),
          sock(-1)
 {
+       pthread_mutex_init(&stats_mutex, NULL);
+       stats.url = url;
+       stats.bytes_received = 0;
+       stats.data_bytes_received = 0;
 }
 
 HTTPInput::HTTPInput(const InputProto &serialized)
@@ -46,6 +51,8 @@ HTTPInput::HTTPInput(const InputProto &serialized)
          has_metacube_header(serialized.has_metacube_header()),
          sock(serialized.sock())
 {
+       pthread_mutex_init(&stats_mutex, NULL);
+
        pending_data.resize(serialized.pending_data().size());
        memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size());
 
@@ -58,6 +65,11 @@ HTTPInput::HTTPInput(const InputProto &serialized)
            memcmp(http_header.data() + http_header.size() - 4, "\r\n\r\n", 4) == 0) {
                http_header.resize(http_header.size() - 2);
        }
+
+       pthread_mutex_init(&stats_mutex, NULL);
+       stats.url = url;
+       stats.bytes_received = serialized.bytes_received();
+       stats.data_bytes_received = serialized.data_bytes_received();
 }
 
 void HTTPInput::close_socket()
@@ -79,6 +91,8 @@ InputProto HTTPInput::serialize() const
        serialized.set_pending_data(string(pending_data.begin(), pending_data.end()));
        serialized.set_has_metacube_header(has_metacube_header);
        serialized.set_sock(sock);
+       serialized.set_bytes_received(stats.bytes_received);
+       serialized.set_data_bytes_received(stats.data_bytes_received);
        return serialized;
 }
 
@@ -420,6 +434,10 @@ void HTTPInput::do_work()
 void HTTPInput::process_data(char *ptr, size_t bytes)
 {
        pending_data.insert(pending_data.end(), ptr, ptr + bytes);
+       {
+               MutexLock mutex(&stats_mutex);
+               stats.bytes_received += bytes;
+       }
 
        for ( ;; ) {
                // If we don't have enough data (yet) for even the Metacube header, just return.
@@ -468,7 +486,11 @@ void HTTPInput::process_data(char *ptr, size_t bytes)
                        return;
                }
 
-               // Send this block on to the data.
+               // Send this block on to the servers.
+               {
+                       MutexLock lock(&stats_mutex);
+                       stats.data_bytes_received += size;
+               }
                char *inner_data = pending_data.data() + sizeof(metacube_block_header);
                if (flags & METACUBE_FLAGS_HEADER) {
                        string header(inner_data, inner_data + size);
@@ -506,3 +528,8 @@ void HTTPInput::drop_pending_data(size_t num_bytes)
        pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes);
 }
 
+InputStats HTTPInput::get_stats() const
+{
+       MutexLock lock(&stats_mutex);
+       return stats;
+}
index fc39b30..bf7d953 100644 (file)
@@ -26,6 +26,8 @@ public:
                stream_indices.push_back(stream_index);
        }
 
+       virtual InputStats get_stats() const;
+
 private:
        // Actually does the download.
        virtual void do_work();
@@ -81,7 +83,13 @@ private:
        bool has_metacube_header;
 
        // The socket we are downloading on (or -1).
-       int sock;       
+       int sock;
+
+       // Mutex protecting <stats>.
+       mutable pthread_mutex_t stats_mutex;
+
+       // The current statistics for this connection. Protected by <stats_mutex>.
+       InputStats stats;
 };
 
 #endif  // !defined(_HTTPINPUT_H)
diff --git a/input.h b/input.h
index 76e1b66..f139755 100644 (file)
--- a/input.h
+++ b/input.h
@@ -16,6 +16,26 @@ bool parse_url(const std::string &url, std::string *protocol, std::string *host,
 Input *create_input(const std::string &url);
 Input *create_input(const InputProto &serialized);
 
+// Digested statistics for writing to logs etc.
+struct InputStats {
+       std::string url;
+
+       // The number of bytes we have received so far, including any Metacube headers.
+       //
+       // Not reset across connections.
+       size_t bytes_received;
+
+       // The number of data bytes we have received so far (or more precisely,
+       // number of data bytes we have sent on to the stream). This excludes Metacube
+       // headers and corrupted data we've skipped.
+       //
+       // Not reset across connections.
+       size_t data_bytes_received;
+
+       // TODO: Number of loss events and connection time might both be useful,
+       // similar to for clients. Also, per-connection byte counters.
+};
+
 class Input : public Thread {
 public:
        virtual ~Input();
@@ -23,6 +43,9 @@ public:
        virtual std::string get_url() const = 0;
        virtual void close_socket() = 0;
        virtual void add_destination(int stream_index) = 0;
+
+       // Note: May be called from a different thread, so must be thread-safe.
+       virtual InputStats get_stats() const = 0;
 };
 
 #endif  // !defined(_INPUT_H)
diff --git a/input_stats.cpp b/input_stats.cpp
new file mode 100644 (file)
index 0000000..ce38414
--- /dev/null
@@ -0,0 +1,84 @@
+#include <fcntl.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <vector>
+
+#include "client.h"
+#include "log.h"
+#include "input.h"
+#include "input_stats.h"
+#include "util.h"
+
+using namespace std;
+
+InputStatsThread::InputStatsThread(const string &stats_file, int stats_interval, const vector<Input*> &inputs)
+       : stats_file(stats_file),
+         stats_interval(stats_interval),
+         inputs(inputs)
+{
+}
+
+void InputStatsThread::do_work()
+{
+       while (!should_stop()) {
+               int fd;
+               FILE *fp;
+               vector<ClientStats> client_stats;
+
+               // Open a new, temporary file.
+               char *filename = strdup((stats_file + ".new.XXXXXX").c_str());
+               fd = mkostemp(filename, O_WRONLY);
+               if (fd == -1) {
+                       log_perror(filename);
+                       free(filename);
+                       goto sleep;
+               }
+
+               fp = fdopen(fd, "w");
+               if (fp == NULL) {
+                       log_perror("fdopen");
+                       safe_close(fd);
+                       if (unlink(filename) == -1) {
+                               log_perror(filename);
+                       }
+                       free(filename);
+                       goto sleep;
+               }
+
+               for (size_t i = 0; i < inputs.size(); ++i) {
+                       InputStats stats = inputs[i]->get_stats();
+                       fprintf(fp, "%s %llu %llu\n", stats.url.c_str(),
+                               (long long unsigned)(stats.bytes_received),
+                               (long long unsigned)(stats.data_bytes_received));
+               }
+               if (fclose(fp) == EOF) {
+                       log_perror("fclose");
+                       if (unlink(filename) == -1) {
+                               log_perror(filename);
+                       }
+                       free(filename);
+                       goto sleep;
+               }
+               
+               if (rename(filename, stats_file.c_str()) == -1) {
+                       log_perror("rename");
+                       if (unlink(filename) == -1) {
+                               log_perror(filename);
+                       }
+               }
+               free(filename);
+
+sleep:
+               // Wait until we are asked to quit, stats_interval timeout,
+               // or a spurious signal. (The latter will cause us to write stats
+               // too often, but that's okay.)
+               timespec timeout_ts;
+               timeout_ts.tv_sec = stats_interval;
+               timeout_ts.tv_nsec = 0;
+               wait_for_wakeup(&timeout_ts);
+       }
+}
diff --git a/input_stats.h b/input_stats.h
new file mode 100644 (file)
index 0000000..1149b7c
--- /dev/null
@@ -0,0 +1,25 @@
+#ifndef _INPUT_STATS_H
+#define _INPUT_STATS_H 1
+
+#include "thread.h"
+#include <string>
+#include <vector>
+
+// A thread that regularly writes out input statistics, ie. a list of all inputs
+// with some information about each. Very similar to StatsThread, but for inputs instead
+// of clients.
+
+class InputStatsThread : public Thread {
+public:
+       // Does not take ownership of the inputs.
+       InputStatsThread(const std::string &stats_file, int stats_interval, const std::vector<Input*> &inputs);
+
+private:
+       virtual void do_work();
+
+       std::string stats_file;
+       int stats_interval;
+       std::vector<Input*> inputs;
+};
+       
+#endif  // !defined(_INPUT_STATS_H)
index 8d60826..c40b78f 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -20,6 +20,7 @@
 #include "accesslog.h"
 #include "config.h"
 #include "input.h"
+#include "input_stats.h"
 #include "log.h"
 #include "markpool.h"
 #include "serverpool.h"
@@ -448,6 +449,18 @@ start:
                stats_thread->run();
        }
 
+       InputStatsThread *input_stats_thread = NULL;
+       if (!config.input_stats_file.empty()) {
+               vector<Input*> inputs_no_refcount;
+               for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
+                    input_it != inputs.end(); ++input_it) {
+                       inputs_no_refcount.push_back(input_it->second.input);
+               }
+
+               input_stats_thread = new InputStatsThread(config.input_stats_file, config.input_stats_interval, inputs_no_refcount);
+               input_stats_thread->run();
+       }
+
        struct timeval server_start;
        gettimeofday(&server_start, NULL);
        if (state_fd != -1) {
@@ -466,6 +479,10 @@ start:
        // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec.
        gettimeofday(&serialize_start, NULL);
 
+       if (input_stats_thread != NULL) {
+               input_stats_thread->stop();
+               delete input_stats_thread;
+       }
        if (stats_thread != NULL) {
                stats_thread->stop();
                delete stats_thread;
index 93c56e3..d40acc9 100644 (file)
@@ -43,6 +43,8 @@ message InputProto {
        optional bytes pending_data = 7;
        optional bool has_metacube_header = 8;
        optional int32 sock = 9;
+       optional int64 bytes_received = 11;
+       optional int64 data_bytes_received = 12;
 };
 
 // Corresponds to class Acceptor.
index 048866a..e444bbc 100644 (file)
@@ -9,6 +9,7 @@
 
 #include "acceptor.h"
 #include "log.h"
+#include "mutexlock.h"
 #include "serverpool.h"
 #include "state.pb.h"
 #include "udpinput.h"
@@ -29,6 +30,11 @@ UDPInput::UDPInput(const string &url)
        assert(ok);
 
        construct_header();
+
+       pthread_mutex_init(&stats_mutex, NULL);
+       stats.url = url;
+       stats.bytes_received = 0;
+       stats.data_bytes_received = 0;
 }
 
 UDPInput::UDPInput(const InputProto &serialized)
@@ -41,6 +47,11 @@ UDPInput::UDPInput(const InputProto &serialized)
        assert(ok);
 
        construct_header();
+
+       pthread_mutex_init(&stats_mutex, NULL);
+       stats.url = url;
+       stats.bytes_received = serialized.bytes_received();
+       stats.data_bytes_received = serialized.data_bytes_received();
 }
 
 InputProto UDPInput::serialize() const
@@ -48,6 +59,8 @@ InputProto UDPInput::serialize() const
        InputProto serialized;
        serialized.set_url(url);
        serialized.set_sock(sock);
+       serialized.set_bytes_received(stats.bytes_received);
+       serialized.set_data_bytes_received(stats.data_bytes_received);
        return serialized;
 }
 
@@ -104,9 +117,21 @@ void UDPInput::do_work()
                        close_socket();
                        continue;
                }
+
+               {
+                       MutexLock lock(&stats_mutex);
+                       stats.bytes_received += ret;
+                       stats.data_bytes_received += ret;
+               }
                
                for (size_t i = 0; i < stream_indices.size(); ++i) {
                        servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START);
                }
        }
 }
+
+InputStats UDPInput::get_stats() const
+{
+       MutexLock lock(&stats_mutex);
+       return stats;
+}
index 0ee5dc9..caff008 100644 (file)
@@ -21,6 +21,8 @@ public:
 
        virtual void add_destination(int stream_index);
 
+       virtual InputStats get_stats() const;
+
 private:
        // Actually gets the packets.
        virtual void do_work();
@@ -42,6 +44,12 @@ private:
 
        // Temporary buffer, sized for the maximum size of an UDP packet.
        char packet_buf[65536];
+
+       // Mutex protecting <stats>.
+       mutable pthread_mutex_t stats_mutex;
+
+       // The current statistics for this connection. Protected by <stats_mutex>.
+       InputStats stats;
 };
 
 #endif  // !defined(_UDPINPUT_H)