-#include <errno.h>
#include <fcntl.h>
-#include <poll.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include "log.h"
#include "serverpool.h"
#include "stats.h"
+#include "util.h"
using namespace std;
extern ServerPool *servers;
-StatsThread::StatsThread(const std::string &stats_file, int stats_interval)
+StatsThread::StatsThread(const string &stats_file, int stats_interval)
: stats_file(stats_file),
stats_interval(stats_interval)
{
void StatsThread::do_work()
{
- while (!should_stop) {
+ while (!should_stop()) {
int fd;
+ char *filename;
FILE *fp;
- time_t now;
+ timespec now;
vector<ClientStats> client_stats;
+ vector<HLSZombie> hls_zombies;
+ unordered_map<string, HLSZombie> remaining_hls_zombies;
+
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ goto sleep;
+ }
// Open a new, temporary file.
- char *filename = strdup((stats_file + ".new.XXXXXX").c_str());
- fd = mkostemp(filename, O_WRONLY);
+ filename = strdup((stats_file + ".new.XXXXXX").c_str());
+ fd = mkostemp(filename, O_WRONLY | O_CLOEXEC);
if (fd == -1) {
log_perror(filename);
free(filename);
}
fp = fdopen(fd, "w");
- if (fp == NULL) {
+ if (fp == nullptr) {
log_perror("fdopen");
- if (close(fd) == -1) {
- log_perror("close");
- }
+ safe_close(fd);
if (unlink(filename) == -1) {
log_perror(filename);
}
goto sleep;
}
- now = time(NULL);
- client_stats = servers->get_client_stats();
- for (size_t i = 0; i < client_stats.size(); ++i) {
- fprintf(fp, "%s %d %d %s %d %llu %llu %llu\n",
- client_stats[i].remote_addr.c_str(),
- client_stats[i].sock,
- client_stats[i].fwmark,
- client_stats[i].stream_id.c_str(),
- int(now - client_stats[i].connect_time),
- (long long unsigned)(client_stats[i].bytes_sent),
- (long long unsigned)(client_stats[i].bytes_lost),
- (long long unsigned)(client_stats[i].num_loss_events));
+ // Get all the HLS zombies and combine them into one map (we resolve conflicts
+ // by having an arbitrary element win; in practice, that means the lowest
+ // server ID).
+ for (HLSZombie &zombie : servers->get_hls_zombies()) {
+ const string remote_addr = zombie.remote_addr;
+ remaining_hls_zombies[move(remote_addr)] = move(zombie);
+ }
+
+ // Remove all zombies whose ID match an already ongoing request.
+ // (Normally, this is cleared out already when it starts,
+ // but the request could happen on a different server from the zombie,
+ // or the zombie could be deserialized.)
+ for (const ClientStats &stats : servers->get_client_stats()) {
+ if (stats.url != "-") {
+ remaining_hls_zombies.erase(stats.hls_zombie_key);
+ }
+ }
+
+ for (const ClientStats &stats : servers->get_client_stats()) {
+ string url = stats.url;
+ if (url == "-") {
+ // No download going on currently; could it be waiting for more HLS fragments?
+ auto it = remaining_hls_zombies.find(stats.remote_addr);
+ if (it != remaining_hls_zombies.end()) {
+ url = it->second.url;
+ remaining_hls_zombies.erase(it);
+ }
+ }
+
+ fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
+ stats.remote_addr.c_str(),
+ stats.sock,
+ 0, // Used to be fwmark.
+ url.c_str(),
+ int(now.tv_sec - stats.connect_time.tv_sec), // Rather coarse.
+ (long long unsigned)(stats.bytes_sent),
+ (long long unsigned)(stats.bytes_lost),
+ (long long unsigned)(stats.num_loss_events),
+ stats.referer.c_str(),
+ stats.user_agent.c_str());
+ }
+ for (const auto &url_and_zombie : remaining_hls_zombies) {
+ const HLSZombie &zombie = url_and_zombie.second;
+ fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
+ zombie.remote_addr.c_str(),
+ 0, // Fake socket. (The Munin script doesn't like negative numbers.)
+ 0, // Used to be fwmark.
+ zombie.url.c_str(),
+ 0,
+ 0ULL,
+ 0ULL,
+ 0ULL,
+ zombie.referer.c_str(),
+ zombie.user_agent.c_str());
}
if (fclose(fp) == EOF) {
log_perror("fclose");
free(filename);
sleep:
- // Wait until the stop_fd pipe is closed, stats_interval timeout,
+ // Wait until we are asked to quit, stats_interval timeout,
// or a spurious signal. (The latter will cause us to write stats
// too often, but that's okay.)
- pollfd pfd;
- pfd.fd = stop_fd_read;
- pfd.events = POLLIN | POLLRDHUP;
-
- int nfds = poll(&pfd, 1, stats_interval * 1000);
- if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
- continue;
- }
- if (nfds == 1) {
- // Should stop.
- break;
- }
- if (nfds == -1) {
- log_perror("poll");
- usleep(100000);
- continue;
- }
+ timespec timeout_ts;
+ timeout_ts.tv_sec = stats_interval;
+ timeout_ts.tv_nsec = 0;
+ wait_for_wakeup(&timeout_ts);
}
}