num_servers 4 # one for each cpu
port 9094
+stats_file cubemap.stats
+stats_interval 60
+
#
# now the streams!
#
#include <signal.h>
#include <errno.h>
#include <ctype.h>
+#include <fcntl.h>
#include <vector>
#include <string>
#include <map>
return NULL;
}
+struct StatsThreadParameters {
+ string stats_file;
+ int stats_interval;
+};
+
+void *stats_thread_run(void *arg)
+{
+ const StatsThreadParameters *parms = reinterpret_cast<StatsThreadParameters *>(arg);
+ while (!hupped) {
+ int fd;
+ FILE *fp;
+ time_t now;
+ vector<ClientStats> client_stats;
+
+ // Open a new, temporary file.
+ char *filename = strdup((parms->stats_file + ".new.XXXXXX").c_str());
+ fd = mkostemp(filename, O_WRONLY);
+ if (fd == -1) {
+ perror(filename);
+ free(filename);
+ goto sleep;
+ }
+
+ fp = fdopen(fd, "w");
+ if (fp == NULL) {
+ perror("fdopen");
+ close(fd);
+ unlink(filename);
+ free(filename);
+ goto sleep;
+ }
+
+ now = time(NULL);
+ client_stats = servers->get_client_stats();
+ for (size_t i = 0; i < client_stats.size(); ++i) {
+ fprintf(fp, "%s %s %d %llu\n",
+ client_stats[i].remote_addr.c_str(),
+ client_stats[i].stream_id.c_str(),
+ int(now - client_stats[i].connect_time),
+ (long long unsigned)(client_stats[i].bytes_sent));
+ }
+ if (fclose(fp) == EOF) {
+ perror("fclose");
+ unlink(filename);
+ free(filename);
+ goto sleep;
+ }
+
+ if (rename(filename, parms->stats_file.c_str()) == -1) {
+ perror("rename");
+ unlink(filename);
+ }
+
+sleep:
+ int left_to_sleep = parms->stats_interval;
+ do {
+ left_to_sleep = sleep(left_to_sleep);
+ } while (left_to_sleep > 0);
+ }
+ return NULL;
+}
+
// Serialize the given state to a file descriptor, and return the (still open)
// descriptor.
int make_tempfile(const CubemapStateProto &state)
string config_filename = (argc == 1) ? "cubemap.config" : argv[1];
vector<ConfigLine> config = parse_config(config_filename);
- int port = fetch_config_int(config, "port", 1, 65535);
- int num_servers = fetch_config_int(config, "num_servers", 1, 20000); // Insanely high max limit.
+ int port = fetch_config_int(config, "port", 1, 65535, PARAMATER_MANDATORY);
+ int num_servers = fetch_config_int(config, "num_servers", 1, 20000, PARAMATER_MANDATORY); // Insanely high max limit.
servers = new ServerPool(num_servers);
server_sock = create_server_socket(port);
}
+ // See if the user wants stats.
+ string stats_file = fetch_config_string(config, "stats_file", PARAMETER_OPTIONAL);
+ int stats_interval = fetch_config_int(config, "stats_interval", 1, INT_MAX, PARAMETER_OPTIONAL, -1);
+ if (stats_interval != -1 && stats_file.empty()) {
+ fprintf(stderr, "WARNING: 'stats_interval' given, but no 'stats_file'. No statistics will be written.\n");
+ }
+
servers->run();
pthread_t acceptor_thread;
// All deserialized inputs should now have been taken care of, one way or the other.
assert(deserialized_inputs.empty());
+ // Start writing statistics.
+ pthread_t stats_thread;
+ StatsThreadParameters stats_parameters; // Must live for as long as the stats thread does.
+ if (!stats_file.empty()) {
+ stats_parameters.stats_file = stats_file;
+ stats_parameters.stats_interval = stats_interval;
+ pthread_create(&stats_thread, NULL, stats_thread_run, &stats_parameters);
+ }
+
signal(SIGHUP, hup);
while (!hupped) {
}
// OK, we've been HUPed. Time to shut down everything, serialize, and re-exec.
+ if (!stats_file.empty()) {
+ if (pthread_join(stats_thread, NULL) == -1) {
+ perror("pthread_join");
+ exit(1);
+ }
+ }
if (pthread_join(acceptor_thread, NULL) == -1) {
perror("pthread_join");
exit(1);
return ret;
}
-int fetch_config_int(const vector<ConfigLine> &config, const string &keyword, int min_limit, int max_limit)
+string fetch_config_string(const vector<ConfigLine> &config, const string &keyword,
+ ParameterType parameter_type, const string &default_value)
{
+ assert(parameter_type == PARAMATER_MANDATORY || parameter_type == PARAMETER_OPTIONAL);
+ for (unsigned i = 0; i < config.size(); ++i) {
+ if (config[i].keyword != keyword) {
+ continue;
+ }
+ if (config[i].parameters.size() > 0 ||
+ config[i].arguments.size() != 1) {
+ fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str());
+ exit(1);
+ }
+ return config[i].arguments[0];
+ }
+ if (parameter_type == PARAMATER_MANDATORY) {
+ fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n",
+ keyword.c_str());
+ exit(1);
+ } else {
+ return default_value;
+ }
+}
+
+int fetch_config_int(const std::vector<ConfigLine> &config, const std::string &keyword,
+ int min_limit, int max_limit,
+ ParameterType parameter_type, int default_value)
+{
+ assert(parameter_type == PARAMATER_MANDATORY || parameter_type == PARAMETER_OPTIONAL);
bool value_found = false;
int value = -1;
for (unsigned i = 0; i < config.size(); ++i) {
value = atoi(config[i].arguments[0].c_str()); // TODO: verify int validity.
}
if (!value_found) {
+ if (parameter_type == PARAMETER_OPTIONAL) {
+ return default_value;
+ }
fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n",
keyword.c_str());
exit(1);
// Parse the configuration file.
std::vector<ConfigLine> parse_config(const std::string &filename);
+enum ParameterType {
+ PARAMETER_OPTIONAL,
+ PARAMATER_MANDATORY,
+};
+
+std::string fetch_config_string(const std::vector<ConfigLine> &config, const std::string &keyword,
+ ParameterType parameter_type, const std::string &default_value = "");
+
// Note: Limits are inclusive.
-int fetch_config_int(const std::vector<ConfigLine> &config, const std::string &keyword, int min_limit, int max_limit);
+int fetch_config_int(const std::vector<ConfigLine> &config, const std::string &keyword,
+ int min_limit, int max_limit,
+ ParameterType parameter_type, int default_value = -1);
// Add the new data to an existing string, looking for \r\n\r\n
// (typical of HTTP requests and/or responses). Will return one
#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/epoll.h>
+#include <time.h>
#include <errno.h>
#include <vector>
#include <string>
Client::Client(int sock)
: sock(sock),
+ connect_time(time(NULL)),
state(Client::READING_REQUEST),
stream(NULL),
header_or_error_bytes_sent(0),
bytes_sent(0)
{
request.reserve(1024);
+
+ // Find the remote address, and convert it to ASCII.
+ sockaddr_in6 addr;
+ socklen_t addr_len = sizeof(addr);
+
+ if (getpeername(sock, reinterpret_cast<sockaddr *>(&addr), &addr_len) == -1) {
+ perror("getpeername");
+ remote_addr = "";
+ } else {
+ char buf[INET6_ADDRSTRLEN];
+ if (inet_ntop(addr.sin6_family, &addr.sin6_addr, buf, sizeof(buf)) == NULL) {
+ perror("inet_ntop");
+ remote_addr = "";
+ } else {
+ remote_addr = buf;
+ }
+ }
}
Client::Client(const ClientProto &serialized, Stream *stream)
: sock(serialized.sock()),
+ remote_addr(serialized.remote_addr()),
+ connect_time(serialized.connect_time()),
state(State(serialized.state())),
request(serialized.request()),
stream_id(serialized.stream_id()),
{
ClientProto serialized;
serialized.set_sock(sock);
+ serialized.set_remote_addr(remote_addr);
+ serialized.set_connect_time(connect_time);
serialized.set_state(state);
serialized.set_request(request);
serialized.set_stream_id(stream_id);
serialized.set_bytes_sent(bytes_sent);
return serialized;
}
+
+ClientStats Client::get_stats() const
+{
+ ClientStats stats;
+ stats.stream_id = stream_id;
+ stats.remote_addr = remote_addr;
+ stats.connect_time = connect_time;
+ stats.bytes_sent = bytes_sent;
+ return stats;
+}
Stream::Stream(const string &stream_id)
: stream_id(stream_id),
exit(1);
}
}
+
+vector<ClientStats> Server::get_client_stats() const
+{
+ vector<ClientStats> ret;
+
+ MutexLock lock(&mutex);
+ for (map<int, Client>::const_iterator client_it = clients.begin();
+ client_it != clients.end();
+ ++client_it) {
+ ret.push_back(client_it->second.get_stats());
+ }
+ return ret;
+}
void *Server::do_work_thunk(void *arg)
{
#include <stdint.h>
#include <pthread.h>
#include <sys/epoll.h>
+#include <time.h>
#include <string>
#include <map>
#include <vector>
class Stream;
class StreamProto;
+// Digested statistics for writing to logs etc.
+struct ClientStats {
+ std::string stream_id;
+ std::string remote_addr;
+ time_t connect_time;
+ size_t bytes_sent;
+};
+
struct Client {
Client() {}
Client(int sock);
Client(const ClientProto &serialized, Stream *stream);
ClientProto serialize() const;
+ ClientStats get_stats() const;
+
// The file descriptor associated with this socket.
int sock;
+ // Some information only used for logging.
+ std::string remote_addr;
+ time_t connect_time;
+
enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR };
State state;
// Stop the thread.
void stop();
+
+ // Get the list of all currently connected clients.
+ std::vector<ClientStats> get_client_stats() const;
// Set header (both HTTP header and any stream headers) for the given stream.
void set_header(const std::string &stream_id, const std::string &header);
// Mutex protecting queued_data only. Note that if you want to hold both this
// and <mutex> below, you will need to take <mutex> before this one.
- pthread_mutex_t queued_data_mutex;
+ mutable pthread_mutex_t queued_data_mutex;
// Deferred commands that should be run from the do_work() thread as soon as possible.
// We defer these for two reasons:
std::map<std::string, std::string> queued_data;
// All variables below this line are protected by the mutex.
- pthread_mutex_t mutex;
+ mutable pthread_mutex_t mutex;
// If the thread should stop or not.
bool should_stop;
servers[i].run();
}
}
+
+vector<ClientStats> ServerPool::get_client_stats() const
+{
+ vector<ClientStats> ret;
+ for (int i = 0; i < num_servers; ++i) {
+ vector<ClientStats> stats = servers[i].get_client_stats();
+ ret.insert(ret.end(), stats.begin(), stats.end());
+ }
+ return ret;
+}
#include "server.h"
+#include <vector>
+
// Provides services such as load-balancing between a number of Server instances.
class ServerPool {
public:
// Starts all the servers.
void run();
+ std::vector<ClientStats> get_client_stats() const;
+
private:
Server *servers;
int num_servers, clients_added;
// Corresponds to struct Client.
message ClientProto {
optional int32 sock = 1;
+ optional string remote_addr = 8;
+ optional int64 connect_time = 9;
optional int32 state = 2;
optional bytes request = 3;
optional string stream_id = 4;