From: Steinar H. Gunderson Date: Tue, 9 Apr 2013 20:17:16 +0000 (+0200) Subject: Support writing a stats file listing the number of clients currently connected. X-Git-Tag: 1.0.0~166 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=019b96a9cc6fa2902690e98a2aa033517efef3ed Support writing a stats file listing the number of clients currently connected. --- diff --git a/cubemap.config.sample b/cubemap.config.sample index 66fcef6..fdd6a06 100644 --- a/cubemap.config.sample +++ b/cubemap.config.sample @@ -1,6 +1,9 @@ num_servers 4 # one for each cpu port 9094 +stats_file cubemap.stats +stats_interval 60 + # # now the streams! # diff --git a/main.cpp b/main.cpp index 1bcc07b..3bf660a 100644 --- a/main.cpp +++ b/main.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -128,6 +129,68 @@ void *acceptor_thread_run(void *arg) return NULL; } +struct StatsThreadParameters { + string stats_file; + int stats_interval; +}; + +void *stats_thread_run(void *arg) +{ + const StatsThreadParameters *parms = reinterpret_cast(arg); + while (!hupped) { + int fd; + FILE *fp; + time_t now; + vector client_stats; + + // Open a new, temporary file. + char *filename = strdup((parms->stats_file + ".new.XXXXXX").c_str()); + fd = mkostemp(filename, O_WRONLY); + if (fd == -1) { + perror(filename); + free(filename); + goto sleep; + } + + fp = fdopen(fd, "w"); + if (fp == NULL) { + perror("fdopen"); + close(fd); + unlink(filename); + free(filename); + goto sleep; + } + + now = time(NULL); + client_stats = servers->get_client_stats(); + for (size_t i = 0; i < client_stats.size(); ++i) { + fprintf(fp, "%s %s %d %llu\n", + client_stats[i].remote_addr.c_str(), + client_stats[i].stream_id.c_str(), + int(now - client_stats[i].connect_time), + (long long unsigned)(client_stats[i].bytes_sent)); + } + if (fclose(fp) == EOF) { + perror("fclose"); + unlink(filename); + free(filename); + goto sleep; + } + + if (rename(filename, parms->stats_file.c_str()) == -1) { + perror("rename"); + unlink(filename); + } + +sleep: + int left_to_sleep = parms->stats_interval; + do { + left_to_sleep = sleep(left_to_sleep); + } while (left_to_sleep > 0); + } + return NULL; +} + // Serialize the given state to a file descriptor, and return the (still open) // descriptor. int make_tempfile(const CubemapStateProto &state) @@ -201,8 +264,8 @@ int main(int argc, char **argv) string config_filename = (argc == 1) ? "cubemap.config" : argv[1]; vector config = parse_config(config_filename); - int port = fetch_config_int(config, "port", 1, 65535); - int num_servers = fetch_config_int(config, "num_servers", 1, 20000); // Insanely high max limit. + int port = fetch_config_int(config, "port", 1, 65535, PARAMATER_MANDATORY); + int num_servers = fetch_config_int(config, "num_servers", 1, 20000, PARAMATER_MANDATORY); // Insanely high max limit. servers = new ServerPool(num_servers); @@ -282,6 +345,13 @@ int main(int argc, char **argv) server_sock = create_server_socket(port); } + // See if the user wants stats. + string stats_file = fetch_config_string(config, "stats_file", PARAMETER_OPTIONAL); + int stats_interval = fetch_config_int(config, "stats_interval", 1, INT_MAX, PARAMETER_OPTIONAL, -1); + if (stats_interval != -1 && stats_file.empty()) { + fprintf(stderr, "WARNING: 'stats_interval' given, but no 'stats_file'. No statistics will be written.\n"); + } + servers->run(); pthread_t acceptor_thread; @@ -324,6 +394,15 @@ int main(int argc, char **argv) // All deserialized inputs should now have been taken care of, one way or the other. assert(deserialized_inputs.empty()); + // Start writing statistics. + pthread_t stats_thread; + StatsThreadParameters stats_parameters; // Must live for as long as the stats thread does. + if (!stats_file.empty()) { + stats_parameters.stats_file = stats_file; + stats_parameters.stats_interval = stats_interval; + pthread_create(&stats_thread, NULL, stats_thread_run, &stats_parameters); + } + signal(SIGHUP, hup); while (!hupped) { @@ -331,6 +410,12 @@ int main(int argc, char **argv) } // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec. + if (!stats_file.empty()) { + if (pthread_join(stats_thread, NULL) == -1) { + perror("pthread_join"); + exit(1); + } + } if (pthread_join(acceptor_thread, NULL) == -1) { perror("pthread_join"); exit(1); diff --git a/parse.cpp b/parse.cpp index fca90a6..d8706d3 100644 --- a/parse.cpp +++ b/parse.cpp @@ -113,8 +113,35 @@ vector parse_config(const string &filename) return ret; } -int fetch_config_int(const vector &config, const string &keyword, int min_limit, int max_limit) +string fetch_config_string(const vector &config, const string &keyword, + ParameterType parameter_type, const string &default_value) { + assert(parameter_type == PARAMATER_MANDATORY || parameter_type == PARAMETER_OPTIONAL); + for (unsigned i = 0; i < config.size(); ++i) { + if (config[i].keyword != keyword) { + continue; + } + if (config[i].parameters.size() > 0 || + config[i].arguments.size() != 1) { + fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str()); + exit(1); + } + return config[i].arguments[0]; + } + if (parameter_type == PARAMATER_MANDATORY) { + fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n", + keyword.c_str()); + exit(1); + } else { + return default_value; + } +} + +int fetch_config_int(const std::vector &config, const std::string &keyword, + int min_limit, int max_limit, + ParameterType parameter_type, int default_value) +{ + assert(parameter_type == PARAMATER_MANDATORY || parameter_type == PARAMETER_OPTIONAL); bool value_found = false; int value = -1; for (unsigned i = 0; i < config.size(); ++i) { @@ -130,6 +157,9 @@ int fetch_config_int(const vector &config, const string &keyword, in value = atoi(config[i].arguments[0].c_str()); // TODO: verify int validity. } if (!value_found) { + if (parameter_type == PARAMETER_OPTIONAL) { + return default_value; + } fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n", keyword.c_str()); exit(1); diff --git a/parse.h b/parse.h index 5d56bd1..3cfa091 100644 --- a/parse.h +++ b/parse.h @@ -22,8 +22,18 @@ std::vector split_lines(const std::string &str); // Parse the configuration file. std::vector parse_config(const std::string &filename); +enum ParameterType { + PARAMETER_OPTIONAL, + PARAMATER_MANDATORY, +}; + +std::string fetch_config_string(const std::vector &config, const std::string &keyword, + ParameterType parameter_type, const std::string &default_value = ""); + // Note: Limits are inclusive. -int fetch_config_int(const std::vector &config, const std::string &keyword, int min_limit, int max_limit); +int fetch_config_int(const std::vector &config, const std::string &keyword, + int min_limit, int max_limit, + ParameterType parameter_type, int default_value = -1); // Add the new data to an existing string, looking for \r\n\r\n // (typical of HTTP requests and/or responses). Will return one diff --git a/server.cpp b/server.cpp index e91ba39..5c55636 100644 --- a/server.cpp +++ b/server.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -24,16 +25,36 @@ using namespace std; Client::Client(int sock) : sock(sock), + connect_time(time(NULL)), state(Client::READING_REQUEST), stream(NULL), header_or_error_bytes_sent(0), bytes_sent(0) { request.reserve(1024); + + // Find the remote address, and convert it to ASCII. + sockaddr_in6 addr; + socklen_t addr_len = sizeof(addr); + + if (getpeername(sock, reinterpret_cast(&addr), &addr_len) == -1) { + perror("getpeername"); + remote_addr = ""; + } else { + char buf[INET6_ADDRSTRLEN]; + if (inet_ntop(addr.sin6_family, &addr.sin6_addr, buf, sizeof(buf)) == NULL) { + perror("inet_ntop"); + remote_addr = ""; + } else { + remote_addr = buf; + } + } } Client::Client(const ClientProto &serialized, Stream *stream) : sock(serialized.sock()), + remote_addr(serialized.remote_addr()), + connect_time(serialized.connect_time()), state(State(serialized.state())), request(serialized.request()), stream_id(serialized.stream_id()), @@ -48,6 +69,8 @@ ClientProto Client::serialize() const { ClientProto serialized; serialized.set_sock(sock); + serialized.set_remote_addr(remote_addr); + serialized.set_connect_time(connect_time); serialized.set_state(state); serialized.set_request(request); serialized.set_stream_id(stream_id); @@ -56,6 +79,16 @@ ClientProto Client::serialize() const serialized.set_bytes_sent(bytes_sent); return serialized; } + +ClientStats Client::get_stats() const +{ + ClientStats stats; + stats.stream_id = stream_id; + stats.remote_addr = remote_addr; + stats.connect_time = connect_time; + stats.bytes_sent = bytes_sent; + return stats; +} Stream::Stream(const string &stream_id) : stream_id(stream_id), @@ -152,6 +185,19 @@ void Server::stop() exit(1); } } + +vector Server::get_client_stats() const +{ + vector ret; + + MutexLock lock(&mutex); + for (map::const_iterator client_it = clients.begin(); + client_it != clients.end(); + ++client_it) { + ret.push_back(client_it->second.get_stats()); + } + return ret; +} void *Server::do_work_thunk(void *arg) { diff --git a/server.h b/server.h index 5ebc060..300ec0d 100644 --- a/server.h +++ b/server.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -18,6 +19,14 @@ class CubemapStateProto; class Stream; class StreamProto; +// Digested statistics for writing to logs etc. +struct ClientStats { + std::string stream_id; + std::string remote_addr; + time_t connect_time; + size_t bytes_sent; +}; + struct Client { Client() {} Client(int sock); @@ -26,9 +35,15 @@ struct Client { Client(const ClientProto &serialized, Stream *stream); ClientProto serialize() const; + ClientStats get_stats() const; + // The file descriptor associated with this socket. int sock; + // Some information only used for logging. + std::string remote_addr; + time_t connect_time; + enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR }; State state; @@ -105,6 +120,9 @@ public: // Stop the thread. void stop(); + + // Get the list of all currently connected clients. + std::vector get_client_stats() const; // Set header (both HTTP header and any stream headers) for the given stream. void set_header(const std::string &stream_id, const std::string &header); @@ -128,7 +146,7 @@ private: // Mutex protecting queued_data only. Note that if you want to hold both this // and below, you will need to take before this one. - pthread_mutex_t queued_data_mutex; + mutable pthread_mutex_t queued_data_mutex; // Deferred commands that should be run from the do_work() thread as soon as possible. // We defer these for two reasons: @@ -144,7 +162,7 @@ private: std::map queued_data; // All variables below this line are protected by the mutex. - pthread_mutex_t mutex; + mutable pthread_mutex_t mutex; // If the thread should stop or not. bool should_stop; diff --git a/serverpool.cpp b/serverpool.cpp index a203594..dc7506d 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -58,3 +58,13 @@ void ServerPool::run() servers[i].run(); } } + +vector ServerPool::get_client_stats() const +{ + vector ret; + for (int i = 0; i < num_servers; ++i) { + vector stats = servers[i].get_client_stats(); + ret.insert(ret.end(), stats.begin(), stats.end()); + } + return ret; +} diff --git a/serverpool.h b/serverpool.h index 74d6cb9..3083986 100644 --- a/serverpool.h +++ b/serverpool.h @@ -3,6 +3,8 @@ #include "server.h" +#include + // Provides services such as load-balancing between a number of Server instances. class ServerPool { public: @@ -28,6 +30,8 @@ public: // Starts all the servers. void run(); + std::vector get_client_stats() const; + private: Server *servers; int num_servers, clients_added; diff --git a/state.proto b/state.proto index 3d8ea99..663e9d0 100644 --- a/state.proto +++ b/state.proto @@ -1,6 +1,8 @@ // Corresponds to struct Client. message ClientProto { optional int32 sock = 1; + optional string remote_addr = 8; + optional int64 connect_time = 9; optional int32 state = 2; optional bytes request = 3; optional string stream_id = 4;