types[name] = TYPE_HISTOGRAM;
}
+void Metrics::add(const string &name, const vector<pair<string, string>> &labels, Summary *location, Laziness laziness)
+{
+ Metric metric;
+ metric.data_type = DATA_TYPE_SUMMARY;
+ metric.laziness = laziness;
+ metric.location_summary = location;
+
+ lock_guard<mutex> lock(mu);
+ metrics.emplace(MetricKey(name, labels), metric);
+ assert(types.count(name) == 0 || types[name] == TYPE_SUMMARY);
+ types[name] = TYPE_SUMMARY;
+}
+
void Metrics::remove(const string &name, const vector<pair<string, string>> &labels)
{
lock_guard<mutex> lock(mu);
ss << "# TYPE nageru_" << type_it->first << " gauge\n";
} else if (type_it->second == TYPE_HISTOGRAM) {
ss << "# TYPE nageru_" << type_it->first << " histogram\n";
+ } else if (type_it->second == TYPE_SUMMARY) {
+ ss << "# TYPE nageru_" << type_it->first << " summary\n";
}
++type_it;
}
} else {
ss << name << " " << val << "\n";
}
- } else {
+ } else if (metric.data_type == DATA_TYPE_HISTOGRAM) {
ss << metric.location_histogram->serialize(metric.laziness, key_and_metric.first.name, key_and_metric.first.labels);
+ } else {
+ ss << metric.location_summary->serialize(metric.laziness, key_and_metric.first.name, key_and_metric.first.labels);
}
}
return ss.str();
}
+
+Summary::Summary(const vector<double> &quantiles, double window_seconds)
+ : quantiles(quantiles), window(window_seconds) {}
+
+void Summary::count_event(double val)
+{
+ steady_clock::time_point now = steady_clock::now();
+ steady_clock::time_point cutoff = now - duration_cast<steady_clock::duration>(window);
+
+ lock_guard<mutex> lock(mu);
+ values.emplace_back(now, val);
+ while (!values.empty() && values.front().first < cutoff) {
+ values.pop_front();
+ }
+
+ // Non-atomic add, but that's fine, since there are no concurrent writers.
+ sum = sum + val;
+ ++count;
+}
+
+string Summary::serialize(Metrics::Laziness laziness, const string &name, const vector<pair<string, string>> &labels)
+{
+ steady_clock::time_point now = steady_clock::now();
+ steady_clock::time_point cutoff = now - duration_cast<steady_clock::duration>(window);
+
+ vector<double> values_copy;
+ {
+ lock_guard<mutex> lock(mu);
+ while (!values.empty() && values.front().first < cutoff) {
+ values.pop_front();
+ }
+ values_copy.reserve(values.size());
+ for (const auto &time_and_value : values) {
+ values_copy.push_back(time_and_value.second);
+ }
+ }
+
+ vector<pair<double, double>> answers;
+ if (values_copy.size() == 0) {
+ if (laziness == Metrics::PRINT_WHEN_NONEMPTY) {
+ return "";
+ }
+ for (double quantile : quantiles) {
+ answers.emplace_back(quantile, 0.0 / 0.0);
+ }
+ } else if (values_copy.size() == 1) {
+ for (double quantile : quantiles) {
+ answers.emplace_back(quantile, values_copy[0]);
+ }
+ } else {
+ // We could probably do repeated nth_element, but the constant factor
+ // gets a bit high, so just sorting probably is about as fast.
+ sort(values_copy.begin(), values_copy.end());
+ for (double quantile : quantiles) {
+ double idx = quantile * (values_copy.size() - 1);
+ size_t idx_floor = size_t(floor(idx));
+ const double v0 = values_copy[idx_floor];
+
+ if (idx_floor == values_copy.size() - 1) {
+ answers.emplace_back(quantile, values_copy[idx_floor]);
+ } else {
+ // Linear interpolation.
+ double t = idx - idx_floor;
+ const double v1 = values_copy[idx_floor + 1];
+ answers.emplace_back(quantile, v0 + t * (v1 - v0));
+ }
+ }
+ }
+
+ stringstream ss;
+ ss.imbue(locale("C"));
+ ss.precision(20);
+
+ for (const auto &quantile_and_value : answers) {
+ stringstream quantile_ss;
+ quantile_ss.imbue(locale("C"));
+ quantile_ss.precision(3);
+ quantile_ss << quantile_and_value.first;
+ vector<pair<string, string>> quantile_labels = labels;
+ quantile_labels.emplace_back("quantile", quantile_ss.str());
+
+ ss << Metrics::serialize_name(name, quantile_labels) << " " << quantile_and_value.second << "\n";
+ }
+
+ ss << Metrics::serialize_name(name + "_sum", labels) << " " << sum.load() << "\n";
+ ss << Metrics::serialize_name(name + "_count", labels) << " " << count.load() << "\n";
+ return ss.str();
+}
// which makes it quite unwieldy. Thus, we'll package our own for the time being.
#include <atomic>
+#include <chrono>
+#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <string>
+#include <utility>
#include <vector>
class Histogram;
+class Summary;
// Prometheus recommends the use of timestamps instead of “time since event”,
// so you can use this to get the number of seconds since the epoch.
TYPE_COUNTER,
TYPE_GAUGE,
TYPE_HISTOGRAM, // Internal use only.
+ TYPE_SUMMARY, // Internal use only.
};
enum Laziness {
PRINT_ALWAYS,
add(name, {}, location);
}
+ void add(const std::string &name, Summary *location)
+ {
+ add(name, {}, location);
+ }
+
void add(const std::string &name, const std::vector<std::pair<std::string, std::string>> &labels, std::atomic<int64_t> *location, Type type = TYPE_COUNTER);
void add(const std::string &name, const std::vector<std::pair<std::string, std::string>> &labels, std::atomic<double> *location, Type type = TYPE_COUNTER);
void add(const std::string &name, const std::vector<std::pair<std::string, std::string>> &labels, Histogram *location, Laziness laziness = PRINT_ALWAYS);
+ void add(const std::string &name, const std::vector<std::pair<std::string, std::string>> &labels, Summary *location, Laziness laziness = PRINT_ALWAYS);
void remove(const std::string &name)
{
DATA_TYPE_INT64,
DATA_TYPE_DOUBLE,
DATA_TYPE_HISTOGRAM,
+ DATA_TYPE_SUMMARY,
};
struct MetricKey {
MetricKey(const std::string &name, const std::vector<std::pair<std::string, std::string>> labels)
std::atomic<int64_t> *location_int64;
std::atomic<double> *location_double;
Histogram *location_histogram;
+ Summary *location_summary;
};
};
mutable std::mutex mu;
std::map<std::string, Type> types; // Ordered the same as metrics.
std::map<MetricKey, Metric> metrics;
- std::vector<Histogram> histograms;
friend class Histogram;
+ friend class Summary;
};
class Histogram {
std::atomic<int64_t> count_after_last_bucket{0};
};
+// This is a pretty dumb streaming quantile class, but it's exact, and we don't have
+// too many values (typically one per frame, and one-minute interval), so we don't
+// need anything fancy.
+class Summary {
+public:
+ Summary(const std::vector<double> &quantiles, double window_seconds);
+ void count_event(double val);
+ std::string serialize(Metrics::Laziness laziness, const std::string &name, const std::vector<std::pair<std::string, std::string>> &labels);
+
+private:
+ const std::vector<double> quantiles;
+ const std::chrono::duration<double> window;
+
+ mutable std::mutex mu;
+ std::deque<std::pair<std::chrono::steady_clock::time_point, double>> values;
+ std::atomic<double> sum{0.0};
+ std::atomic<int64_t> count{0};
+};
+
extern Metrics global_metrics;
#endif // !defined(_METRICS_H)
void LatencyHistogram::init(const string &measuring_point)
{
unsigned num_cards = global_flags.num_cards; // The mixer might not be ready yet.
- histograms.resize(num_cards * FRAME_HISTORY_LENGTH * 2);
+ summaries.resize(num_cards * FRAME_HISTORY_LENGTH * 2);
for (unsigned card_index = 0; card_index < num_cards; ++card_index) {
char card_index_str[64];
snprintf(card_index_str, sizeof(card_index_str), "%u", card_index);
- histograms[card_index].resize(FRAME_HISTORY_LENGTH);
+ summaries[card_index].resize(FRAME_HISTORY_LENGTH);
for (unsigned frame_index = 0; frame_index < FRAME_HISTORY_LENGTH; ++frame_index) {
char frame_index_str[64];
snprintf(frame_index_str, sizeof(frame_index_str), "%u", frame_index);
- histograms[card_index][frame_index].reset(new Histogram[2]);
- histograms[card_index][frame_index][0].init_geometric(0.001, 10.0, 30);
- histograms[card_index][frame_index][1].init_geometric(0.001, 10.0, 30);
+ summaries[card_index][frame_index].reset(
+ new Summary[2]{{{0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99}, 60.0},
+ {{0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99}, 60.0}});
global_metrics.add("latency_seconds",
{{ "measuring_point", measuring_point },
{ "card", card_index_str },
{ "frame_age", frame_index_str },
{ "frame_type", "i/p" }},
- &histograms[card_index][frame_index][0],
+ &summaries[card_index][frame_index][0],
(frame_index == 0) ? Metrics::PRINT_ALWAYS : Metrics::PRINT_WHEN_NONEMPTY);
global_metrics.add("latency_seconds",
{{ "measuring_point", measuring_point },
{ "card", card_index_str },
{ "frame_age", frame_index_str },
{ "frame_type", "b" }},
- &histograms[card_index][frame_index][1],
+ &summaries[card_index][frame_index][1],
Metrics::PRINT_WHEN_NONEMPTY);
}
}
continue;
}
duration<double> latency = now - ts;
- histogram->histograms[card_index][frame_index][is_b_frame].count_event(latency.count());
+ histogram->summaries[card_index][frame_index][is_b_frame].count_event(latency.count());
}
}
void init(const std::string &measuring_point); // Initializes histograms and registers them in global_metrics.
// Indices: card number, frame history number, b-frame or not (1/0).
- std::vector<std::vector<std::unique_ptr<Histogram[]>>> histograms;
+ std::vector<std::vector<std::unique_ptr<Summary[]>>> summaries;
};
ReceivedTimestamps find_received_timestamp(const std::vector<RefCountedFrame> &input_frames);