CXXFLAGS=-Wall -O2 -g
LDLIBS=-lpthread -lprotobuf
-OBJS=main.o client.o server.o stream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o thread.o util.o state.pb.o
+OBJS=main.o client.o server.o stream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o thread.o util.o log.o state.pb.o
all: cubemap
#include <vector>
#include "config.h"
+#include "log.h"
#include "parse.h"
using namespace std;
}
if (config[i].parameters.size() > 0 ||
config[i].arguments.size() != 1) {
- fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str());
+ log(ERROR, "'%s' takes one argument and no parameters", keyword.c_str());
return false;
}
*value = config[i].arguments[0];
}
if (config[i].parameters.size() > 0 ||
config[i].arguments.size() != 1) {
- fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str());
+ log(ERROR, "'%s' takes one argument and no parameters", keyword.c_str());
return false;
}
*value = atoi(config[i].arguments[0].c_str()); // TODO: verify int validity.
bool parse_port(const ConfigLine &line, Config *config)
{
if (line.arguments.size() != 1) {
- fprintf(stderr, "ERROR: 'port' takes exactly one argument\n");
+ log(ERROR, "'port' takes exactly one argument");
return false;
}
AcceptorConfig acceptor;
acceptor.port = atoi(line.arguments[0].c_str());
if (acceptor.port < 1 || acceptor.port >= 65536) {
- fprintf(stderr, "ERROR: port %d is out of range (must be [1,65536>).\n", acceptor.port);
+ log(ERROR, "port %d is out of range (must be [1,65536>).", acceptor.port);
return false;
}
pool_index = i;
} else if ((from >= pool.from && from < pool.to) ||
(to >= pool.from && to < pool.to)) {
- fprintf(stderr, "WARNING: Mark pool %d-%d partially overlaps with %d-%d, you may get duplicate marks.\n",
- from, to, pool.from, pool.to);
- fprintf(stderr, " Mark pools must either be completely disjunct, or completely overlapping.\n");
+ log(WARNING, "Mark pool %d-%d partially overlaps with %d-%d, you may get duplicate marks."
+ "Mark pools must either be completely disjunct, or completely overlapping.",
+ from, to, pool.from, pool.to);
}
}
{
size_t split = mark_str.find_first_of('-');
if (split == string::npos) {
- fprintf(stderr, "ERROR: Invalid mark specification '%s' (expected 'X-Y').\n",
+ log(ERROR, "Invalid mark specification '%s' (expected 'X-Y').",
mark_str.c_str());
return false;
}
*to = atoi(to_str.c_str());
if (*from <= 0 || *from >= 65536 || *to <= 0 || *to >= 65536) {
- fprintf(stderr, "ERROR: Mark pool range %d-%d is outside legal range [1,65536>.\n",
+ log(ERROR, "Mark pool range %d-%d is outside legal range [1,65536>.",
*from, *to);
return false;
}
bool parse_stream(const ConfigLine &line, Config *config)
{
if (line.arguments.size() != 1) {
- fprintf(stderr, "ERROR: 'stream' takes exactly one argument\n");
+ log(ERROR, "'stream' takes exactly one argument");
return false;
}
map<string, string>::const_iterator src_it = line.parameters.find("src");
if (src_it == line.parameters.end()) {
- fprintf(stderr, "WARNING: stream '%s' has no src= attribute, clients will not get any data.\n",
+ log(WARNING, "stream '%s' has no src= attribute, clients will not get any data.",
stream.stream_id.c_str());
} else {
stream.src = src_it->second;
return true;
}
+bool parse_error_log(const ConfigLine &line, Config *config)
+{
+ if (line.arguments.size() != 0) {
+ log(ERROR, "'error_log' takes no arguments (only parameters type= and filename=)");
+ return false;
+ }
+
+ LogConfig log_config;
+ map<string, string>::const_iterator type_it = line.parameters.find("type");
+ if (type_it == line.parameters.end()) {
+ log(ERROR, "'error_log' has no type= parameter");
+ return false;
+ }
+
+ string type = type_it->second;
+ if (type == "file") {
+ log_config.type = LogConfig::LOG_TYPE_FILE;
+ } else if (type == "syslog") {
+ log_config.type = LogConfig::LOG_TYPE_SYSLOG;
+ } else if (type == "console") {
+ log_config.type = LogConfig::LOG_TYPE_CONSOLE;
+ } else {
+ log(ERROR, "Unknown log type '%s'", type.c_str());
+ return false;
+ }
+
+ if (log_config.type == LogConfig::LOG_TYPE_FILE) {
+ map<string, string>::const_iterator filename_it = line.parameters.find("filename");
+ if (filename_it == line.parameters.end()) {
+ log(ERROR, "error_log type 'file' with no filename= parameter");
+ return false;
+ }
+ log_config.filename = filename_it->second;
+ }
+
+ config->log_destinations.push_back(log_config);
+ return true;
+}
+
bool parse_config(const string &filename, Config *config)
{
vector<ConfigLine> lines;
}
if (!fetch_config_int(lines, "num_servers", &config->num_servers)) {
- fprintf(stderr, "ERROR: Missing 'num_servers' statement in config file.\n");
+ log(ERROR, "Missing 'num_servers' statement in config file.");
return false;
}
if (config->num_servers < 1 || config->num_servers >= 20000) { // Insanely high max limit.
- fprintf(stderr, "ERROR: 'num_servers' is %d, needs to be in [1, 20000>.\n", config->num_servers);
+ log(ERROR, "'num_servers' is %d, needs to be in [1, 20000>.", config->num_servers);
return false;
}
bool has_stats_file = fetch_config_string(lines, "stats_file", &config->stats_file);
bool has_stats_interval = fetch_config_int(lines, "stats_interval", &config->stats_interval);
if (has_stats_interval && !has_stats_file) {
- fprintf(stderr, "WARNING: 'stats_interval' given, but no 'stats_file'. No statistics will be written.\n");
+ log(WARNING, "'stats_interval' given, but no 'stats_file'. No statistics will be written.");
}
for (size_t i = 0; i < lines.size(); ++i) {
if (!parse_stream(line, config)) {
return false;
}
+ } else if (line.keyword == "error_log") {
+ if (!parse_error_log(line, config)) {
+ return false;
+ }
} else {
- fprintf(stderr, "ERROR: Unknown configuration keyword '%s'.\n",
+ log(ERROR, "Unknown configuration keyword '%s'.",
line.keyword.c_str());
return false;
}
int port;
};
+struct LogConfig {
+ enum { LOG_TYPE_FILE, LOG_TYPE_CONSOLE, LOG_TYPE_SYSLOG } type;
+ std::string filename;
+};
+
struct Config {
int num_servers;
std::vector<MarkPoolConfig> mark_pools;
std::vector<StreamConfig> streams;
std::vector<AcceptorConfig> acceptors;
+ std::vector<LogConfig> log_destinations;
std::string stats_file; // Empty means no stats file.
int stats_interval;
#include <vector>
#include "httpinput.h"
+#include "log.h"
#include "metacube.h"
#include "parse.h"
#include "serverpool.h"
addrinfo *ai;
int err = getaddrinfo(host.c_str(), port.c_str(), NULL, &ai);
if (err == -1) {
- fprintf(stderr, "WARNING: Lookup of '%s' failed (%s).\n",
+ log(WARNING, "Lookup of '%s' failed (%s).",
host.c_str(), gai_strerror(err));
freeaddrinfo(ai);
return -1;
}
// Give the last one as error.
- fprintf(stderr, "WARNING: Connect to '%s' failed (%s)\n",
+ log(WARNING, "Connect to '%s' failed (%s)",
host.c_str(), strerror(errno));
freeaddrinfo(ai);
return -1;
{
vector<string> lines = split_lines(response);
if (lines.empty()) {
- fprintf(stderr, "WARNING: Empty HTTP response from input.\n");
+ log(WARNING, "Empty HTTP response from input.");
return false;
}
vector<string> first_line_tokens = split_tokens(lines[0]);
if (first_line_tokens.size() < 2) {
- fprintf(stderr, "WARNING: Malformed response line '%s' from input.\n",
+ log(WARNING, "Malformed response line '%s' from input.",
lines[0].c_str());
return false;
}
int response = atoi(first_line_tokens[1].c_str());
if (response != 200) {
- fprintf(stderr, "WARNING: Non-200 response '%s' from input.\n",
+ log(WARNING, "Non-200 response '%s' from input.",
lines[0].c_str());
return false;
}
for (size_t i = 1; i < lines.size(); ++i) {
size_t split = lines[i].find(":");
if (split == string::npos) {
- fprintf(stderr, "WARNING: Ignoring malformed HTTP response line '%s'\n",
+ log(WARNING, "Ignoring malformed HTTP response line '%s'",
lines[i].c_str());
continue;
}
{
string protocol; // Thrown away.
if (!parse_url(url, &protocol, &host, &port, &path)) {
- fprintf(stderr, "Failed to parse URL '%s'\n", url.c_str());
+ log(WARNING, "Failed to parse URL '%s'", url.c_str());
break;
}
}
if (ret == 0) {
// This really shouldn't happen...
- fprintf(stderr, "Socket unexpectedly closed while reading header\n");
+ log(ERROR, "Socket unexpectedly closed while reading header");
state = CLOSING_SOCKET;
continue;
}
RequestParseStatus status = wait_for_double_newline(&response, buf, ret);
if (status == RP_OUT_OF_SPACE) {
- fprintf(stderr, "WARNING: fd %d sent overlong response!\n", sock);
+ log(WARNING, "fd %d sent overlong response!", sock);
state = CLOSING_SOCKET;
continue;
} else if (status == RP_NOT_FINISHED_YET) {
if (ret == 0) {
// This really shouldn't happen...
- fprintf(stderr, "Socket unexpectedly closed while reading header\n");
+ log(ERROR, "Socket unexpectedly closed while reading header");
state = CLOSING_SOCKET;
continue;
}
// or the connection just got closed.
// The earlier steps have already given the error message, if any.
if (state == NOT_CONNECTED && !should_stop) {
- fprintf(stderr, "Waiting 0.2 second and restarting...\n");
+ log(INFO, "Waiting 0.2 second and restarting...");
usleep(200000);
}
}
if (num_bytes == 0) {
return;
}
- fprintf(stderr, "Warning: Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?\n",
+ log(WARNING, "Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?",
(long long)num_bytes);
pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes);
}
--- /dev/null
+#include "log.h"
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <syslog.h>
+#include <assert.h>
+#include <string>
+#include <vector>
+
+using namespace std;
+
+// Yes, it's a bit ugly.
+#define SYSLOG_FAKE_FILE (static_cast<FILE *>(NULL))
+
+bool logging_started = false;
+std::vector<FILE *> log_destinations;
+
+void add_log_destination_file(const std::string &filename)
+{
+ FILE *fp = fopen(filename.c_str(), "a");
+ if (fp == NULL) {
+ perror(filename.c_str());
+ return;
+ }
+
+ log_destinations.push_back(fp);
+}
+
+void add_log_destination_console()
+{
+ log_destinations.push_back(stderr);
+}
+
+void add_log_destination_syslog()
+{
+ openlog("cubemap", LOG_PID, LOG_DAEMON);
+ log_destinations.push_back(SYSLOG_FAKE_FILE);
+}
+
+void start_logging()
+{
+ logging_started = true;
+}
+
+void shut_down_logging()
+{
+ for (size_t i = 0; i < log_destinations.size(); ++i) {
+ if (log_destinations[i] == SYSLOG_FAKE_FILE) {
+ closelog();
+ } else if (log_destinations[i] != stderr) {
+ if (fclose(log_destinations[i]) != 0) {
+ perror("fclose");
+ }
+ }
+ }
+ log_destinations.clear();
+ logging_started = false;
+}
+
+void log(LogLevel log_level, const char *fmt, ...)
+{
+ char formatted_msg[4096];
+ va_list ap;
+ va_start(ap, fmt);
+ vsnprintf(formatted_msg, sizeof(formatted_msg), fmt, ap);
+ va_end(ap);
+
+ const char *log_level_str;
+ int syslog_level;
+
+ switch (log_level) {
+ case NO_LEVEL:
+ log_level_str = "";
+ syslog_level = LOG_INFO;
+ break;
+ case INFO:
+ log_level_str = "INFO: ";
+ syslog_level = LOG_INFO;
+ break;
+ case WARNING:
+ log_level_str = "WARNING: ";
+ syslog_level = LOG_WARNING;
+ break;
+ case ERROR:
+ log_level_str = "ERROR: ";
+ syslog_level = LOG_ERR;
+ break;
+ default:
+ assert(false);
+ }
+
+ // Log to stderr if logging hasn't been set up yet. Note that this means
+ // that such messages will come even if there are no “error_log” lines.
+ if (!logging_started) {
+ fprintf(stderr, "%s%s\n", log_level_str, formatted_msg);
+ return;
+ }
+
+ for (size_t i = 0; i < log_destinations.size(); ++i) {
+ if (log_destinations[i] == SYSLOG_FAKE_FILE) {
+ syslog(syslog_level, "%s", formatted_msg);
+ } else {
+ int err = fprintf(log_destinations[i], "%s%s\n", log_level_str, formatted_msg);
+ if (err < 0) {
+ perror("fprintf");
+ }
+ if (log_destinations[i] != stderr) {
+ fflush(log_destinations[i]);
+ }
+ }
+ }
+}
--- /dev/null
+#ifndef _LOG_H
+#define _LOG_H 1
+
+// Functions for common logging to file and syslog.
+
+#include <string>
+
+enum LogLevel {
+ NO_LEVEL,
+ INFO,
+ WARNING,
+ ERROR,
+};
+
+void add_log_destination_file(const std::string &filename);
+void add_log_destination_console();
+void add_log_destination_syslog();
+
+void start_logging();
+void shut_down_logging();
+
+void log(LogLevel log_level, const char *fmt, ...);
+
+#endif // !defined(_LOG_H)
#include "acceptor.h"
#include "config.h"
#include "input.h"
+#include "log.h"
#include "markpool.h"
#include "serverpool.h"
#include "state.pb.h"
if (deserialized_input_it != deserialized_inputs->end()) {
input = deserialized_input_it->second;
if (input->get_url() != src) {
- fprintf(stderr, "INFO: Stream '%s' has changed URL from '%s' to '%s', restarting input.\n",
+ log(INFO, "Stream '%s' has changed URL from '%s' to '%s', restarting input.",
stream_id.c_str(), input->get_url().c_str(), src.c_str());
input->close_socket();
delete input;
if (input == NULL) {
input = create_input(stream_id, src);
if (input == NULL) {
- fprintf(stderr, "ERROR: did not understand URL '%s', clients will not get any data.\n",
+ log(ERROR, "did not understand URL '%s', clients will not get any data.",
src.c_str());
continue;
}
stream_it != expecting_stream_ids.end();
++stream_it) {
string stream_id = *stream_it;
- fprintf(stderr, "WARNING: stream '%s' disappeared from the configuration file.\n",
- stream_id.c_str());
- fprintf(stderr, " It will not be deleted, but clients will not get any new inputs.\n");
+ log(WARNING, "stream '%s' disappeared from the configuration file. "
+ "It will not be deleted, but clients will not get any new inputs.",
+ stream_id.c_str());
if (deserialized_inputs->count(stream_id) != 0) {
delete (*deserialized_inputs)[stream_id];
deserialized_inputs->erase(stream_id);
}
}
+void open_logs(const vector<LogConfig> &log_destinations)
+{
+ for (size_t i = 0; i < log_destinations.size(); ++i) {
+ if (log_destinations[i].type == LogConfig::LOG_TYPE_FILE) {
+ add_log_destination_file(log_destinations[i].filename);
+ } else if (log_destinations[i].type == LogConfig::LOG_TYPE_CONSOLE) {
+ add_log_destination_console();
+ } else if (log_destinations[i].type == LogConfig::LOG_TYPE_SYSLOG) {
+ add_log_destination_syslog();
+ } else {
+ assert(false);
+ }
+ }
+ start_logging();
+}
+
bool dry_run_config(const std::string &argv0, const std::string &config_filename)
{
char *argv0_copy = strdup(argv0.c_str());
}
start:
- fprintf(stderr, "\nCubemap " SERVER_VERSION " starting.\n");
+ // Open logs as soon as possible.
+ open_logs(config.log_destinations);
+
+ log(NO_LEVEL, "Cubemap " SERVER_VERSION " starting.");
servers = new ServerPool(config.num_servers);
CubemapStateProto loaded_state;
map<string, Input *> deserialized_inputs;
map<int, Acceptor *> deserialized_acceptors;
if (state_fd != -1) {
- fprintf(stderr, "Deserializing state from previous process... ");
+ log(INFO, "Deserializing state from previous process...");
string serialized;
if (!read_tempfile(state_fd, &serialized)) {
exit(1);
}
if (!loaded_state.ParseFromString(serialized)) {
- fprintf(stderr, "ERROR: Failed deserialization of state.\n");
+ log(ERROR, "Failed deserialization of state.");
exit(1);
}
new Acceptor(loaded_state.acceptors(i))));
}
- fprintf(stderr, "done.\n");
+ log(INFO, "Deserialization done.");
}
// Find all streams in the configuration file, and create them.
// “glitch” period was, not counting of course reconnects if the configuration changed.
double glitch_time = server_start.tv_sec - serialize_start.tv_sec +
1e-6 * (server_start.tv_usec - serialize_start.tv_usec);
- fprintf(stderr, "Re-exec happened in approx. %.0f ms.\n", glitch_time * 1000.0);
+ log(INFO, "Re-exec happened in approx. %.0f ms.", glitch_time * 1000.0);
}
while (!hupped) {
}
servers->stop();
- fprintf(stderr, "Serializing state and re-execing...\n");
+ log(INFO, "Serializing state and re-execing...");
CubemapStateProto state = collect_state(
serialize_start, acceptors, inputs, servers);
string serialized;
exit(1);
}
delete servers;
+ shut_down_logging();
if (!dry_run_config(argv[0], config_filename)) {
- fprintf(stderr, "ERROR: %s --test-config failed. Restarting old version instead of new.\n", argv[0]);
+ open_logs(config.log_destinations);
+ log(ERROR, "%s --test-config failed. Restarting old version instead of new.", argv[0]);
hupped = false;
+ shut_down_logging();
goto start;
}
+
char buf[16];
sprintf(buf, "%d", state_fd);
for ( ;; ) {
execlp(argv[0], argv[0], config_filename.c_str(), "--state", buf, NULL);
+ open_logs(config.log_destinations);
perror("execlp");
- fprintf(stderr, "PANIC: re-exec of %s failed. Waiting 0.2 seconds and trying again...\n", argv[0]);
+ log(ERROR, "re-exec of %s failed. Waiting 0.2 seconds and trying again...", argv[0]);
+ shut_down_logging();
usleep(200000);
}
}
+#include "log.h"
#include "markpool.h"
#include "mutexlock.h"
#include <stdio.h>
{
MutexLock lock(&mutex);
if (free_marks.empty()) {
- fprintf(stderr, "WARNING: Out of free marks in mark pool %d-%d, session will not be marked.\n",
- start, end);
- fprintf(stderr, " To fix, increase the pool size and HUP the server.\n");
+ log(WARNING, "Out of free marks in mark pool %d-%d, session will not be marked. "
+ "To fix, increase the pool size and HUP the server.",
+ start, end);
return 0;
}
int mark = free_marks.front();
#include <utility>
#include <vector>
+#include "log.h"
#include "markpool.h"
#include "mutexlock.h"
#include "parse.h"
switch (status) {
case RP_OUT_OF_SPACE:
- fprintf(stderr, "WARNING: fd %d sent overlong request!\n", client->sock);
+ log(WARNING, "fd %d sent overlong request!", client->sock);
close_client(client);
return;
case RP_NOT_FINISHED_YET:
// See if there's more data for us.
goto read_request_again;
case RP_EXTRA_DATA:
- fprintf(stderr, "WARNING: fd %d had junk data after request!\n", client->sock);
+ log(WARNING, "fd %d had junk data after request!", client->sock);
close_client(client);
return;
case RP_FINISHED:
return;
}
if (bytes_to_send > stream->backlog_size) {
- fprintf(stderr, "WARNING: fd %d lost %lld bytes, maybe too slow connection\n",
+ log(WARNING, "fd %d lost %lld bytes, maybe too slow connection",
client->sock,
(long long int)(bytes_to_send - stream->backlog_size));
client->stream_pos = stream->bytes_received - stream->backlog_size;
#include <string>
#include "acceptor.h"
+#include "log.h"
#include "serverpool.h"
#include "state.pb.h"
#include "udpinput.h"
int port_num = atoi(port.c_str());
sock = create_server_socket(port_num, UDP_SOCKET);
if (sock == -1) {
- fprintf(stderr, "WARNING: UDP socket creation failed. Waiting 0.2 seconds and trying again...\n");
+ log(WARNING, "UDP socket creation failed. Waiting 0.2 seconds and trying again...");
usleep(200000);
continue;
}
#include <stdlib.h>
#include <unistd.h>
+#include "log.h"
#include "util.h"
using namespace std;
goto done;
}
if (ret == 0) {
- fprintf(stderr, "Unexpected EOF!\n");
+ log(ERROR, "Unexpected EOF!");
ok = false;
goto done;
}