]> git.sesse.net Git - nageru/blobdiff - mixer.cpp
Add a system for queue length policies, so that we have as little as possible (but...
[nageru] / mixer.cpp
index 104a6c03a22278b9b9ddfde7e8eb7edc4f9ad96e..52d285ce632e9e922cd48337cdcd91a2c1a2e4ef 100644 (file)
--- a/mixer.cpp
+++ b/mixer.cpp
@@ -111,6 +111,29 @@ string generate_local_dump_filename(int frame)
 
 }  // namespace
 
+void QueueLengthPolicy::update_policy(int queue_length)
+{
+       if (queue_length < 0) {  // Starvation.
+               if (safe_queue_length < 5) {
+                       ++safe_queue_length;
+                       fprintf(stderr, "Card %u: Starvation, increasing safe limit to %u frames\n",
+                               card_index, safe_queue_length);
+               }
+               frames_with_at_least_one = 0;
+               return;
+       }
+       if (queue_length > 0) {
+               if (++frames_with_at_least_one >= 50) {
+                       --safe_queue_length;
+                       fprintf(stderr, "Card %u: Spare frames for more than 50 frames, reducing safe limit to %u frames\n",
+                               card_index, safe_queue_length);
+                       frames_with_at_least_one = 0;
+               }
+       } else {
+               frames_with_at_least_one = 0;
+       }
+}
+
 Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards)
        : httpd(WIDTH, HEIGHT),
          num_cards(num_cards),
@@ -182,6 +205,7 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards)
        }
 
        for (card_index = 0; card_index < num_cards; ++card_index) {
+               cards[card_index].queue_length_policy.reset(card_index);
                cards[card_index].capture->start_bm_capture();
        }
 
@@ -593,6 +617,8 @@ void Mixer::thread_func()
                        for (unsigned card_index = 0; card_index < num_cards; ++card_index) {
                                CaptureCard *card = &cards[card_index];
                                if (card->new_frames.empty()) {
+                                       assert(card_index != master_card_index);
+                                       card->queue_length_policy.update_policy(-1);
                                        continue;
                                }
                                new_frames[card_index] = move(card->new_frames.front());
@@ -604,6 +630,15 @@ void Mixer::thread_func()
                                num_samples[card_index] = num_samples_times_timebase / TIMEBASE;
                                card->fractional_samples = num_samples_times_timebase % TIMEBASE;
                                assert(num_samples[card_index] >= 0);
+
+                               if (card_index != master_card_index) {
+                                       // If we have excess frames compared to the policy for this card,
+                                       // drop frames from the head.
+                                       card->queue_length_policy.update_policy(card->new_frames.size());
+                                       while (card->new_frames.size() > card->queue_length_policy.get_safe_queue_length()) {
+                                               card->new_frames.pop();
+                                       }
+                               }
                        }
                }