CXXFLAGS=-Wall -O2 -g -pthread
LDLIBS=-lprotobuf -pthread
-OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o
+OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o
all: cubemap
bool has_stats_file = fetch_config_string(lines, "stats_file", &config->stats_file);
bool has_stats_interval = fetch_config_int(lines, "stats_interval", &config->stats_interval);
if (has_stats_interval && !has_stats_file) {
- log(WARNING, "'stats_interval' given, but no 'stats_file'. No statistics will be written.");
+ log(WARNING, "'stats_interval' given, but no 'stats_file'. No client statistics will be written.");
+ }
+
+ config->input_stats_interval = 60;
+ bool has_input_stats_file = fetch_config_string(lines, "input_stats_file", &config->input_stats_file);
+ bool has_input_stats_interval = fetch_config_int(lines, "input_stats_interval", &config->input_stats_interval);
+ if (has_input_stats_interval && !has_input_stats_file) {
+ log(WARNING, "'input_stats_interval' given, but no 'input_stats_file'. No input statistics will be written.");
}
fetch_config_string(lines, "access_log", &config->access_log_file);
if (line.keyword == "num_servers" ||
line.keyword == "stats_file" ||
line.keyword == "stats_interval" ||
+ line.keyword == "input_stats_file" ||
+ line.keyword == "input_stats_interval" ||
line.keyword == "access_log") {
// Already taken care of, above.
} else if (line.keyword == "port") {
std::string stats_file; // Empty means no stats file.
int stats_interval;
+ std::string input_stats_file; // Empty means no input stats file.
+ int input_stats_interval;
+
std::string access_log_file; // Empty means no accses_log file.
};
stats_file cubemap.stats
stats_interval 60
+input_stats_file cubemap-input.stats
+input_stats_interval 60
+
# Logging of clients as they disconnect (and as such as no longer visible in the stats file).
# You can only have zero or one of these.
access_log access.log
#include "httpinput.h"
#include "log.h"
#include "metacube.h"
+#include "mutexlock.h"
#include "parse.h"
#include "serverpool.h"
#include "state.pb.h"
has_metacube_header(false),
sock(-1)
{
+ pthread_mutex_init(&stats_mutex, NULL);
+ stats.url = url;
+ stats.bytes_received = 0;
+ stats.data_bytes_received = 0;
}
HTTPInput::HTTPInput(const InputProto &serialized)
has_metacube_header(serialized.has_metacube_header()),
sock(serialized.sock())
{
+ pthread_mutex_init(&stats_mutex, NULL);
+
pending_data.resize(serialized.pending_data().size());
memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size());
memcmp(http_header.data() + http_header.size() - 4, "\r\n\r\n", 4) == 0) {
http_header.resize(http_header.size() - 2);
}
+
+ pthread_mutex_init(&stats_mutex, NULL);
+ stats.url = url;
+ stats.bytes_received = serialized.bytes_received();
+ stats.data_bytes_received = serialized.data_bytes_received();
}
void HTTPInput::close_socket()
serialized.set_pending_data(string(pending_data.begin(), pending_data.end()));
serialized.set_has_metacube_header(has_metacube_header);
serialized.set_sock(sock);
+ serialized.set_bytes_received(stats.bytes_received);
+ serialized.set_data_bytes_received(stats.data_bytes_received);
return serialized;
}
void HTTPInput::process_data(char *ptr, size_t bytes)
{
pending_data.insert(pending_data.end(), ptr, ptr + bytes);
+ {
+ MutexLock mutex(&stats_mutex);
+ stats.bytes_received += bytes;
+ }
for ( ;; ) {
// If we don't have enough data (yet) for even the Metacube header, just return.
return;
}
- // Send this block on to the data.
+ // Send this block on to the servers.
+ {
+ MutexLock lock(&stats_mutex);
+ stats.data_bytes_received += size;
+ }
char *inner_data = pending_data.data() + sizeof(metacube_block_header);
if (flags & METACUBE_FLAGS_HEADER) {
string header(inner_data, inner_data + size);
pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes);
}
+InputStats HTTPInput::get_stats() const
+{
+ MutexLock lock(&stats_mutex);
+ return stats;
+}
stream_indices.push_back(stream_index);
}
+ virtual InputStats get_stats() const;
+
private:
// Actually does the download.
virtual void do_work();
bool has_metacube_header;
// The socket we are downloading on (or -1).
- int sock;
+ int sock;
+
+ // Mutex protecting <stats>.
+ mutable pthread_mutex_t stats_mutex;
+
+ // The current statistics for this connection. Protected by <stats_mutex>.
+ InputStats stats;
};
#endif // !defined(_HTTPINPUT_H)
Input *create_input(const std::string &url);
Input *create_input(const InputProto &serialized);
+// Digested statistics for writing to logs etc.
+struct InputStats {
+ std::string url;
+
+ // The number of bytes we have received so far, including any Metacube headers.
+ //
+ // Not reset across connections.
+ size_t bytes_received;
+
+ // The number of data bytes we have received so far (or more precisely,
+ // number of data bytes we have sent on to the stream). This excludes Metacube
+ // headers and corrupted data we've skipped.
+ //
+ // Not reset across connections.
+ size_t data_bytes_received;
+
+ // TODO: Number of loss events and connection time might both be useful,
+ // similar to for clients. Also, per-connection byte counters.
+};
+
class Input : public Thread {
public:
virtual ~Input();
virtual std::string get_url() const = 0;
virtual void close_socket() = 0;
virtual void add_destination(int stream_index) = 0;
+
+ // Note: May be called from a different thread, so must be thread-safe.
+ virtual InputStats get_stats() const = 0;
};
#endif // !defined(_INPUT_H)
--- /dev/null
+#include <fcntl.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <vector>
+
+#include "client.h"
+#include "log.h"
+#include "input.h"
+#include "input_stats.h"
+#include "util.h"
+
+using namespace std;
+
+InputStatsThread::InputStatsThread(const string &stats_file, int stats_interval, const vector<Input*> &inputs)
+ : stats_file(stats_file),
+ stats_interval(stats_interval),
+ inputs(inputs)
+{
+}
+
+void InputStatsThread::do_work()
+{
+ while (!should_stop()) {
+ int fd;
+ FILE *fp;
+ vector<ClientStats> client_stats;
+
+ // Open a new, temporary file.
+ char *filename = strdup((stats_file + ".new.XXXXXX").c_str());
+ fd = mkostemp(filename, O_WRONLY);
+ if (fd == -1) {
+ log_perror(filename);
+ free(filename);
+ goto sleep;
+ }
+
+ fp = fdopen(fd, "w");
+ if (fp == NULL) {
+ log_perror("fdopen");
+ safe_close(fd);
+ if (unlink(filename) == -1) {
+ log_perror(filename);
+ }
+ free(filename);
+ goto sleep;
+ }
+
+ for (size_t i = 0; i < inputs.size(); ++i) {
+ InputStats stats = inputs[i]->get_stats();
+ fprintf(fp, "%s %llu %llu\n", stats.url.c_str(),
+ (long long unsigned)(stats.bytes_received),
+ (long long unsigned)(stats.data_bytes_received));
+ }
+ if (fclose(fp) == EOF) {
+ log_perror("fclose");
+ if (unlink(filename) == -1) {
+ log_perror(filename);
+ }
+ free(filename);
+ goto sleep;
+ }
+
+ if (rename(filename, stats_file.c_str()) == -1) {
+ log_perror("rename");
+ if (unlink(filename) == -1) {
+ log_perror(filename);
+ }
+ }
+ free(filename);
+
+sleep:
+ // Wait until we are asked to quit, stats_interval timeout,
+ // or a spurious signal. (The latter will cause us to write stats
+ // too often, but that's okay.)
+ timespec timeout_ts;
+ timeout_ts.tv_sec = stats_interval;
+ timeout_ts.tv_nsec = 0;
+ wait_for_wakeup(&timeout_ts);
+ }
+}
--- /dev/null
+#ifndef _INPUT_STATS_H
+#define _INPUT_STATS_H 1
+
+#include "thread.h"
+#include <string>
+#include <vector>
+
+// A thread that regularly writes out input statistics, ie. a list of all inputs
+// with some information about each. Very similar to StatsThread, but for inputs instead
+// of clients.
+
+class InputStatsThread : public Thread {
+public:
+ // Does not take ownership of the inputs.
+ InputStatsThread(const std::string &stats_file, int stats_interval, const std::vector<Input*> &inputs);
+
+private:
+ virtual void do_work();
+
+ std::string stats_file;
+ int stats_interval;
+ std::vector<Input*> inputs;
+};
+
+#endif // !defined(_INPUT_STATS_H)
#include "accesslog.h"
#include "config.h"
#include "input.h"
+#include "input_stats.h"
#include "log.h"
#include "markpool.h"
#include "serverpool.h"
stats_thread->run();
}
+ InputStatsThread *input_stats_thread = NULL;
+ if (!config.input_stats_file.empty()) {
+ vector<Input*> inputs_no_refcount;
+ for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
+ input_it != inputs.end(); ++input_it) {
+ inputs_no_refcount.push_back(input_it->second.input);
+ }
+
+ input_stats_thread = new InputStatsThread(config.input_stats_file, config.input_stats_interval, inputs_no_refcount);
+ input_stats_thread->run();
+ }
+
struct timeval server_start;
gettimeofday(&server_start, NULL);
if (state_fd != -1) {
// OK, we've been HUPed. Time to shut down everything, serialize, and re-exec.
gettimeofday(&serialize_start, NULL);
+ if (input_stats_thread != NULL) {
+ input_stats_thread->stop();
+ delete input_stats_thread;
+ }
if (stats_thread != NULL) {
stats_thread->stop();
delete stats_thread;
optional bytes pending_data = 7;
optional bool has_metacube_header = 8;
optional int32 sock = 9;
+ optional int64 bytes_received = 11;
+ optional int64 data_bytes_received = 12;
};
// Corresponds to class Acceptor.
#include "acceptor.h"
#include "log.h"
+#include "mutexlock.h"
#include "serverpool.h"
#include "state.pb.h"
#include "udpinput.h"
assert(ok);
construct_header();
+
+ pthread_mutex_init(&stats_mutex, NULL);
+ stats.url = url;
+ stats.bytes_received = 0;
+ stats.data_bytes_received = 0;
}
UDPInput::UDPInput(const InputProto &serialized)
assert(ok);
construct_header();
+
+ pthread_mutex_init(&stats_mutex, NULL);
+ stats.url = url;
+ stats.bytes_received = serialized.bytes_received();
+ stats.data_bytes_received = serialized.data_bytes_received();
}
InputProto UDPInput::serialize() const
InputProto serialized;
serialized.set_url(url);
serialized.set_sock(sock);
+ serialized.set_bytes_received(stats.bytes_received);
+ serialized.set_data_bytes_received(stats.data_bytes_received);
return serialized;
}
close_socket();
continue;
}
+
+ {
+ MutexLock lock(&stats_mutex);
+ stats.bytes_received += ret;
+ stats.data_bytes_received += ret;
+ }
for (size_t i = 0; i < stream_indices.size(); ++i) {
servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START);
}
}
}
+
+InputStats UDPInput::get_stats() const
+{
+ MutexLock lock(&stats_mutex);
+ return stats;
+}
virtual void add_destination(int stream_index);
+ virtual InputStats get_stats() const;
+
private:
// Actually gets the packets.
virtual void do_work();
// Temporary buffer, sized for the maximum size of an UDP packet.
char packet_buf[65536];
+
+ // Mutex protecting <stats>.
+ mutable pthread_mutex_t stats_mutex;
+
+ // The current statistics for this connection. Protected by <stats_mutex>.
+ InputStats stats;
};
#endif // !defined(_UDPINPUT_H)