]> git.sesse.net Git - nageru/commitdiff
Rework the queue drop algorithm again.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 25 Jun 2017 14:41:47 +0000 (16:41 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 25 Jun 2017 15:42:49 +0000 (17:42 +0200)
The old one worked fine for most cases, but when two cards drift
very slowly out of sync (and one of them is the master), it's inadequate;
we should simply drop one frame at the right moment and be done with it.
Of course, knowing what the right moment is can be tricky, due to jitter,
which we now explicitly model.

Also include a separate program that tests various properties of different
possible policies (used to determine which one to use), based on replaying
real event logs.

experiments/queue_drop_policy.cpp [new file with mode: 0644]

diff --git a/experiments/queue_drop_policy.cpp b/experiments/queue_drop_policy.cpp
new file mode 100644 (file)
index 0000000..dfd5f3c
--- /dev/null
@@ -0,0 +1,588 @@
+ * A program to simulate various queue-drop strategies, using real frame
+ * arrival data as input. Contains various anchors, as well as parametrized
+ * values of the real algorithms that have been used in Nageru over time.
+ *
+ * Expects a log of frame arrivals (in and out). This isn't included in the
+ * git repository because it's quite large, but there's one available 
+ * in compressed form at
+ *
+ *   https://storage.sesse.net/nageru-latency-log.txt.xz
+ *
+ * The data set in question contains a rather difficult case, with two 50 Hz
+ * clocks slowly drifting from each other (at the rate of about a frame an hour).
+ * This means they are very nearly in sync for a long time, where rare bursts
+ * of jitter can make it hard for the algorithm to find the right level of
+ * conservatism.
+ *
+ * This is not meant to be production-quality code.
+ */
+#include <assert.h>
+#include <getopt.h>
+#include <math.h>
+#include <stdio.h>
+#include <locale.h>
+#include <string.h>
+#include <stdlib.h>
+#include <algorithm>
+#include <vector>
+#include <deque>
+#include <memory>
+#include <string>
+#include <limits>
+using namespace std;
+size_t max_drops = numeric_limits<size_t>::max();
+size_t max_underruns = numeric_limits<size_t>::max();
+double max_latency_ms = numeric_limits<double>::max();
+struct Event {
+       enum { IN, OUT } direction;
+       double t;
+class Queue {
+       void add_frame(double t);
+       void get_frame(double now);
+       void drop_frame();
+       void eval(const string &name);
+       size_t queue_len() const { return frames_in_queue.size(); }
+       bool should_abort() const { return num_underruns > max_underruns || num_drops > max_drops; }
+       deque<double> frames_in_queue;
+       size_t num_underruns = 0;
+       size_t num_drops = 0;
+       size_t frames_since_underrun = 0;
+       size_t num_drops_on_first = 0;
+       double latency_sum = 0.0;
+       size_t latency_count = 0;
+void Queue::add_frame(double t)
+       frames_in_queue.push_back(t);
+void Queue::get_frame(double now)
+       if (frames_in_queue.empty()) {
+               ++num_underruns;
+               frames_since_underrun = 0;
+               return;
+       }
+       double t = frames_in_queue.front();
+       frames_in_queue.pop_front();
+       assert(now >= t);
+       latency_sum += (now - t);
+       ++latency_count;
+       ++frames_since_underrun;
+void Queue::drop_frame()
+       assert(!frames_in_queue.empty());
+       frames_in_queue.pop_front();
+       ++num_drops;
+       if (frames_since_underrun <= 1) {
+               ++num_drops_on_first;
+       }
+void Queue::eval(const string &name)
+       double latency_ms = 1e3 * latency_sum / latency_count;
+       if (num_underruns > max_underruns) return;
+       if (num_drops > max_drops) return;
+       if (latency_ms > max_latency_ms) return;
+       printf("%-50s: %2lu frames left in queue at end, %5lu underruns, %5lu drops (%5lu immediate), %6.2f ms avg latency\n",
+               name.c_str(), frames_in_queue.size(), num_underruns, num_drops, num_drops_on_first, latency_ms);
+// A strategy that never drops; low anchor for drops and underruns, high anchor for latency.
+void test_nodrop(const vector<Event> &events)
+       Queue q;
+       for (const Event &event : events) {
+               if (event.direction == Event::IN) {
+                       q.add_frame(event.t);
+               } else {
+                       q.get_frame(event.t);
+               }
+       }
+       q.eval("no-drop");
+// A strategy that accepts only one element in the queue; low anchor for latency.
+void test_limit_to_1(const vector<Event> &events)
+       Queue q;
+       for (const Event &event : events) {
+               if (event.direction == Event::IN) {
+                       q.add_frame(event.t);
+                       while (q.queue_len() > 1) q.drop_frame();
+               } else {
+                       q.get_frame(event.t);
+               }
+       }
+       q.eval("limit-to-1");
+// A strategy that accepts one or two elements in the queue.
+void test_limit_to_2(const vector<Event> &events)
+       Queue q;
+       for (const Event &event : events) {
+               if (event.direction == Event::IN) {
+                       q.add_frame(event.t);
+                       while (q.queue_len() > 2) q.drop_frame();
+               } else {
+                       q.get_frame(event.t);
+               }
+       }
+       q.eval("limit-to-2");
+// The algorithm used from Nageru 1.2.0 to 1.6.0; raise the ceiling by 1 every time
+// we underrun, drop it if the ceiling hasn't been needed for 1000 frames.
+void test_nageru_1_2_0(const vector<Event> &events)
+       Queue q;
+       unsigned safe_queue_length = 1;
+       unsigned frames_with_at_least_one = 0;
+       bool been_at_safe_point_since_last_starvation = false;
+       for (const Event &event : events) {
+               if (event.direction == Event::IN) {
+                       q.add_frame(event.t);
+               } else {
+                       unsigned queue_length = q.queue_len();
+                       if (queue_length == 0) {  // Starvation.
+                               if (been_at_safe_point_since_last_starvation /*&& safe_queue_length < unsigned(global_flags.max_input_queue_frames)*/) {
+                                       ++safe_queue_length;
+                               }
+                               frames_with_at_least_one = 0;
+                               been_at_safe_point_since_last_starvation = false;
+                               q.get_frame(event.t);  // mark it
+                               continue;
+                       }
+                       if (queue_length >= safe_queue_length) {
+                               been_at_safe_point_since_last_starvation = true;
+                       }
+                       if (++frames_with_at_least_one >= 1000 && safe_queue_length > 1) {
+                               --safe_queue_length;
+                               frames_with_at_least_one = 0;
+                       }
+                       while (q.queue_len() > safe_queue_length) {
+                               q.drop_frame();
+                       }
+                       q.get_frame(event.t);
+               }
+       }
+       q.eval("nageru-1.2.0");
+class Jitter {
+       const double multiplier, alpha;
+       double expected_timestamp = -1.0;
+       double max_jitter_seconds = 0.0;
+       Jitter(double multiplier, double alpha)
+               : multiplier(multiplier), alpha(alpha) {}
+       void update(double timestamp, double frame_duration, size_t dropped_frames)
+       {
+               if (expected_timestamp >= 0.0) {
+                       expected_timestamp += dropped_frames * frame_duration;
+                       double jitter_seconds = fabs(expected_timestamp - timestamp);
+                       max_jitter_seconds = max(multiplier * jitter_seconds, alpha * max_jitter_seconds);  // About two seconds half-time.
+                       // Cap at 100 ms.
+                       max_jitter_seconds = min(max_jitter_seconds, 0.1);
+               }
+               expected_timestamp = timestamp + frame_duration;
+       }
+       double get_expected() const
+       {
+               return expected_timestamp;
+       }
+       double get_jitter() const
+       {
+               return max_jitter_seconds;
+       }
+// Keep a running estimate of k times max jitter seen, decreasing by a factor alpha every frame.
+void test_jitter_filter(const vector<Event> &events, double multiplier, double alpha, double margin)
+       Queue q;
+       Jitter input_jitter(multiplier, alpha);
+       Jitter output_jitter(multiplier, alpha);
+       for (const Event &event : events) {
+               if (event.direction == Event::IN) {
+                       input_jitter.update(event.t, 0.020, 0);
+                       q.add_frame(event.t);
+               } else {
+                       double now = event.t;
+                       output_jitter.update(event.t, 0.020, 0);
+                       q.get_frame(event.t);
+                       double seconds_until_next_frame = max(input_jitter.get_expected() - now + input_jitter.get_jitter(), 0.0);
+                       double master_frame_length_seconds = 0.020;
+                       seconds_until_next_frame += margin;  // Hack.
+                       size_t safe_queue_length = max<int>(floor((seconds_until_next_frame + output_jitter.get_jitter()) / master_frame_length_seconds), 0);
+                       while (q.queue_len() > safe_queue_length) {
+                               q.drop_frame();
+                       }
+               }
+               if (q.should_abort()) return;
+       }
+       char name[256];
+       snprintf(name, sizeof(name), "jitter-filter[mul=%.1f,alpha=%.4f,margin=%.1f]", multiplier, alpha, 1e3 * margin);
+       q.eval(name);
+// Implements an unbalanced binary search tree that can also satisfy order queries
+// (e.g. “give me the 86th largest entry”).
+class HistoryJitter {
+       const size_t history_length;
+       const double multiplier, percentile;
+       double expected_timestamp = 0.0;
+       double max_jitter_seconds = 0.0;
+       size_t num_updates = 0;
+       deque<double> history;
+       struct TreeNode {
+               double val;
+               size_t children = 0;
+               unique_ptr<TreeNode> left, right;
+       };
+       unique_ptr<TreeNode> root;
+       unique_ptr<TreeNode> alloc_cache;  // Holds the last freed value, for fast reallocation.
+       TreeNode *alloc_node()
+       {
+               if (alloc_cache == nullptr) {
+                       return new TreeNode;
+               }
+               alloc_cache->children = 0;
+               return alloc_cache.release();
+       }
+       void insert(double val)
+       {
+               if (root == nullptr) {
+                       root.reset(alloc_node());
+                       root->val = val;
+                       return;
+               } else {
+                       insert(root.get(), val);
+               }
+       }
+       void insert(TreeNode *node, double val)
+       {
+               ++node->children;
+               if (val <= node->val) {
+                       // Goes into left.
+                       if (node->left == nullptr) {
+                               node->left.reset(alloc_node());
+                               node->left->val = val;
+                       } else {
+                               insert(node->left.get(), val);
+                       }
+               } else {
+                       // Goes into right.
+                       if (node->right == nullptr) {
+                               node->right.reset(alloc_node());
+                               node->right->val = val;
+                       } else {
+                               insert(node->right.get(), val);
+                       }
+               }
+       }
+       void remove(double val)
+       {
+               assert(root != nullptr);
+               if (root->children == 0) {
+                       assert(root->val == val);
+                       alloc_cache = move(root);
+               } else {
+                       remove(root.get(), val);
+               }
+       }
+       void remove(TreeNode *node, double val)
+       {
+               //printf("Down into %p looking for %f [left=%p right=%p]\n", node, val, node->left.get(), node->right.get());
+               if (node->val == val) {
+                       remove(node);
+               } else if (val < node->val) {
+                       assert(node->left != nullptr);
+                       --node->children;
+                       if (node->left->children == 0) {
+                               assert(node->left->val == val);
+                               alloc_cache = move(node->left);
+                       } else {
+                               remove(node->left.get(), val);
+                       }
+               } else {
+                       assert(node->right != nullptr);
+                       --node->children;
+                       if (node->right->children == 0) {
+                               assert(node->right->val == val);
+                               alloc_cache = move(node->right);
+                       } else {
+                               remove(node->right.get(), val);
+                       }
+               }
+       }
+       // Declares a node to be empty, so it should pull up the value of one of its children.
+       // The node must be an interior node (ie., have at least one child).
+       void remove(TreeNode *node)
+       {
+               //printf("Decided that %p must be removed\n", node);
+               assert(node->children > 0);
+               --node->children;
+               bool remove_left;
+               if (node->right == nullptr) {
+                       remove_left = true;
+               } else if (node->left == nullptr) {
+                       remove_left = false;
+               } else {
+                       remove_left = (node->left->children >= node->right->children);
+               }
+               if (remove_left) {
+                       if (node->left->children == 0) {
+                               node->val = node->left->val;
+                               alloc_cache = move(node->left);
+                       } else {
+                               // Move maximum value up to this node.
+                               node->val = elem_at(node->left.get(), node->left->children);
+                               remove(node->left.get(), node->val);
+                       }
+               } else {
+                       if (node->right->children == 0) {
+                               node->val = node->right->val;
+                               alloc_cache = move(node->right);
+                       } else {
+                               // Move minimum value up to this node.
+                               node->val = elem_at(node->right.get(), 0);
+                               remove(node->right.get(), node->val);
+                       }
+               }
+       }
+       double elem_at(size_t elem_idx)
+       {
+               return elem_at(root.get(), elem_idx);
+       }
+       double elem_at(TreeNode *node, size_t elem_idx)
+       {
+               //printf("Looking for %lu in node %p [%lu children]\n", elem_idx, node, node->children);
+               assert(node != nullptr);
+               assert(elem_idx <= node->children);
+               if (node->left != nullptr) {
+                       if (elem_idx <= node->left->children) {
+                               return elem_at(node->left.get(), elem_idx);
+                       } else {
+                               elem_idx -= node->left->children + 1;
+                       }
+               }
+               if (elem_idx == 0) {
+                       return node->val;
+               }
+               return elem_at(node->right.get(), elem_idx - 1);
+       }
+       void print_tree(TreeNode *node, size_t indent, double min, double max)
+       {
+               if (node == nullptr) return;
+               if (!(node->val >= min && node->val <= max)) {
+                       //printf("node %p is outside range [%f,%f]\n", node, min, max);
+                       assert(false);
+               }
+               for (size_t i = 0; i < indent * 2; ++i) putchar(' ');
+               printf("%f [%p, %lu children]\n", node->val, node, node->children);
+               print_tree(node->left.get(), indent + 1, min, node->val);
+               print_tree(node->right.get(), indent + 1, node->val, max);
+       }
+       HistoryJitter(size_t history_length, double multiplier, double percentile)
+               : history_length(history_length), multiplier(multiplier), percentile(percentile) {}
+       void update(double timestamp, double frame_duration, size_t dropped_frames)
+       {
+               //if (++num_updates % 1000 == 0) {
+               //      printf("%d... [%lu in tree %p]\n", num_updates, root->children + 1, root.get());
+               //}
+               if (expected_timestamp >= 0.0) {
+                       expected_timestamp += dropped_frames * frame_duration;
+                       double jitter_seconds = fabs(expected_timestamp - timestamp);
+                       history.push_back(jitter_seconds);
+                       insert(jitter_seconds);
+                       //printf("\nTree %p after insert of %f:\n", root.get(), jitter_seconds);
+                       //print_tree(root.get(), 0, -HUGE_VAL, HUGE_VAL);
+                       while (history.size() > history_length) {
+                       //      printf("removing %f, because %p has %lu elements and history has %lu elements\n", history.front(), root.get(), root->children + 1, history.size());
+                               remove(history.front());
+                               history.pop_front();
+                       }
+                       size_t elem_idx = lrint(percentile * (history.size() - 1));
+//                     printf("Searching for element %lu in %p, which has %lu elements (history has %lu elements)\n", elem_idx, root.get(), root->children + 1, history.size());
+//                     fflush(stdout);
+                       // Cap at 100 ms.
+                       max_jitter_seconds = min(elem_at(elem_idx), 0.1);
+               }
+               expected_timestamp = timestamp + frame_duration;
+       }
+       double get_expected() const
+       {
+               return expected_timestamp;
+       }
+       double get_jitter() const
+       {
+               return max_jitter_seconds * multiplier;
+       }
+void test_jitter_history(const vector<Event> &events, size_t history_length, double multiplier, double percentile, double margin)
+       Queue q;
+       HistoryJitter input_jitter(history_length, multiplier, percentile);
+       HistoryJitter output_jitter(history_length, multiplier, percentile);
+       for (const Event &event : events) {
+               if (event.direction == Event::IN) {
+                       input_jitter.update(event.t, 0.020, 0);
+                       q.add_frame(event.t);
+               } else {
+                       double now = event.t;
+                       output_jitter.update(event.t, 0.020, 0);
+                       q.get_frame(event.t);
+                       double seconds_until_next_frame = max(input_jitter.get_expected() - now + input_jitter.get_jitter(), 0.0);
+                       double master_frame_length_seconds = 0.020;
+                       seconds_until_next_frame += margin;  // Hack.
+                       size_t safe_queue_length = max<int>(floor((seconds_until_next_frame + output_jitter.get_jitter()) / master_frame_length_seconds), 0);
+                       while (q.queue_len() > safe_queue_length) {
+                               q.drop_frame();
+                       }
+               }
+               if (q.should_abort()) return;
+       }
+       char name[256];
+       snprintf(name, sizeof(name), "history[len=%lu,mul=%.1f,pct=%.4f,margin=%.1f]", history_length, multiplier, percentile, 1e3 * margin);
+       q.eval(name);
+int main(int argc, char **argv)
+       static const option long_options[] = {
+               { "max-drops", required_argument, 0, 'd' },
+               { "max-underruns", required_argument, 0, 'u' },
+               { "max-latency-ms", required_argument, 0, 'l' },
+               { 0, 0, 0, 0 }
+       };      
+       for ( ;; ) {
+               int option_index = 0;
+               int c = getopt_long(argc, argv, "d:u:l:", long_options, &option_index);
+               if (c == -1) {
+                       break;
+               }
+                switch (c) {
+                case 'd':
+                       max_drops = atof(optarg);
+                       break;
+                case 'u':
+                       max_underruns = atof(optarg);
+                       break;
+                case 'l':
+                       max_latency_ms = atof(optarg);
+                       break;
+               default:
+                       fprintf(stderr, "Usage: simul [--max-drops NUM] [--max-underruns NUM] [--max-latency-ms TIME]\n");
+                       exit(1);
+               }
+       }
+       vector<Event> events;
+       const char *filename = (optind < argc) ? argv[optind] : "nageru-latency-log.txt";
+       FILE *fp = fopen(filename, "r");
+       if (fp == nullptr) {
+               perror(filename);
+               exit(1);
+       }
+       while (!feof(fp)) {
+               char dir[256];
+               double t;
+               if (fscanf(fp, "%s %lf", dir, &t) != 2) {
+                       break;
+               }
+               if (dir[0] == 'I') {
+                       events.push_back(Event{Event::IN, t});
+               } else if (dir[0] == 'O') {
+                       events.push_back(Event{Event::OUT, t});
+               } else {
+                       fprintf(stderr, "ERROR: Unreadable line\n");
+                       exit(1);
+               }
+       }
+       fclose(fp);
+       sort(events.begin(), events.end(), [](const Event &a, const Event &b) { return a.t < b.t; });
+       test_nodrop(events);
+       test_limit_to_1(events);
+       test_limit_to_2(events);
+       test_nageru_1_2_0(events);
+       for (double multiplier : { 0.0, 0.5, 1.0, 2.0, 3.0, 5.0 }) {
+               for (double alpha : { 0.5, 0.9, 0.99, 0.995, 0.999, 0.9999 }) {
+                       for (double margin_ms : { -1.0, 0.0, 1.0, 2.0, 5.0, 10.0, 20.0 }) {
+                               test_jitter_filter(events, multiplier, alpha, 1e-3 * margin_ms);
+                       }
+               }
+       }
+       for (size_t history_samples : { 10, 100, 500, 1000, 5000, 10000, 25000 }) {
+               for (double multiplier : { 0.5, 1.0, 2.0, 3.0, 5.0, 10.0 }) {
+                       for (double percentile : { 0.5, 0.75, 0.9, 0.99, 0.995, 0.999, 1.0 }) {
+                               if (lrint(percentile * (history_samples - 1)) == int(history_samples - 1) && percentile != 1.0) {
+                                       // Redundant.
+                                       continue;
+                               }
+                               //for (double margin_ms : { -1.0, 0.0, 1.0, 2.0, 5.0, 10.0, 20.0 }) {
+                               for (double margin_ms : { 0.0 }) {
+                                       test_jitter_history(events, history_samples, multiplier, percentile, 1e-3 * margin_ms);
+                               }
+                       }
+               }
+       }
index 9f549e968b7eb173f872d55fc316f7c1982ccc3e..3ce49c18cf9bda311a17604d025607b647c8310c 100644 (file)
--- a/mixer.cpp
+++ b/mixer.cpp
@@ -198,39 +198,95 @@ void upload_texture(GLuint tex, GLuint width, GLuint height, GLuint stride, bool
 }  // namespace
-void QueueLengthPolicy::register_metrics(const vector<pair<string, string>> &labels)
+void JitterHistory::register_metrics(const vector<pair<string, string>> &labels)
-       global_metrics.add("input_queue_length_frames", labels, &metric_input_queue_length_frames, Metrics::TYPE_GAUGE);
-       global_metrics.add("input_queue_safe_length_frames", labels, &metric_input_queue_safe_length_frames, Metrics::TYPE_GAUGE);
-       global_metrics.add("input_queue_duped_frames", labels, &metric_input_duped_frames);
+       global_metrics.add("input_underestimated_jitter_frames", labels, &metric_input_underestimated_jitter_frames);
+       global_metrics.add("input_estimated_max_jitter_seconds", labels, &metric_input_estimated_max_jitter_seconds, Metrics::TYPE_GAUGE);
+void JitterHistory::unregister_metrics(const vector<pair<string, string>> &labels)
+       global_metrics.remove("input_underestimated_jitter_frames", labels);
+       global_metrics.remove("input_estimated_max_jitter_seconds", labels);
-void QueueLengthPolicy::update_policy(unsigned queue_length)
+void JitterHistory::frame_arrived(steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames)
-       if (queue_length == 0) {  // Starvation.
-               if (been_at_safe_point_since_last_starvation && safe_queue_length < unsigned(global_flags.max_input_queue_frames)) {
-                       ++safe_queue_length;
-                       fprintf(stderr, "Card %u: Starvation, increasing safe limit to %u frame(s)\n",
-                               card_index, safe_queue_length);
+       if (expected_timestamp > steady_clock::time_point::min()) {
+               expected_timestamp += dropped_frames * nanoseconds(frame_duration * 1000000000 / TIMEBASE);
+               double jitter_seconds = fabs(duration<double>(expected_timestamp - now).count());
+               history.push_back(orders.insert(jitter_seconds));
+               if (jitter_seconds > estimate_max_jitter()) {
+                       ++metric_input_underestimated_jitter_frames;
-               frames_with_at_least_one = 0;
-               been_at_safe_point_since_last_starvation = false;
-               ++metric_input_duped_frames;
-               metric_input_queue_safe_length_frames = safe_queue_length;
-               metric_input_queue_length_frames = 0;
-               return;
+               metric_input_estimated_max_jitter_seconds = estimate_max_jitter();
+               if (history.size() > history_length) {
+                       orders.erase(history.front());
+                       history.pop_front();
+               }
+               assert(history.size() <= history_length);
+       }
+       expected_timestamp = now + nanoseconds(frame_duration * 1000000000 / TIMEBASE);
+double JitterHistory::estimate_max_jitter() const
+       if (orders.empty()) {
+               return 0.0;
-       if (queue_length >= safe_queue_length) {
-               been_at_safe_point_since_last_starvation = true;
+       size_t elem_idx = lrint((orders.size() - 1) * percentile);
+       if (percentile <= 0.5) {
+               return *next(orders.begin(), elem_idx) * multiplier;
+       } else {
+               return *prev(orders.end(), elem_idx + 1) * multiplier;
-       if (++frames_with_at_least_one >= 1000 && safe_queue_length > 1) {
-               --safe_queue_length;
-               metric_input_queue_safe_length_frames = safe_queue_length;
-               fprintf(stderr, "Card %u: Spare frames for more than 1000 frames, reducing safe limit to %u frame(s)\n",
-                       card_index, safe_queue_length);
-               frames_with_at_least_one = 0;
+void QueueLengthPolicy::register_metrics(const vector<pair<string, string>> &labels)
+       global_metrics.add("input_queue_safe_length_frames", labels, &metric_input_queue_safe_length_frames, Metrics::TYPE_GAUGE);
+void QueueLengthPolicy::unregister_metrics(const vector<pair<string, string>> &labels)
+       global_metrics.remove("input_queue_safe_length_frames", labels);
+void QueueLengthPolicy::update_policy(steady_clock::time_point now,
+                                      steady_clock::time_point expected_next_frame,
+                                      int64_t master_frame_duration,
+                                      double max_input_card_jitter_seconds,
+                                      double max_master_card_jitter_seconds)
+       double master_frame_duration_seconds = master_frame_duration / double(TIMEBASE);
+       // Figure out when we can expect the next frame for this card, assuming
+       // worst-case jitter (ie., the frame is maximally late).
+       double seconds_until_next_frame = max(duration<double>(expected_next_frame - now).count() + max_input_card_jitter_seconds, 0.0);
+       // How many times are the master card expected to tick in that time?
+       // We assume the master clock has worst-case jitter but not any rate
+       // discrepancy, ie., it ticks as early as possible every time, but not
+       // cumulatively.
+       double frames_needed = (seconds_until_next_frame + max_master_card_jitter_seconds) / master_frame_duration_seconds;
+       // As a special case, if the master card ticks faster than the input card,
+       // we expect the queue to drain by itself even without dropping. But if
+       // the difference is small (e.g. 60 Hz master and 59.94 input), it would
+       // go slowly enough that the effect wouldn't really be appreciable.
+       // We account for this by looking at the situation five frames ahead,
+       // assuming everything else is the same.
+       double frames_allowed;
+       if (max_master_card_jitter_seconds < max_input_card_jitter_seconds) {
+               frames_allowed = frames_needed + 5 * (max_input_card_jitter_seconds - max_master_card_jitter_seconds) / master_frame_duration_seconds;
+       } else {
+               frames_allowed = frames_needed;
-       metric_input_queue_length_frames = min(queue_length, safe_queue_length);  // The caller will drop frames for us if needed.
+       safe_queue_length = max<int>(floor(frames_allowed), 0);
+       metric_input_queue_safe_length_frames = safe_queue_length;
 Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards)
@@ -404,6 +460,7 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards)
        metric_start_time_seconds = get_timestamp_for_metrics();
+       output_jitter_history.register_metrics({{ "card", "output" }});
        global_metrics.add("frames_output_total", &metric_frames_output_total);
        global_metrics.add("frames_output_dropped", &metric_frames_output_dropped);
        global_metrics.add("start_time_seconds", &metric_start_time_seconds, Metrics::TYPE_GAUGE);
@@ -478,10 +535,14 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT
        // Unregister old metrics, if any.
        if (!card->labels.empty()) {
                const vector<pair<string, string>> &labels = card->labels;
+               card->jitter_history.unregister_metrics(labels);
+               card->queue_length_policy.unregister_metrics(labels);
                global_metrics.remove("input_received_frames", labels);
                global_metrics.remove("input_dropped_frames_jitter", labels);
                global_metrics.remove("input_dropped_frames_error", labels);
                global_metrics.remove("input_dropped_frames_resets", labels);
+               global_metrics.remove("input_queue_length_frames", labels);
+               global_metrics.remove("input_queue_duped_frames", labels);
                global_metrics.remove("input_has_signal_bool", labels);
                global_metrics.remove("input_is_connected_bool", labels);
@@ -512,11 +573,14 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT
+       card->jitter_history.register_metrics(labels);
        global_metrics.add("input_received_frames", labels, &card->metric_input_received_frames);
        global_metrics.add("input_dropped_frames_jitter", labels, &card->metric_input_dropped_frames_jitter);
        global_metrics.add("input_dropped_frames_error", labels, &card->metric_input_dropped_frames_error);
        global_metrics.add("input_dropped_frames_resets", labels, &card->metric_input_resets);
+       global_metrics.add("input_queue_length_frames", labels, &card->metric_input_queue_length_frames, Metrics::TYPE_GAUGE);
+       global_metrics.add("input_queue_duped_frames", labels, &card->metric_input_duped_frames);
        global_metrics.add("input_has_signal_bool", labels, &card->metric_input_has_signal_bool, Metrics::TYPE_GAUGE);
        global_metrics.add("input_is_connected_bool", labels, &card->metric_input_is_connected_bool, Metrics::TYPE_GAUGE);
@@ -547,7 +611,7 @@ void Mixer::set_output_card_internal(int card_index)
-               old_card->capture = move(old_card->parked_capture);
+               old_card->capture = move(old_card->parked_capture);  // TODO: reset the metrics
                old_card->is_fake_capture = false;
@@ -569,6 +633,7 @@ void Mixer::set_output_card_internal(int card_index)
                card->output->start_output(desired_output_video_mode, pts_int);
        output_card_index = card_index;
+       output_jitter_history.clear();
 namespace {
@@ -673,6 +738,11 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode,
        card->last_timecode = timecode;
+       // Calculate jitter for this card here. We do it on arrival so that we
+       // make sure every frame counts, even the dropped ones -- and it will also
+       // make sure the jitter number is as recent as possible, should it change.
+       card->jitter_history.frame_arrived(video_frame.received_timestamp, frame_length, dropped_frames);
        PBOFrameAllocator::Userdata *userdata = (PBOFrameAllocator::Userdata *)video_frame.userdata;
        size_t cbcr_width, cbcr_height, cbcr_offset, y_offset;
@@ -1034,7 +1104,7 @@ bool Mixer::input_card_is_master_clock(unsigned card_index, unsigned master_card
        return (card_index == master_card_index);
-void Mixer::trim_queue(CaptureCard *card, unsigned card_index)
+void Mixer::trim_queue(CaptureCard *card, size_t safe_queue_length)
        // Count the number of frames in the queue, including any frames
        // we dropped. It's hard to know exactly how we should deal with
@@ -1046,18 +1116,17 @@ void Mixer::trim_queue(CaptureCard *card, unsigned card_index)
        for (const CaptureCard::NewFrame &frame : card->new_frames) {
                queue_length += frame.dropped_frames + 1;
-       card->queue_length_policy.update_policy(queue_length);
        // If needed, drop frames until the queue is below the safe limit.
        // We prefer to drop from the head, because all else being equal,
        // we'd like more recent frames (less latency).
        unsigned dropped_frames = 0;
-       while (queue_length > card->queue_length_policy.get_safe_queue_length()) {
+       while (queue_length > safe_queue_length) {
                assert(queue_length > card->new_frames.front().dropped_frames);
                queue_length -= card->new_frames.front().dropped_frames;
-               if (queue_length <= card->queue_length_policy.get_safe_queue_length()) {
+               if (queue_length <= safe_queue_length) {
                        // No need to drop anything.
@@ -1069,6 +1138,7 @@ void Mixer::trim_queue(CaptureCard *card, unsigned card_index)
        card->metric_input_dropped_frames_jitter += dropped_frames;
+       card->metric_input_queue_length_frames = queue_length;
 #if 0
        if (dropped_frames > 0) {
@@ -1107,23 +1177,11 @@ start:
                goto start;
-       if (!master_card_is_output) {
-               output_frame_info.frame_timestamp =
-                       cards[master_card_index].new_frames.front().received_timestamp;
-       }
        for (unsigned card_index = 0; card_index < num_cards + num_video_inputs; ++card_index) {
                CaptureCard *card = &cards[card_index];
-               if (input_card_is_master_clock(card_index, master_card_index)) {
-                       // We don't use the queue length policy for the master card,
-                       // but we will if it stops being the master. Thus, clear out
-                       // the policy in case we switch in the future.
-                       card->queue_length_policy.reset(card_index);
-                       assert(!card->new_frames.empty());
+               if (card->new_frames.empty()) {  // Starvation.
+                       ++card->metric_input_duped_frames;
                } else {
-                       trim_queue(card, card_index);
-               }
-               if (!card->new_frames.empty()) {
                        new_frames[card_index] = move(card->new_frames.front());
                        has_new_frame[card_index] = true;
@@ -1132,10 +1190,31 @@ start:
        if (!master_card_is_output) {
+               output_frame_info.frame_timestamp = new_frames[master_card_index].received_timestamp;
                output_frame_info.dropped_frames = new_frames[master_card_index].dropped_frames;
                output_frame_info.frame_duration = new_frames[master_card_index].length;
+       if (!output_frame_info.is_preroll) {
+               output_jitter_history.frame_arrived(output_frame_info.frame_timestamp, output_frame_info.frame_duration, output_frame_info.dropped_frames);
+       }
+       for (unsigned card_index = 0; card_index < num_cards + num_video_inputs; ++card_index) {
+               CaptureCard *card = &cards[card_index];
+               if (has_new_frame[card_index] &&
+                   !input_card_is_master_clock(card_index, master_card_index) &&
+                   !output_frame_info.is_preroll) {
+                       card->queue_length_policy.update_policy(
+                               output_frame_info.frame_timestamp,
+                               card->jitter_history.get_expected_next_frame(),
+                               output_frame_info.frame_duration,
+                               card->jitter_history.estimate_max_jitter(),
+                               output_jitter_history.estimate_max_jitter());
+                       trim_queue(card, min<int>(global_flags.max_input_queue_frames,
+                                                 card->queue_length_policy.get_safe_queue_length()));
+               }
+       }
        // This might get off by a fractional sample when changing master card
        // between ones with different frame rates, but that's fine.
        int num_samples_times_timebase = OUTPUT_FREQUENCY * output_frame_info.frame_duration + fractional_samples;
diff --git a/mixer.h b/mixer.h
index 9b14dc233a241ed6ed3b30e82323c96dc14cbf45..ace60701f5adda9af5fa4422ce77eee2ebdebefc 100644 (file)
--- a/mixer.h
+++ b/mixer.h
@@ -54,6 +54,59 @@ class ResourcePool;
 class YCbCrInput;
 }  // namespace movit
+// A class to estimate the future jitter. Used in QueueLengthPolicy (see below).
+// There are many ways to estimate jitter; I've tested a few ones (and also
+// some algorithms that don't explicitly model jitter) with different
+// parameters on some real-life data in experiments/queue_drop_policy.cpp.
+// This is one based on simple order statistics where I've added some margin in
+// the number of starvation events; I believe that about one every hour would
+// probably be acceptable, but this one typically goes lower than that, at the
+// cost of 2–3 ms extra latency. (If the queue is hard-limited to one frame, it's
+// possible to get ~10 ms further down, but this would mean framedrops every
+// second or so.) The general strategy is: Take the 99.9-percentile jitter over
+// last 5000 frames, multiply by two, and that's our worst-case jitter
+// estimate. The fact that we're not using the max value means that we could
+// actually even throw away very late frames immediately, which means we only
+// get one user-visible event instead of seeing something both when the frame
+// arrives late (duplicate frame) and then again when we drop.
+class JitterHistory {
+       static constexpr size_t history_length = 5000;
+       static constexpr double percentile = 0.999;
+       static constexpr double multiplier = 2.0;
+       void register_metrics(const std::vector<std::pair<std::string, std::string>> &labels);
+       void unregister_metrics(const std::vector<std::pair<std::string, std::string>> &labels);
+       void clear() {
+               history.clear();
+               orders.clear();
+       }
+       void frame_arrived(std::chrono::steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames);
+       std::chrono::steady_clock::time_point get_expected_next_frame() const { return expected_timestamp; }
+       double estimate_max_jitter() const;
+       // A simple O(k) based algorithm for getting the k-th largest or
+       // smallest element from our window; we simply keep the multiset
+       // ordered (insertions and deletions are O(n) as always) and then
+       // iterate from one of the sides. If we had larger values of k,
+       // we could go for a more complicated setup with two sets or heaps
+       // (one increasing and one decreasing) that we keep balanced around
+       // the point, or it is possible to reimplement std::set with
+       // counts in each node. However, since k=5, we don't need this.
+       std::multiset<double> orders;
+       std::deque<std::multiset<double>::iterator> history;
+       std::chrono::steady_clock::time_point expected_timestamp = std::chrono::steady_clock::time_point::min();
+       // Metrics. There are no direct summaries for jitter, since we already have latency summaries.
+       std::atomic<int64_t> metric_input_underestimated_jitter_frames{0};
+       std::atomic<double> metric_input_estimated_max_jitter_seconds{0.0 / 0.0};
 // For any card that's not the master (where we pick out the frames as they
 // come, as fast as we can process), there's going to be a queue. The question
 // is when we should drop frames from that queue (apart from the obvious
@@ -65,44 +118,38 @@ class YCbCrInput;
 //   2. We don't want to add more delay than is needed.
 // Our general strategy is to drop as many frames as we can (helping for #2)
-// that we think is safe for #1 given jitter. To this end, we set a lower floor N,
-// where we assume that if we have N frames in the queue, we're always safe from
-// starvation. (Typically, N will be 0 or 1. It starts off at 0.) If we have
-// more than N frames in the queue after reading out the one we need, we head-drop
-// them to reduce the queue.
+// that we think is safe for #1 given jitter. To this end, we measure the
+// deviation from the expected arrival time for all cards, and use that for
+// continuous jitter estimation.
-// N is reduced as follows: If the queue has had at least one spare frame for
-// at least 50 (master) frames (ie., it's been too conservative for a second),
-// we reduce N by 1 and reset the timers.
-// Whenever the queue is starved (we needed a frame but there was none),
-// and we've been at N since the last starvation, N was obviously too low,
-// so we increment it. We will never set N above 5, though.
+// We then drop everything from the queue that we're sure we won't need to
+// serve the output in the time before the next frame arrives. Typically,
+// this means the queue will contain 0 or 1 frames, although more is also
+// possible if the jitter is very high.
 class QueueLengthPolicy {
        QueueLengthPolicy() {}
        void reset(unsigned card_index) {
                this->card_index = card_index;
-               safe_queue_length = 1;
-               frames_with_at_least_one = 0;
-               been_at_safe_point_since_last_starvation = false;
        void register_metrics(const std::vector<std::pair<std::string, std::string>> &labels);
-       void update_policy(unsigned queue_length);  // Call before picking out a frame, so 0 means starvation.
+       void unregister_metrics(const std::vector<std::pair<std::string, std::string>> &labels);
+       // Call after picking out a frame, so 0 means starvation.
+       void update_policy(std::chrono::steady_clock::time_point now,
+                          std::chrono::steady_clock::time_point expected_next_frame,
+                          int64_t master_frame_duration,
+                          double max_input_card_jitter_seconds,
+                          double max_master_card_jitter_seconds);
        unsigned get_safe_queue_length() const { return safe_queue_length; }
-       unsigned card_index;  // For debugging only.
-       unsigned safe_queue_length = 1;  // Called N in the comments. Can never go below 1.
-       unsigned frames_with_at_least_one = 0;
-       bool been_at_safe_point_since_last_starvation = false;
+       unsigned card_index;  // For debugging and metrics only.
+       unsigned safe_queue_length = 0;  // Can never go below zero.
        // Metrics.
-       std::atomic<int64_t> metric_input_queue_length_frames{0};
        std::atomic<int64_t> metric_input_queue_safe_length_frames{1};
-       std::atomic<int64_t> metric_input_duped_frames{0};
 class Mixer {
@@ -375,9 +422,7 @@ private:
        void audio_thread_func();
        void release_display_frame(DisplayFrame *frame);
        double pts() { return double(pts_int) / TIMEBASE; }
-       // Call this _before_ trying to pull out a frame from a capture card;
-       // it will update the policy and drop the right amount of frames for you.
-       void trim_queue(CaptureCard *card, unsigned card_index);
+       void trim_queue(CaptureCard *card, size_t safe_queue_length);
        HTTPD httpd;
        unsigned num_cards, num_video_inputs;
@@ -454,12 +499,16 @@ private:
                int last_timecode = -1;  // Unwrapped.
+               JitterHistory jitter_history;
                // Metrics.
                std::vector<std::pair<std::string, std::string>> labels;
                std::atomic<int64_t> metric_input_received_frames{0};
+               std::atomic<int64_t> metric_input_duped_frames{0};
                std::atomic<int64_t> metric_input_dropped_frames_jitter{0};
                std::atomic<int64_t> metric_input_dropped_frames_error{0};
                std::atomic<int64_t> metric_input_resets{0};
+               std::atomic<int64_t> metric_input_queue_length_frames{0};
                std::atomic<int64_t> metric_input_has_signal_bool{-1};
                std::atomic<int64_t> metric_input_is_connected_bool{-1};
@@ -470,6 +519,7 @@ private:
                std::atomic<int64_t> metric_input_frame_rate_den{-1};
                std::atomic<int64_t> metric_input_sample_rate_hz{-1};
+       JitterHistory output_jitter_history;
        CaptureCard cards[MAX_VIDEO_CARDS];  // Protected by <card_mutex>.
        YCbCrInterpretation ycbcr_interpretation[MAX_VIDEO_CARDS];  // Protected by <card_mutex>.
        AudioMixer audio_mixer;  // Same as global_audio_mixer (see audio_mixer.h).