+#include <fcntl.h>
+#include <stddef.h>
#include <stdio.h>
-#include <string.h>
#include <stdlib.h>
-#include <fcntl.h>
-#include <signal.h>
-#include <pthread.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <vector>
+#include "client.h"
+#include "log.h"
#include "serverpool.h"
#include "stats.h"
+#include "util.h"
using namespace std;
extern ServerPool *servers;
-StatsThread::StatsThread(const std::string &stats_file, int stats_interval)
+StatsThread::StatsThread(const string &stats_file, int stats_interval)
: stats_file(stats_file),
stats_interval(stats_interval)
{
}
-void StatsThread::run()
-{
- should_stop = false;
- pthread_create(&worker_thread, NULL, do_work_thunk, this);
-}
-
-void StatsThread::stop()
-{
- should_stop = true;
- pthread_kill(worker_thread, SIGHUP);
- if (pthread_join(worker_thread, NULL) == -1) {
- perror("pthread_join");
- exit(1);
- }
-}
-
-void *StatsThread::do_work_thunk(void *arg)
-{
- StatsThread *stats_thread = reinterpret_cast<StatsThread *>(arg);
- stats_thread->do_work();
- return NULL;
-}
-
void StatsThread::do_work()
{
- while (!should_stop) {
+ while (!should_stop()) {
int fd;
+ char *filename;
FILE *fp;
- time_t now;
+ timespec now;
vector<ClientStats> client_stats;
+ vector<HLSZombie> hls_zombies;
+ unordered_map<string, HLSZombie> remaining_hls_zombies;
+
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ goto sleep;
+ }
// Open a new, temporary file.
- char *filename = strdup((stats_file + ".new.XXXXXX").c_str());
+ filename = strdup((stats_file + ".new.XXXXXX").c_str());
fd = mkostemp(filename, O_WRONLY);
if (fd == -1) {
- perror(filename);
+ log_perror(filename);
free(filename);
goto sleep;
}
fp = fdopen(fd, "w");
- if (fp == NULL) {
- perror("fdopen");
- close(fd);
- unlink(filename);
+ if (fp == nullptr) {
+ log_perror("fdopen");
+ safe_close(fd);
+ if (unlink(filename) == -1) {
+ log_perror(filename);
+ }
free(filename);
goto sleep;
}
- now = time(NULL);
- client_stats = servers->get_client_stats();
- for (size_t i = 0; i < client_stats.size(); ++i) {
- fprintf(fp, "%s %s %d %llu\n",
- client_stats[i].remote_addr.c_str(),
- client_stats[i].stream_id.c_str(),
- int(now - client_stats[i].connect_time),
- (long long unsigned)(client_stats[i].bytes_sent));
+ // Get all the HLS zombies and combine them into one map (we resolve conflicts
+ // by having an arbitrary element win; in practice, that means the lowest
+ // server ID).
+ for (HLSZombie &zombie : servers->get_hls_zombies()) {
+ const string remote_addr = zombie.remote_addr;
+ remaining_hls_zombies[move(remote_addr)] = move(zombie);
+ }
+
+ // Remove all zombies whose ID match an already ongoing request.
+ // (Normally, this is cleared out already when it starts,
+ // but the request could happen on a different server from the zombie,
+ // or the zombie could be deserialized.)
+ for (const ClientStats &stats : servers->get_client_stats()) {
+ if (stats.url != "-") {
+ remaining_hls_zombies.erase(stats.hls_zombie_key);
+ }
+ }
+
+ for (const ClientStats &stats : servers->get_client_stats()) {
+ string url = stats.url;
+ if (url == "-") {
+ // No download going on currently; could it be waiting for more HLS fragments?
+ auto it = remaining_hls_zombies.find(stats.remote_addr);
+ if (it != remaining_hls_zombies.end()) {
+ url = it->second.url;
+ remaining_hls_zombies.erase(it);
+ }
+ }
+
+ fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
+ stats.remote_addr.c_str(),
+ stats.sock,
+ 0, // Used to be fwmark.
+ url.c_str(),
+ int(now.tv_sec - stats.connect_time.tv_sec), // Rather coarse.
+ (long long unsigned)(stats.bytes_sent),
+ (long long unsigned)(stats.bytes_lost),
+ (long long unsigned)(stats.num_loss_events),
+ stats.referer.c_str(),
+ stats.user_agent.c_str());
+ }
+ for (const auto &url_and_zombie : remaining_hls_zombies) {
+ const HLSZombie &zombie = url_and_zombie.second;
+ fprintf(fp, "%s %d %d %s %d %llu %llu %llu \"%s\" \"%s\"\n",
+ zombie.remote_addr.c_str(),
+ 0, // Fake socket. (The Munin script doesn't like negative numbers.)
+ 0, // Used to be fwmark.
+ zombie.url.c_str(),
+ 0,
+ 0ULL,
+ 0ULL,
+ 0ULL,
+ zombie.referer.c_str(),
+ zombie.user_agent.c_str());
}
if (fclose(fp) == EOF) {
- perror("fclose");
- unlink(filename);
+ log_perror("fclose");
+ if (unlink(filename) == -1) {
+ log_perror(filename);
+ }
free(filename);
goto sleep;
}
if (rename(filename, stats_file.c_str()) == -1) {
- perror("rename");
- unlink(filename);
+ log_perror("rename");
+ if (unlink(filename) == -1) {
+ log_perror(filename);
+ }
}
+ free(filename);
sleep:
- int left_to_sleep = stats_interval;
- do {
- left_to_sleep = sleep(left_to_sleep);
- } while (left_to_sleep > 0 && !should_stop);
+ // Wait until we are asked to quit, stats_interval timeout,
+ // or a spurious signal. (The latter will cause us to write stats
+ // too often, but that's okay.)
+ timespec timeout_ts;
+ timeout_ts.tv_sec = stats_interval;
+ timeout_ts.tv_nsec = 0;
+ wait_for_wakeup(&timeout_ts);
}
}