--- /dev/null
+/*
+ * A program to simulate various queue-drop strategies, using real frame
+ * arrival data as input. Contains various anchors, as well as parametrized
+ * values of the real algorithms that have been used in Nageru over time.
+ *
+ * Expects a log of frame arrivals (in and out). This isn't included in the
+ * git repository because it's quite large, but there's one available
+ * in compressed form at
+ *
+ * https://storage.sesse.net/nageru-latency-log.txt.xz
+ *
+ * The data set in question contains a rather difficult case, with two 50 Hz
+ * clocks slowly drifting from each other (at the rate of about a frame an hour).
+ * This means they are very nearly in sync for a long time, where rare bursts
+ * of jitter can make it hard for the algorithm to find the right level of
+ * conservatism.
+ *
+ * This is not meant to be production-quality code.
+ */
+
+#include <assert.h>
+#include <getopt.h>
+#include <math.h>
+#include <stdio.h>
+#include <locale.h>
+#include <string.h>
+#include <stdlib.h>
+#include <algorithm>
+#include <vector>
+#include <deque>
+#include <memory>
+#include <string>
+#include <limits>
+
+using namespace std;
+
+size_t max_drops = numeric_limits<size_t>::max();
+size_t max_underruns = numeric_limits<size_t>::max();
+double max_latency_ms = numeric_limits<double>::max();
+
+struct Event {
+ enum { IN, OUT } direction;
+ double t;
+};
+
+class Queue {
+public:
+ void add_frame(double t);
+ void get_frame(double now);
+ void drop_frame();
+ void eval(const string &name);
+ size_t queue_len() const { return frames_in_queue.size(); }
+ bool should_abort() const { return num_underruns > max_underruns || num_drops > max_drops; }
+
+private:
+ deque<double> frames_in_queue;
+ size_t num_underruns = 0;
+ size_t num_drops = 0;
+ size_t frames_since_underrun = 0;
+ size_t num_drops_on_first = 0;
+
+ double latency_sum = 0.0;
+ size_t latency_count = 0;
+};
+
+void Queue::add_frame(double t)
+{
+ frames_in_queue.push_back(t);
+}
+
+void Queue::get_frame(double now)
+{
+ if (frames_in_queue.empty()) {
+ ++num_underruns;
+ frames_since_underrun = 0;
+ return;
+ }
+ double t = frames_in_queue.front();
+ frames_in_queue.pop_front();
+ assert(now >= t);
+ latency_sum += (now - t);
+ ++latency_count;
+ ++frames_since_underrun;
+}
+
+void Queue::drop_frame()
+{
+ assert(!frames_in_queue.empty());
+ frames_in_queue.pop_front();
+ ++num_drops;
+ if (frames_since_underrun <= 1) {
+ ++num_drops_on_first;
+ }
+}
+
+void Queue::eval(const string &name)
+{
+ double latency_ms = 1e3 * latency_sum / latency_count;
+ if (num_underruns > max_underruns) return;
+ if (num_drops > max_drops) return;
+ if (latency_ms > max_latency_ms) return;
+ printf("%-50s: %2lu frames left in queue at end, %5lu underruns, %5lu drops (%5lu immediate), %6.2f ms avg latency\n",
+ name.c_str(), frames_in_queue.size(), num_underruns, num_drops, num_drops_on_first, latency_ms);
+}
+
+// A strategy that never drops; low anchor for drops and underruns, high anchor for latency.
+void test_nodrop(const vector<Event> &events)
+{
+ Queue q;
+ for (const Event &event : events) {
+ if (event.direction == Event::IN) {
+ q.add_frame(event.t);
+ } else {
+ q.get_frame(event.t);
+ }
+ }
+ q.eval("no-drop");
+}
+
+// A strategy that accepts only one element in the queue; low anchor for latency.
+void test_limit_to_1(const vector<Event> &events)
+{
+ Queue q;
+ for (const Event &event : events) {
+ if (event.direction == Event::IN) {
+ q.add_frame(event.t);
+ while (q.queue_len() > 1) q.drop_frame();
+ } else {
+ q.get_frame(event.t);
+ }
+ }
+ q.eval("limit-to-1");
+}
+
+// A strategy that accepts one or two elements in the queue.
+void test_limit_to_2(const vector<Event> &events)
+{
+ Queue q;
+ for (const Event &event : events) {
+ if (event.direction == Event::IN) {
+ q.add_frame(event.t);
+ while (q.queue_len() > 2) q.drop_frame();
+ } else {
+ q.get_frame(event.t);
+ }
+ }
+ q.eval("limit-to-2");
+}
+
+// The algorithm used from Nageru 1.2.0 to 1.6.0; raise the ceiling by 1 every time
+// we underrun, drop it if the ceiling hasn't been needed for 1000 frames.
+void test_nageru_1_2_0(const vector<Event> &events)
+{
+ Queue q;
+ unsigned safe_queue_length = 1;
+ unsigned frames_with_at_least_one = 0;
+ bool been_at_safe_point_since_last_starvation = false;
+ for (const Event &event : events) {
+ if (event.direction == Event::IN) {
+ q.add_frame(event.t);
+ } else {
+ unsigned queue_length = q.queue_len();
+ if (queue_length == 0) { // Starvation.
+ if (been_at_safe_point_since_last_starvation /*&& safe_queue_length < unsigned(global_flags.max_input_queue_frames)*/) {
+ ++safe_queue_length;
+ }
+ frames_with_at_least_one = 0;
+ been_at_safe_point_since_last_starvation = false;
+ q.get_frame(event.t); // mark it
+ continue;
+ }
+ if (queue_length >= safe_queue_length) {
+ been_at_safe_point_since_last_starvation = true;
+ }
+ if (++frames_with_at_least_one >= 1000 && safe_queue_length > 1) {
+ --safe_queue_length;
+ frames_with_at_least_one = 0;
+ }
+ while (q.queue_len() > safe_queue_length) {
+ q.drop_frame();
+ }
+ q.get_frame(event.t);
+ }
+ }
+ q.eval("nageru-1.2.0");
+}
+
+class Jitter {
+ const double multiplier, alpha;
+ double expected_timestamp = -1.0;
+ double max_jitter_seconds = 0.0;
+
+public:
+ Jitter(double multiplier, double alpha)
+ : multiplier(multiplier), alpha(alpha) {}
+
+ void update(double timestamp, double frame_duration, size_t dropped_frames)
+ {
+ if (expected_timestamp >= 0.0) {
+ expected_timestamp += dropped_frames * frame_duration;
+ double jitter_seconds = fabs(expected_timestamp - timestamp);
+ max_jitter_seconds = max(multiplier * jitter_seconds, alpha * max_jitter_seconds); // About two seconds half-time.
+
+ // Cap at 100 ms.
+ max_jitter_seconds = min(max_jitter_seconds, 0.1);
+ }
+ expected_timestamp = timestamp + frame_duration;
+ }
+
+ double get_expected() const
+ {
+ return expected_timestamp;
+ }
+
+ double get_jitter() const
+ {
+ return max_jitter_seconds;
+ }
+};
+
+// Keep a running estimate of k times max jitter seen, decreasing by a factor alpha every frame.
+void test_jitter_filter(const vector<Event> &events, double multiplier, double alpha, double margin)
+{
+ Queue q;
+ Jitter input_jitter(multiplier, alpha);
+ Jitter output_jitter(multiplier, alpha);
+
+ for (const Event &event : events) {
+ if (event.direction == Event::IN) {
+ input_jitter.update(event.t, 0.020, 0);
+ q.add_frame(event.t);
+ } else {
+ double now = event.t;
+ output_jitter.update(event.t, 0.020, 0);
+ q.get_frame(event.t);
+
+ double seconds_until_next_frame = max(input_jitter.get_expected() - now + input_jitter.get_jitter(), 0.0);
+ double master_frame_length_seconds = 0.020;
+
+ seconds_until_next_frame += margin; // Hack.
+
+ size_t safe_queue_length = max<int>(floor((seconds_until_next_frame + output_jitter.get_jitter()) / master_frame_length_seconds), 0);
+ while (q.queue_len() > safe_queue_length) {
+ q.drop_frame();
+ }
+ }
+ if (q.should_abort()) return;
+ }
+
+ char name[256];
+ snprintf(name, sizeof(name), "jitter-filter[mul=%.1f,alpha=%.4f,margin=%.1f]", multiplier, alpha, 1e3 * margin);
+ q.eval(name);
+}
+
+// Implements an unbalanced binary search tree that can also satisfy order queries
+// (e.g. “give me the 86th largest entry”).
+class HistoryJitter {
+ const size_t history_length;
+ const double multiplier, percentile;
+ double expected_timestamp = 0.0;
+ double max_jitter_seconds = 0.0;
+ size_t num_updates = 0;
+
+ deque<double> history;
+ struct TreeNode {
+ double val;
+ size_t children = 0;
+ unique_ptr<TreeNode> left, right;
+ };
+ unique_ptr<TreeNode> root;
+
+ unique_ptr<TreeNode> alloc_cache; // Holds the last freed value, for fast reallocation.
+
+ TreeNode *alloc_node()
+ {
+ if (alloc_cache == nullptr) {
+ return new TreeNode;
+ }
+ alloc_cache->children = 0;
+ return alloc_cache.release();
+ }
+
+ void insert(double val)
+ {
+ if (root == nullptr) {
+ root.reset(alloc_node());
+ root->val = val;
+ return;
+ } else {
+ insert(root.get(), val);
+ }
+ }
+
+ void insert(TreeNode *node, double val)
+ {
+ ++node->children;
+ if (val <= node->val) {
+ // Goes into left.
+ if (node->left == nullptr) {
+ node->left.reset(alloc_node());
+ node->left->val = val;
+ } else {
+ insert(node->left.get(), val);
+ }
+ } else {
+ // Goes into right.
+ if (node->right == nullptr) {
+ node->right.reset(alloc_node());
+ node->right->val = val;
+ } else {
+ insert(node->right.get(), val);
+ }
+ }
+ }
+
+ void remove(double val)
+ {
+ assert(root != nullptr);
+ if (root->children == 0) {
+ assert(root->val == val);
+ alloc_cache = move(root);
+ } else {
+ remove(root.get(), val);
+ }
+ }
+
+ void remove(TreeNode *node, double val)
+ {
+ //printf("Down into %p looking for %f [left=%p right=%p]\n", node, val, node->left.get(), node->right.get());
+ if (node->val == val) {
+ remove(node);
+ } else if (val < node->val) {
+ assert(node->left != nullptr);
+ --node->children;
+ if (node->left->children == 0) {
+ assert(node->left->val == val);
+ alloc_cache = move(node->left);
+ } else {
+ remove(node->left.get(), val);
+ }
+ } else {
+ assert(node->right != nullptr);
+ --node->children;
+ if (node->right->children == 0) {
+ assert(node->right->val == val);
+ alloc_cache = move(node->right);
+ } else {
+ remove(node->right.get(), val);
+ }
+ }
+ }
+
+ // Declares a node to be empty, so it should pull up the value of one of its children.
+ // The node must be an interior node (ie., have at least one child).
+ void remove(TreeNode *node)
+ {
+ //printf("Decided that %p must be removed\n", node);
+ assert(node->children > 0);
+ --node->children;
+
+ bool remove_left;
+ if (node->right == nullptr) {
+ remove_left = true;
+ } else if (node->left == nullptr) {
+ remove_left = false;
+ } else {
+ remove_left = (node->left->children >= node->right->children);
+ }
+ if (remove_left) {
+ if (node->left->children == 0) {
+ node->val = node->left->val;
+ alloc_cache = move(node->left);
+ } else {
+ // Move maximum value up to this node.
+ node->val = elem_at(node->left.get(), node->left->children);
+ remove(node->left.get(), node->val);
+ }
+ } else {
+ if (node->right->children == 0) {
+ node->val = node->right->val;
+ alloc_cache = move(node->right);
+ } else {
+ // Move minimum value up to this node.
+ node->val = elem_at(node->right.get(), 0);
+ remove(node->right.get(), node->val);
+ }
+ }
+ }
+
+ double elem_at(size_t elem_idx)
+ {
+ return elem_at(root.get(), elem_idx);
+ }
+
+ double elem_at(TreeNode *node, size_t elem_idx)
+ {
+ //printf("Looking for %lu in node %p [%lu children]\n", elem_idx, node, node->children);
+ assert(node != nullptr);
+ assert(elem_idx <= node->children);
+ if (node->left != nullptr) {
+ if (elem_idx <= node->left->children) {
+ return elem_at(node->left.get(), elem_idx);
+ } else {
+ elem_idx -= node->left->children + 1;
+ }
+ }
+ if (elem_idx == 0) {
+ return node->val;
+ }
+ return elem_at(node->right.get(), elem_idx - 1);
+ }
+
+ void print_tree(TreeNode *node, size_t indent, double min, double max)
+ {
+ if (node == nullptr) return;
+ if (!(node->val >= min && node->val <= max)) {
+ //printf("node %p is outside range [%f,%f]\n", node, min, max);
+ assert(false);
+ }
+ for (size_t i = 0; i < indent * 2; ++i) putchar(' ');
+ printf("%f [%p, %lu children]\n", node->val, node, node->children);
+ print_tree(node->left.get(), indent + 1, min, node->val);
+ print_tree(node->right.get(), indent + 1, node->val, max);
+ }
+
+public:
+ HistoryJitter(size_t history_length, double multiplier, double percentile)
+ : history_length(history_length), multiplier(multiplier), percentile(percentile) {}
+
+ void update(double timestamp, double frame_duration, size_t dropped_frames)
+ {
+ //if (++num_updates % 1000 == 0) {
+ // printf("%d... [%lu in tree %p]\n", num_updates, root->children + 1, root.get());
+ //}
+
+ if (expected_timestamp >= 0.0) {
+ expected_timestamp += dropped_frames * frame_duration;
+ double jitter_seconds = fabs(expected_timestamp - timestamp);
+
+ history.push_back(jitter_seconds);
+ insert(jitter_seconds);
+ //printf("\nTree %p after insert of %f:\n", root.get(), jitter_seconds);
+ //print_tree(root.get(), 0, -HUGE_VAL, HUGE_VAL);
+ while (history.size() > history_length) {
+ // printf("removing %f, because %p has %lu elements and history has %lu elements\n", history.front(), root.get(), root->children + 1, history.size());
+ remove(history.front());
+ history.pop_front();
+ }
+
+ size_t elem_idx = lrint(percentile * (history.size() - 1));
+// printf("Searching for element %lu in %p, which has %lu elements (history has %lu elements)\n", elem_idx, root.get(), root->children + 1, history.size());
+// fflush(stdout);
+//
+ // Cap at 100 ms.
+ max_jitter_seconds = min(elem_at(elem_idx), 0.1);
+ }
+ expected_timestamp = timestamp + frame_duration;
+ }
+
+ double get_expected() const
+ {
+ return expected_timestamp;
+ }
+
+ double get_jitter() const
+ {
+ return max_jitter_seconds * multiplier;
+ }
+};
+
+void test_jitter_history(const vector<Event> &events, size_t history_length, double multiplier, double percentile, double margin)
+{
+ Queue q;
+ HistoryJitter input_jitter(history_length, multiplier, percentile);
+ HistoryJitter output_jitter(history_length, multiplier, percentile);
+
+ for (const Event &event : events) {
+ if (event.direction == Event::IN) {
+ input_jitter.update(event.t, 0.020, 0);
+ q.add_frame(event.t);
+ } else {
+ double now = event.t;
+ output_jitter.update(event.t, 0.020, 0);
+ q.get_frame(event.t);
+
+ double seconds_until_next_frame = max(input_jitter.get_expected() - now + input_jitter.get_jitter(), 0.0);
+ double master_frame_length_seconds = 0.020;
+
+ seconds_until_next_frame += margin; // Hack.
+
+ size_t safe_queue_length = max<int>(floor((seconds_until_next_frame + output_jitter.get_jitter()) / master_frame_length_seconds), 0);
+ while (q.queue_len() > safe_queue_length) {
+ q.drop_frame();
+ }
+ }
+ if (q.should_abort()) return;
+ }
+ char name[256];
+ snprintf(name, sizeof(name), "history[len=%lu,mul=%.1f,pct=%.4f,margin=%.1f]", history_length, multiplier, percentile, 1e3 * margin);
+ q.eval(name);
+}
+
+int main(int argc, char **argv)
+{
+ static const option long_options[] = {
+ { "max-drops", required_argument, 0, 'd' },
+ { "max-underruns", required_argument, 0, 'u' },
+ { "max-latency-ms", required_argument, 0, 'l' },
+ { 0, 0, 0, 0 }
+ };
+ for ( ;; ) {
+ int option_index = 0;
+ int c = getopt_long(argc, argv, "d:u:l:", long_options, &option_index);
+
+ if (c == -1) {
+ break;
+ }
+ switch (c) {
+ case 'd':
+ max_drops = atof(optarg);
+ break;
+ case 'u':
+ max_underruns = atof(optarg);
+ break;
+ case 'l':
+ max_latency_ms = atof(optarg);
+ break;
+ default:
+ fprintf(stderr, "Usage: simul [--max-drops NUM] [--max-underruns NUM] [--max-latency-ms TIME]\n");
+ exit(1);
+ }
+ }
+
+ vector<Event> events;
+
+ const char *filename = (optind < argc) ? argv[optind] : "nageru-latency-log.txt";
+ FILE *fp = fopen(filename, "r");
+ if (fp == nullptr) {
+ perror(filename);
+ exit(1);
+ }
+ while (!feof(fp)) {
+ char dir[256];
+ double t;
+
+ if (fscanf(fp, "%s %lf", dir, &t) != 2) {
+ break;
+ }
+ if (dir[0] == 'I') {
+ events.push_back(Event{Event::IN, t});
+ } else if (dir[0] == 'O') {
+ events.push_back(Event{Event::OUT, t});
+ } else {
+ fprintf(stderr, "ERROR: Unreadable line\n");
+ exit(1);
+ }
+ }
+ fclose(fp);
+
+ sort(events.begin(), events.end(), [](const Event &a, const Event &b) { return a.t < b.t; });
+
+ test_nodrop(events);
+ test_limit_to_1(events);
+ test_limit_to_2(events);
+ test_nageru_1_2_0(events);
+ for (double multiplier : { 0.0, 0.5, 1.0, 2.0, 3.0, 5.0 }) {
+ for (double alpha : { 0.5, 0.9, 0.99, 0.995, 0.999, 0.9999 }) {
+ for (double margin_ms : { -1.0, 0.0, 1.0, 2.0, 5.0, 10.0, 20.0 }) {
+ test_jitter_filter(events, multiplier, alpha, 1e-3 * margin_ms);
+ }
+ }
+ }
+ for (size_t history_samples : { 10, 100, 500, 1000, 5000, 10000, 25000 }) {
+ for (double multiplier : { 0.5, 1.0, 2.0, 3.0, 5.0, 10.0 }) {
+ for (double percentile : { 0.5, 0.75, 0.9, 0.99, 0.995, 0.999, 1.0 }) {
+ if (lrint(percentile * (history_samples - 1)) == int(history_samples - 1) && percentile != 1.0) {
+ // Redundant.
+ continue;
+ }
+
+ //for (double margin_ms : { -1.0, 0.0, 1.0, 2.0, 5.0, 10.0, 20.0 }) {
+ for (double margin_ms : { 0.0 }) {
+ test_jitter_history(events, history_samples, multiplier, percentile, 1e-3 * margin_ms);
+ }
+ }
+ }
+ }
+}
} // namespace
-void QueueLengthPolicy::register_metrics(const vector<pair<string, string>> &labels)
+void JitterHistory::register_metrics(const vector<pair<string, string>> &labels)
{
- global_metrics.add("input_queue_length_frames", labels, &metric_input_queue_length_frames, Metrics::TYPE_GAUGE);
- global_metrics.add("input_queue_safe_length_frames", labels, &metric_input_queue_safe_length_frames, Metrics::TYPE_GAUGE);
- global_metrics.add("input_queue_duped_frames", labels, &metric_input_duped_frames);
+ global_metrics.add("input_underestimated_jitter_frames", labels, &metric_input_underestimated_jitter_frames);
+ global_metrics.add("input_estimated_max_jitter_seconds", labels, &metric_input_estimated_max_jitter_seconds, Metrics::TYPE_GAUGE);
+}
+
+void JitterHistory::unregister_metrics(const vector<pair<string, string>> &labels)
+{
+ global_metrics.remove("input_underestimated_jitter_frames", labels);
+ global_metrics.remove("input_estimated_max_jitter_seconds", labels);
}
-void QueueLengthPolicy::update_policy(unsigned queue_length)
+void JitterHistory::frame_arrived(steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames)
{
- if (queue_length == 0) { // Starvation.
- if (been_at_safe_point_since_last_starvation && safe_queue_length < unsigned(global_flags.max_input_queue_frames)) {
- ++safe_queue_length;
- fprintf(stderr, "Card %u: Starvation, increasing safe limit to %u frame(s)\n",
- card_index, safe_queue_length);
+ if (expected_timestamp > steady_clock::time_point::min()) {
+ expected_timestamp += dropped_frames * nanoseconds(frame_duration * 1000000000 / TIMEBASE);
+ double jitter_seconds = fabs(duration<double>(expected_timestamp - now).count());
+ history.push_back(orders.insert(jitter_seconds));
+ if (jitter_seconds > estimate_max_jitter()) {
+ ++metric_input_underestimated_jitter_frames;
}
- frames_with_at_least_one = 0;
- been_at_safe_point_since_last_starvation = false;
- ++metric_input_duped_frames;
- metric_input_queue_safe_length_frames = safe_queue_length;
- metric_input_queue_length_frames = 0;
- return;
+
+ metric_input_estimated_max_jitter_seconds = estimate_max_jitter();
+
+ if (history.size() > history_length) {
+ orders.erase(history.front());
+ history.pop_front();
+ }
+ assert(history.size() <= history_length);
+ }
+ expected_timestamp = now + nanoseconds(frame_duration * 1000000000 / TIMEBASE);
+}
+
+double JitterHistory::estimate_max_jitter() const
+{
+ if (orders.empty()) {
+ return 0.0;
}
- if (queue_length >= safe_queue_length) {
- been_at_safe_point_since_last_starvation = true;
+ size_t elem_idx = lrint((orders.size() - 1) * percentile);
+ if (percentile <= 0.5) {
+ return *next(orders.begin(), elem_idx) * multiplier;
+ } else {
+ return *prev(orders.end(), elem_idx + 1) * multiplier;
}
- if (++frames_with_at_least_one >= 1000 && safe_queue_length > 1) {
- --safe_queue_length;
- metric_input_queue_safe_length_frames = safe_queue_length;
- fprintf(stderr, "Card %u: Spare frames for more than 1000 frames, reducing safe limit to %u frame(s)\n",
- card_index, safe_queue_length);
- frames_with_at_least_one = 0;
+}
+
+void QueueLengthPolicy::register_metrics(const vector<pair<string, string>> &labels)
+{
+ global_metrics.add("input_queue_safe_length_frames", labels, &metric_input_queue_safe_length_frames, Metrics::TYPE_GAUGE);
+}
+
+void QueueLengthPolicy::unregister_metrics(const vector<pair<string, string>> &labels)
+{
+ global_metrics.remove("input_queue_safe_length_frames", labels);
+}
+
+void QueueLengthPolicy::update_policy(steady_clock::time_point now,
+ steady_clock::time_point expected_next_frame,
+ int64_t master_frame_duration,
+ double max_input_card_jitter_seconds,
+ double max_master_card_jitter_seconds)
+{
+ double master_frame_duration_seconds = master_frame_duration / double(TIMEBASE);
+
+ // Figure out when we can expect the next frame for this card, assuming
+ // worst-case jitter (ie., the frame is maximally late).
+ double seconds_until_next_frame = max(duration<double>(expected_next_frame - now).count() + max_input_card_jitter_seconds, 0.0);
+
+ // How many times are the master card expected to tick in that time?
+ // We assume the master clock has worst-case jitter but not any rate
+ // discrepancy, ie., it ticks as early as possible every time, but not
+ // cumulatively.
+ double frames_needed = (seconds_until_next_frame + max_master_card_jitter_seconds) / master_frame_duration_seconds;
+
+ // As a special case, if the master card ticks faster than the input card,
+ // we expect the queue to drain by itself even without dropping. But if
+ // the difference is small (e.g. 60 Hz master and 59.94 input), it would
+ // go slowly enough that the effect wouldn't really be appreciable.
+ // We account for this by looking at the situation five frames ahead,
+ // assuming everything else is the same.
+ double frames_allowed;
+ if (max_master_card_jitter_seconds < max_input_card_jitter_seconds) {
+ frames_allowed = frames_needed + 5 * (max_input_card_jitter_seconds - max_master_card_jitter_seconds) / master_frame_duration_seconds;
+ } else {
+ frames_allowed = frames_needed;
}
- metric_input_queue_length_frames = min(queue_length, safe_queue_length); // The caller will drop frames for us if needed.
+
+ safe_queue_length = max<int>(floor(frames_allowed), 0);
+ metric_input_queue_safe_length_frames = safe_queue_length;
}
Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards)
metric_start_time_seconds = get_timestamp_for_metrics();
+ output_jitter_history.register_metrics({{ "card", "output" }});
global_metrics.add("frames_output_total", &metric_frames_output_total);
global_metrics.add("frames_output_dropped", &metric_frames_output_dropped);
global_metrics.add("start_time_seconds", &metric_start_time_seconds, Metrics::TYPE_GAUGE);
// Unregister old metrics, if any.
if (!card->labels.empty()) {
const vector<pair<string, string>> &labels = card->labels;
+ card->jitter_history.unregister_metrics(labels);
+ card->queue_length_policy.unregister_metrics(labels);
global_metrics.remove("input_received_frames", labels);
global_metrics.remove("input_dropped_frames_jitter", labels);
global_metrics.remove("input_dropped_frames_error", labels);
global_metrics.remove("input_dropped_frames_resets", labels);
+ global_metrics.remove("input_queue_length_frames", labels);
+ global_metrics.remove("input_queue_duped_frames", labels);
global_metrics.remove("input_has_signal_bool", labels);
global_metrics.remove("input_is_connected_bool", labels);
default:
assert(false);
}
+ card->jitter_history.register_metrics(labels);
card->queue_length_policy.register_metrics(labels);
global_metrics.add("input_received_frames", labels, &card->metric_input_received_frames);
global_metrics.add("input_dropped_frames_jitter", labels, &card->metric_input_dropped_frames_jitter);
global_metrics.add("input_dropped_frames_error", labels, &card->metric_input_dropped_frames_error);
global_metrics.add("input_dropped_frames_resets", labels, &card->metric_input_resets);
+ global_metrics.add("input_queue_length_frames", labels, &card->metric_input_queue_length_frames, Metrics::TYPE_GAUGE);
+ global_metrics.add("input_queue_duped_frames", labels, &card->metric_input_duped_frames);
global_metrics.add("input_has_signal_bool", labels, &card->metric_input_has_signal_bool, Metrics::TYPE_GAUGE);
global_metrics.add("input_is_connected_bool", labels, &card->metric_input_is_connected_bool, Metrics::TYPE_GAUGE);
lock.unlock();
fake_capture->stop_dequeue_thread();
lock.lock();
- old_card->capture = move(old_card->parked_capture);
+ old_card->capture = move(old_card->parked_capture); // TODO: reset the metrics
old_card->is_fake_capture = false;
old_card->capture->start_bm_capture();
}
card->output->start_output(desired_output_video_mode, pts_int);
}
output_card_index = card_index;
+ output_jitter_history.clear();
}
namespace {
card->last_timecode = timecode;
+ // Calculate jitter for this card here. We do it on arrival so that we
+ // make sure every frame counts, even the dropped ones -- and it will also
+ // make sure the jitter number is as recent as possible, should it change.
+ card->jitter_history.frame_arrived(video_frame.received_timestamp, frame_length, dropped_frames);
+
PBOFrameAllocator::Userdata *userdata = (PBOFrameAllocator::Userdata *)video_frame.userdata;
size_t cbcr_width, cbcr_height, cbcr_offset, y_offset;
return (card_index == master_card_index);
}
-void Mixer::trim_queue(CaptureCard *card, unsigned card_index)
+void Mixer::trim_queue(CaptureCard *card, size_t safe_queue_length)
{
// Count the number of frames in the queue, including any frames
// we dropped. It's hard to know exactly how we should deal with
for (const CaptureCard::NewFrame &frame : card->new_frames) {
queue_length += frame.dropped_frames + 1;
}
- card->queue_length_policy.update_policy(queue_length);
// If needed, drop frames until the queue is below the safe limit.
// We prefer to drop from the head, because all else being equal,
// we'd like more recent frames (less latency).
unsigned dropped_frames = 0;
- while (queue_length > card->queue_length_policy.get_safe_queue_length()) {
+ while (queue_length > safe_queue_length) {
assert(!card->new_frames.empty());
assert(queue_length > card->new_frames.front().dropped_frames);
queue_length -= card->new_frames.front().dropped_frames;
- if (queue_length <= card->queue_length_policy.get_safe_queue_length()) {
+ if (queue_length <= safe_queue_length) {
// No need to drop anything.
break;
}
}
card->metric_input_dropped_frames_jitter += dropped_frames;
+ card->metric_input_queue_length_frames = queue_length;
#if 0
if (dropped_frames > 0) {
goto start;
}
- if (!master_card_is_output) {
- output_frame_info.frame_timestamp =
- cards[master_card_index].new_frames.front().received_timestamp;
- }
-
for (unsigned card_index = 0; card_index < num_cards + num_video_inputs; ++card_index) {
CaptureCard *card = &cards[card_index];
- if (input_card_is_master_clock(card_index, master_card_index)) {
- // We don't use the queue length policy for the master card,
- // but we will if it stops being the master. Thus, clear out
- // the policy in case we switch in the future.
- card->queue_length_policy.reset(card_index);
- assert(!card->new_frames.empty());
+ if (card->new_frames.empty()) { // Starvation.
+ ++card->metric_input_duped_frames;
} else {
- trim_queue(card, card_index);
- }
- if (!card->new_frames.empty()) {
new_frames[card_index] = move(card->new_frames.front());
has_new_frame[card_index] = true;
card->new_frames.pop_front();
}
if (!master_card_is_output) {
+ output_frame_info.frame_timestamp = new_frames[master_card_index].received_timestamp;
output_frame_info.dropped_frames = new_frames[master_card_index].dropped_frames;
output_frame_info.frame_duration = new_frames[master_card_index].length;
}
+ if (!output_frame_info.is_preroll) {
+ output_jitter_history.frame_arrived(output_frame_info.frame_timestamp, output_frame_info.frame_duration, output_frame_info.dropped_frames);
+ }
+
+ for (unsigned card_index = 0; card_index < num_cards + num_video_inputs; ++card_index) {
+ CaptureCard *card = &cards[card_index];
+ if (has_new_frame[card_index] &&
+ !input_card_is_master_clock(card_index, master_card_index) &&
+ !output_frame_info.is_preroll) {
+ card->queue_length_policy.update_policy(
+ output_frame_info.frame_timestamp,
+ card->jitter_history.get_expected_next_frame(),
+ output_frame_info.frame_duration,
+ card->jitter_history.estimate_max_jitter(),
+ output_jitter_history.estimate_max_jitter());
+ trim_queue(card, min<int>(global_flags.max_input_queue_frames,
+ card->queue_length_policy.get_safe_queue_length()));
+ }
+ }
+
// This might get off by a fractional sample when changing master card
// between ones with different frame rates, but that's fine.
int num_samples_times_timebase = OUTPUT_FREQUENCY * output_frame_info.frame_duration + fractional_samples;
class YCbCrInput;
} // namespace movit
+// A class to estimate the future jitter. Used in QueueLengthPolicy (see below).
+//
+// There are many ways to estimate jitter; I've tested a few ones (and also
+// some algorithms that don't explicitly model jitter) with different
+// parameters on some real-life data in experiments/queue_drop_policy.cpp.
+// This is one based on simple order statistics where I've added some margin in
+// the number of starvation events; I believe that about one every hour would
+// probably be acceptable, but this one typically goes lower than that, at the
+// cost of 2–3 ms extra latency. (If the queue is hard-limited to one frame, it's
+// possible to get ~10 ms further down, but this would mean framedrops every
+// second or so.) The general strategy is: Take the 99.9-percentile jitter over
+// last 5000 frames, multiply by two, and that's our worst-case jitter
+// estimate. The fact that we're not using the max value means that we could
+// actually even throw away very late frames immediately, which means we only
+// get one user-visible event instead of seeing something both when the frame
+// arrives late (duplicate frame) and then again when we drop.
+class JitterHistory {
+private:
+ static constexpr size_t history_length = 5000;
+ static constexpr double percentile = 0.999;
+ static constexpr double multiplier = 2.0;
+
+public:
+ void register_metrics(const std::vector<std::pair<std::string, std::string>> &labels);
+ void unregister_metrics(const std::vector<std::pair<std::string, std::string>> &labels);
+
+ void clear() {
+ history.clear();
+ orders.clear();
+ }
+ void frame_arrived(std::chrono::steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames);
+ std::chrono::steady_clock::time_point get_expected_next_frame() const { return expected_timestamp; }
+ double estimate_max_jitter() const;
+
+private:
+ // A simple O(k) based algorithm for getting the k-th largest or
+ // smallest element from our window; we simply keep the multiset
+ // ordered (insertions and deletions are O(n) as always) and then
+ // iterate from one of the sides. If we had larger values of k,
+ // we could go for a more complicated setup with two sets or heaps
+ // (one increasing and one decreasing) that we keep balanced around
+ // the point, or it is possible to reimplement std::set with
+ // counts in each node. However, since k=5, we don't need this.
+ std::multiset<double> orders;
+ std::deque<std::multiset<double>::iterator> history;
+
+ std::chrono::steady_clock::time_point expected_timestamp = std::chrono::steady_clock::time_point::min();
+
+ // Metrics. There are no direct summaries for jitter, since we already have latency summaries.
+ std::atomic<int64_t> metric_input_underestimated_jitter_frames{0};
+ std::atomic<double> metric_input_estimated_max_jitter_seconds{0.0 / 0.0};
+};
+
// For any card that's not the master (where we pick out the frames as they
// come, as fast as we can process), there's going to be a queue. The question
// is when we should drop frames from that queue (apart from the obvious
// 2. We don't want to add more delay than is needed.
//
// Our general strategy is to drop as many frames as we can (helping for #2)
-// that we think is safe for #1 given jitter. To this end, we set a lower floor N,
-// where we assume that if we have N frames in the queue, we're always safe from
-// starvation. (Typically, N will be 0 or 1. It starts off at 0.) If we have
-// more than N frames in the queue after reading out the one we need, we head-drop
-// them to reduce the queue.
+// that we think is safe for #1 given jitter. To this end, we measure the
+// deviation from the expected arrival time for all cards, and use that for
+// continuous jitter estimation.
//
-// N is reduced as follows: If the queue has had at least one spare frame for
-// at least 50 (master) frames (ie., it's been too conservative for a second),
-// we reduce N by 1 and reset the timers.
-//
-// Whenever the queue is starved (we needed a frame but there was none),
-// and we've been at N since the last starvation, N was obviously too low,
-// so we increment it. We will never set N above 5, though.
+// We then drop everything from the queue that we're sure we won't need to
+// serve the output in the time before the next frame arrives. Typically,
+// this means the queue will contain 0 or 1 frames, although more is also
+// possible if the jitter is very high.
class QueueLengthPolicy {
public:
QueueLengthPolicy() {}
void reset(unsigned card_index) {
this->card_index = card_index;
- safe_queue_length = 1;
- frames_with_at_least_one = 0;
- been_at_safe_point_since_last_starvation = false;
}
void register_metrics(const std::vector<std::pair<std::string, std::string>> &labels);
-
- void update_policy(unsigned queue_length); // Call before picking out a frame, so 0 means starvation.
+ void unregister_metrics(const std::vector<std::pair<std::string, std::string>> &labels);
+
+ // Call after picking out a frame, so 0 means starvation.
+ void update_policy(std::chrono::steady_clock::time_point now,
+ std::chrono::steady_clock::time_point expected_next_frame,
+ int64_t master_frame_duration,
+ double max_input_card_jitter_seconds,
+ double max_master_card_jitter_seconds);
unsigned get_safe_queue_length() const { return safe_queue_length; }
private:
- unsigned card_index; // For debugging only.
- unsigned safe_queue_length = 1; // Called N in the comments. Can never go below 1.
- unsigned frames_with_at_least_one = 0;
- bool been_at_safe_point_since_last_starvation = false;
+ unsigned card_index; // For debugging and metrics only.
+ unsigned safe_queue_length = 0; // Can never go below zero.
// Metrics.
- std::atomic<int64_t> metric_input_queue_length_frames{0};
std::atomic<int64_t> metric_input_queue_safe_length_frames{1};
- std::atomic<int64_t> metric_input_duped_frames{0};
};
class Mixer {
void audio_thread_func();
void release_display_frame(DisplayFrame *frame);
double pts() { return double(pts_int) / TIMEBASE; }
- // Call this _before_ trying to pull out a frame from a capture card;
- // it will update the policy and drop the right amount of frames for you.
- void trim_queue(CaptureCard *card, unsigned card_index);
+ void trim_queue(CaptureCard *card, size_t safe_queue_length);
HTTPD httpd;
unsigned num_cards, num_video_inputs;
int last_timecode = -1; // Unwrapped.
+ JitterHistory jitter_history;
+
// Metrics.
std::vector<std::pair<std::string, std::string>> labels;
std::atomic<int64_t> metric_input_received_frames{0};
+ std::atomic<int64_t> metric_input_duped_frames{0};
std::atomic<int64_t> metric_input_dropped_frames_jitter{0};
std::atomic<int64_t> metric_input_dropped_frames_error{0};
std::atomic<int64_t> metric_input_resets{0};
+ std::atomic<int64_t> metric_input_queue_length_frames{0};
std::atomic<int64_t> metric_input_has_signal_bool{-1};
std::atomic<int64_t> metric_input_is_connected_bool{-1};
std::atomic<int64_t> metric_input_frame_rate_den{-1};
std::atomic<int64_t> metric_input_sample_rate_hz{-1};
};
+ JitterHistory output_jitter_history;
CaptureCard cards[MAX_VIDEO_CARDS]; // Protected by <card_mutex>.
YCbCrInterpretation ycbcr_interpretation[MAX_VIDEO_CARDS]; // Protected by <card_mutex>.
AudioMixer audio_mixer; // Same as global_audio_mixer (see audio_mixer.h).