CXXFLAGS=-Wall -O2 -g
LDLIBS=-lpthread -lprotobuf
-OBJS=main.o client.o server.o stream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o thread.o util.o log.o state.pb.o
+OBJS=main.o client.o server.o stream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o
all: cubemap
--- /dev/null
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/poll.h>
+#include <errno.h>
+
+#include <algorithm>
+#include <string>
+#include <vector>
+
+#include "accesslog.h"
+#include "client.h"
+#include "log.h"
+#include "mutexlock.h"
+
+using namespace std;
+
+AccessLogThread::AccessLogThread() {}
+
+AccessLogThread::AccessLogThread(const string &filename)
+ : filename(filename) {}
+
+void AccessLogThread::write(const ClientStats& client)
+{
+ MutexLock lock(&mutex);
+ pending_writes.push_back(client);
+}
+
+void AccessLogThread::do_work()
+{
+ // Open the file.
+ if (filename.empty()) {
+ logfp = NULL;
+ } else {
+ logfp = fopen(filename.c_str(), "a+");
+ if (logfp == NULL) {
+ log_perror(filename.c_str());
+ // Continue as before.
+ }
+ }
+
+ while (!should_stop) {
+ // Empty the queue.
+ vector<ClientStats> writes;
+ {
+ MutexLock lock(&mutex);
+ swap(pending_writes, writes);
+ }
+
+ if (logfp == NULL) {
+ continue;
+ }
+
+ // Do the actual writes.
+ time_t now = time(NULL);
+ for (size_t i = 0; i < writes.size(); ++i) {
+ fprintf(logfp, "%llu %s %s %d %llu %llu %llu\n",
+ (long long unsigned)(writes[i].connect_time),
+ writes[i].remote_addr.c_str(),
+ writes[i].stream_id.c_str(),
+ int(now - writes[i].connect_time),
+ (long long unsigned)(writes[i].bytes_sent),
+ (long long unsigned)(writes[i].bytes_lost),
+ (long long unsigned)(writes[i].num_loss_events));
+ }
+ fflush(logfp);
+
+ // Wait until the stop_fd pipe is closed, one second has passed.
+ // or a spurious signal arrives.
+ pollfd pfd;
+ pfd.fd = stop_fd_read;
+ pfd.events = POLLIN | POLLRDHUP;
+
+ int nfds = poll(&pfd, 1, 1000);
+ if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+ continue;
+ }
+ if (nfds == 1) {
+ // Should stop.
+ break;
+ }
+ if (nfds == -1) {
+ log_perror("poll");
+ usleep(100000);
+ continue;
+ }
+ }
+
+ if (fclose(logfp) == EOF) {
+ log_perror("fclose");
+ }
+
+ logfp = NULL;
+}
--- /dev/null
+#ifndef _ACCESSLOG_H
+#define _ACCESSLOG_H
+
+// A class to log clients that just disconnected. Since this is shared by all
+// Server instances, we try not to let write() block too much, and rather do
+// all the I/O in a separate I/O thread.
+
+#include <stdio.h>
+
+#include <string>
+#include <vector>
+
+#include "client.h"
+#include "thread.h"
+
+class AccessLogThread : public Thread {
+public:
+ // Used if we do not have a file to log to. The thread will still exist,
+ // but won't actually write anywhere.
+ AccessLogThread();
+
+ // Log to a given file. If the file can't be opened, log an error
+ // to the error log, and work as if we didn't have a log file.
+ AccessLogThread(const std::string &filename);
+
+ // Add a log entry. Entries are written out at least once every second.
+ void write(const ClientStats& client);
+
+private:
+ virtual void do_work();
+
+ // The file we are logging to. If NULL, do not log.
+ FILE *logfp;
+ std::string filename;
+
+ pthread_mutex_t mutex;
+ std::vector<ClientStats> pending_writes;
+};
+
+#endif // _ACCESSLOG_H
ClientStats Client::get_stats() const
{
ClientStats stats;
- stats.stream_id = stream_id;
+ if (stream_id.empty()) {
+ stats.stream_id = "-";
+ } else {
+ stats.stream_id = stream_id;
+ }
stats.sock = sock;
stats.fwmark = fwmark;
stats.remote_addr = remote_addr;
if (has_stats_interval && !has_stats_file) {
log(WARNING, "'stats_interval' given, but no 'stats_file'. No statistics will be written.");
}
+
+ fetch_config_string(lines, "access_log", &config->access_log_file);
for (size_t i = 0; i < lines.size(); ++i) {
const ConfigLine &line = lines[i];
if (line.keyword == "num_servers" ||
line.keyword == "stats_file" ||
- line.keyword == "stats_interval") {
+ line.keyword == "stats_interval" ||
+ line.keyword == "access_log") {
// Already taken care of, above.
} else if (line.keyword == "port") {
if (!parse_port(line, config)) {
std::string stats_file; // Empty means no stats file.
int stats_interval;
+
+ std::string access_log_file; // Empty means no accses_log file.
};
// Parse and validate configuration. Returns false on error.
stats_file cubemap.stats
stats_interval 60
+# Logging of clients as they disconnect (and as such as no longer visible in the stats file).
+# You can only have zero or one of these.
+access_log access.log
+
# Logging of various informational and error messages. You can have as many of these as you want.
error_log type=file filename=cubemap.log
error_log type=syslog
#include <utility>
#include <vector>
+#include "accesslog.h"
#include "acceptor.h"
#include "config.h"
#include "input.h"
using namespace std;
+AccessLogThread *access_log = NULL;
ServerPool *servers = NULL;
volatile bool hupped = false;
volatile bool stopped = false;
open_logs(config.log_destinations);
log(INFO, "Cubemap " SERVER_VERSION " starting.");
+ if (config.access_log_file.empty()) {
+ // Create a dummy logger.
+ access_log = new AccessLogThread();
+ } else {
+ access_log = new AccessLogThread(config.access_log_file);
+ }
+ access_log->run();
+
servers = new ServerPool(config.num_servers);
CubemapStateProto loaded_state;
}
}
delete servers;
+ access_log->stop();
+ delete access_log;
shut_down_logging();
if (stopped) {
#include <utility>
#include <vector>
+#include "accesslog.h"
#include "log.h"
#include "markpool.h"
#include "mutexlock.h"
using namespace std;
+extern AccessLogThread *access_log;
+
Server::Server()
{
pthread_mutex_init(&mutex, NULL);
}
}
+ // Log to access_log.
+ access_log->write(client->get_stats());
+
// Bye-bye!
int ret;
do {