]> git.sesse.net Git - nageru/blobdiff - mixer.cpp
Give the UI thread a name.
[nageru] / mixer.cpp
index 261c8eec810ce27cf7d67fb2cf89d4b1f5c29923..31277f1cd87af1bf2e03a67a1f169dcdf8f8c03a 100644 (file)
--- a/mixer.cpp
+++ b/mixer.cpp
@@ -42,6 +42,7 @@
 #include "ffmpeg_capture.h"
 #include "flags.h"
 #include "input_mapping.h"
+#include "metrics.h"
 #include "pbo_frame_allocator.h"
 #include "ref_counted_gl_sync.h"
 #include "resampling_queue.h"
@@ -197,38 +198,106 @@ void upload_texture(GLuint tex, GLuint width, GLuint height, GLuint stride, bool
 
 }  // namespace
 
-void QueueLengthPolicy::update_policy(unsigned queue_length)
+void JitterHistory::register_metrics(const vector<pair<string, string>> &labels)
 {
-       if (queue_length == 0) {  // Starvation.
-               if (been_at_safe_point_since_last_starvation && safe_queue_length < unsigned(global_flags.max_input_queue_frames)) {
-                       ++safe_queue_length;
-                       fprintf(stderr, "Card %u: Starvation, increasing safe limit to %u frame(s)\n",
-                               card_index, safe_queue_length);
+       global_metrics.add("input_underestimated_jitter_frames", labels, &metric_input_underestimated_jitter_frames);
+       global_metrics.add("input_estimated_max_jitter_seconds", labels, &metric_input_estimated_max_jitter_seconds, Metrics::TYPE_GAUGE);
+}
+
+void JitterHistory::unregister_metrics(const vector<pair<string, string>> &labels)
+{
+       global_metrics.remove("input_underestimated_jitter_frames", labels);
+       global_metrics.remove("input_estimated_max_jitter_seconds", labels);
+}
+
+void JitterHistory::frame_arrived(steady_clock::time_point now, int64_t frame_duration, size_t dropped_frames)
+{
+       if (expected_timestamp > steady_clock::time_point::min()) {
+               expected_timestamp += dropped_frames * nanoseconds(frame_duration * 1000000000 / TIMEBASE);
+               double jitter_seconds = fabs(duration<double>(expected_timestamp - now).count());
+               history.push_back(orders.insert(jitter_seconds));
+               if (jitter_seconds > estimate_max_jitter()) {
+                       ++metric_input_underestimated_jitter_frames;
                }
-               frames_with_at_least_one = 0;
-               been_at_safe_point_since_last_starvation = false;
-               return;
+
+               metric_input_estimated_max_jitter_seconds = estimate_max_jitter();
+
+               if (history.size() > history_length) {
+                       orders.erase(history.front());
+                       history.pop_front();
+               }
+               assert(history.size() <= history_length);
        }
-       if (queue_length >= safe_queue_length) {
-               been_at_safe_point_since_last_starvation = true;
+       expected_timestamp = now + nanoseconds(frame_duration * 1000000000 / TIMEBASE);
+}
+
+double JitterHistory::estimate_max_jitter() const
+{
+       if (orders.empty()) {
+               return 0.0;
        }
-       if (++frames_with_at_least_one >= 1000 && safe_queue_length > 1) {
-               --safe_queue_length;
-               fprintf(stderr, "Card %u: Spare frames for more than 1000 frames, reducing safe limit to %u frame(s)\n",
-                       card_index, safe_queue_length);
-               frames_with_at_least_one = 0;
+       size_t elem_idx = lrint((orders.size() - 1) * percentile);
+       if (percentile <= 0.5) {
+               return *next(orders.begin(), elem_idx) * multiplier;
+       } else {
+               return *prev(orders.end(), elem_idx + 1) * multiplier;
        }
 }
 
+void QueueLengthPolicy::register_metrics(const vector<pair<string, string>> &labels)
+{
+       global_metrics.add("input_queue_safe_length_frames", labels, &metric_input_queue_safe_length_frames, Metrics::TYPE_GAUGE);
+}
+
+void QueueLengthPolicy::unregister_metrics(const vector<pair<string, string>> &labels)
+{
+       global_metrics.remove("input_queue_safe_length_frames", labels);
+}
+
+void QueueLengthPolicy::update_policy(steady_clock::time_point now,
+                                      steady_clock::time_point expected_next_frame,
+                                      int64_t master_frame_duration,
+                                      double max_input_card_jitter_seconds,
+                                      double max_master_card_jitter_seconds)
+{
+       double master_frame_duration_seconds = master_frame_duration / double(TIMEBASE);
+
+       // Figure out when we can expect the next frame for this card, assuming
+       // worst-case jitter (ie., the frame is maximally late).
+       double seconds_until_next_frame = max(duration<double>(expected_next_frame - now).count() + max_input_card_jitter_seconds, 0.0);
+
+       // How many times are the master card expected to tick in that time?
+       // We assume the master clock has worst-case jitter but not any rate
+       // discrepancy, ie., it ticks as early as possible every time, but not
+       // cumulatively.
+       double frames_needed = (seconds_until_next_frame + max_master_card_jitter_seconds) / master_frame_duration_seconds;
+
+       // As a special case, if the master card ticks faster than the input card,
+       // we expect the queue to drain by itself even without dropping. But if
+       // the difference is small (e.g. 60 Hz master and 59.94 input), it would
+       // go slowly enough that the effect wouldn't really be appreciable.
+       // We account for this by looking at the situation five frames ahead,
+       // assuming everything else is the same.
+       double frames_allowed;
+       if (max_master_card_jitter_seconds < max_input_card_jitter_seconds) {
+               frames_allowed = frames_needed + 5 * (max_input_card_jitter_seconds - max_master_card_jitter_seconds) / master_frame_duration_seconds;
+       } else {
+               frames_allowed = frames_needed;
+       }
+
+       safe_queue_length = max<int>(floor(frames_allowed), 0);
+       metric_input_queue_safe_length_frames = safe_queue_length;
+}
+
 Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards)
        : httpd(),
          num_cards(num_cards),
          mixer_surface(create_surface(format)),
          h264_encoder_surface(create_surface(format)),
          decklink_output_surface(create_surface(format)),
-         ycbcr_interpretation(global_flags.ycbcr_interpretation),
          audio_mixer(num_cards)
 {
+       memcpy(ycbcr_interpretation, global_flags.ycbcr_interpretation, sizeof(ycbcr_interpretation));
        CHECK(init_movit(MOVIT_SHADER_DIR, MOVIT_DEBUG_OFF));
        check_error();
 
@@ -388,6 +457,15 @@ Mixer::Mixer(const QSurfaceFormat &format, unsigned num_cards)
                desired_output_card_index = global_flags.output_card;
                set_output_card_internal(global_flags.output_card);
        }
+
+       metric_start_time_seconds = get_timestamp_for_metrics();
+
+       output_jitter_history.register_metrics({{ "card", "output" }});
+       global_metrics.add("frames_output_total", &metric_frames_output_total);
+       global_metrics.add("frames_output_dropped", &metric_frames_output_dropped);
+       global_metrics.add("start_time_seconds", &metric_start_time_seconds, Metrics::TYPE_GAUGE);
+       global_metrics.add("memory_used_bytes", &metrics_memory_used_bytes);
+       global_metrics.add("memory_locked_limit_bytes", &metrics_memory_locked_limit_bytes);
 }
 
 Mixer::~Mixer()
@@ -427,7 +505,7 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT
 
        PixelFormat pixel_format;
        if (card_type == CardType::FFMPEG_INPUT) {
-               pixel_format = PixelFormat_8BitBGRA;
+               pixel_format = capture->get_current_pixel_format();
        } else if (global_flags.ten_bit_input) {
                pixel_format = PixelFormat_10BitYCbCr;
        } else {
@@ -453,6 +531,66 @@ void Mixer::configure_card(unsigned card_index, CaptureInterface *capture, CardT
        audio_mixer.reset_resampler(device);
        audio_mixer.set_display_name(device, card->capture->get_description());
        audio_mixer.trigger_state_changed_callback();
+
+       // Unregister old metrics, if any.
+       if (!card->labels.empty()) {
+               const vector<pair<string, string>> &labels = card->labels;
+               card->jitter_history.unregister_metrics(labels);
+               card->queue_length_policy.unregister_metrics(labels);
+               global_metrics.remove("input_received_frames", labels);
+               global_metrics.remove("input_dropped_frames_jitter", labels);
+               global_metrics.remove("input_dropped_frames_error", labels);
+               global_metrics.remove("input_dropped_frames_resets", labels);
+               global_metrics.remove("input_queue_length_frames", labels);
+               global_metrics.remove("input_queue_duped_frames", labels);
+
+               global_metrics.remove("input_has_signal_bool", labels);
+               global_metrics.remove("input_is_connected_bool", labels);
+               global_metrics.remove("input_interlaced_bool", labels);
+               global_metrics.remove("input_width_pixels", labels);
+               global_metrics.remove("input_height_pixels", labels);
+               global_metrics.remove("input_frame_rate_nom", labels);
+               global_metrics.remove("input_frame_rate_den", labels);
+               global_metrics.remove("input_sample_rate_hz", labels);
+       }
+
+       // Register metrics.
+       vector<pair<string, string>> labels;
+       char card_name[64];
+       snprintf(card_name, sizeof(card_name), "%d", card_index);
+       labels.emplace_back("card", card_name);
+
+       switch (card_type) {
+       case CardType::LIVE_CARD:
+               labels.emplace_back("cardtype", "live");
+               break;
+       case CardType::FAKE_CAPTURE:
+               labels.emplace_back("cardtype", "fake");
+               break;
+       case CardType::FFMPEG_INPUT:
+               labels.emplace_back("cardtype", "ffmpeg");
+               break;
+       default:
+               assert(false);
+       }
+       card->jitter_history.register_metrics(labels);
+       card->queue_length_policy.register_metrics(labels);
+       global_metrics.add("input_received_frames", labels, &card->metric_input_received_frames);
+       global_metrics.add("input_dropped_frames_jitter", labels, &card->metric_input_dropped_frames_jitter);
+       global_metrics.add("input_dropped_frames_error", labels, &card->metric_input_dropped_frames_error);
+       global_metrics.add("input_dropped_frames_resets", labels, &card->metric_input_resets);
+       global_metrics.add("input_queue_length_frames", labels, &card->metric_input_queue_length_frames, Metrics::TYPE_GAUGE);
+       global_metrics.add("input_queue_duped_frames", labels, &card->metric_input_duped_frames);
+
+       global_metrics.add("input_has_signal_bool", labels, &card->metric_input_has_signal_bool, Metrics::TYPE_GAUGE);
+       global_metrics.add("input_is_connected_bool", labels, &card->metric_input_is_connected_bool, Metrics::TYPE_GAUGE);
+       global_metrics.add("input_interlaced_bool", labels, &card->metric_input_interlaced_bool, Metrics::TYPE_GAUGE);
+       global_metrics.add("input_width_pixels", labels, &card->metric_input_width_pixels, Metrics::TYPE_GAUGE);
+       global_metrics.add("input_height_pixels", labels, &card->metric_input_height_pixels, Metrics::TYPE_GAUGE);
+       global_metrics.add("input_frame_rate_nom", labels, &card->metric_input_frame_rate_nom, Metrics::TYPE_GAUGE);
+       global_metrics.add("input_frame_rate_den", labels, &card->metric_input_frame_rate_den, Metrics::TYPE_GAUGE);
+       global_metrics.add("input_sample_rate_hz", labels, &card->metric_input_sample_rate_hz, Metrics::TYPE_GAUGE);
+       card->labels = labels;
 }
 
 void Mixer::set_output_card_internal(int card_index)
@@ -473,7 +611,7 @@ void Mixer::set_output_card_internal(int card_index)
                lock.unlock();
                fake_capture->stop_dequeue_thread();
                lock.lock();
-               old_card->capture = move(old_card->parked_capture);
+               old_card->capture = move(old_card->parked_capture);  // TODO: reset the metrics
                old_card->is_fake_capture = false;
                old_card->capture->start_bm_capture();
        }
@@ -495,6 +633,7 @@ void Mixer::set_output_card_internal(int card_index)
                card->output->start_output(desired_output_video_mode, pts_int);
        }
        output_card_index = card_index;
+       output_jitter_history.clear();
 }
 
 namespace {
@@ -518,6 +657,16 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode,
        DeviceSpec device{InputSourceType::CAPTURE_CARD, card_index};
        CaptureCard *card = &cards[card_index];
 
+       ++card->metric_input_received_frames;
+       card->metric_input_has_signal_bool = video_format.has_signal;
+       card->metric_input_is_connected_bool = video_format.is_connected;
+       card->metric_input_interlaced_bool = video_format.interlaced;
+       card->metric_input_width_pixels = video_format.width;
+       card->metric_input_height_pixels = video_format.height;
+       card->metric_input_frame_rate_nom = video_format.frame_rate_nom;
+       card->metric_input_frame_rate_den = video_format.frame_rate_den;
+       card->metric_input_sample_rate_hz = audio_format.sample_rate;
+
        if (is_mode_scanning[card_index]) {
                if (video_format.has_signal) {
                        // Found a stable signal, so stop scanning.
@@ -567,10 +716,12 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode,
                        card_index, card->last_timecode, timecode);
                audio_mixer.reset_resampler(device);
                dropped_frames = 0;
+               ++card->metric_input_resets;
        } else if (dropped_frames > 0) {
                // Insert silence as needed.
                fprintf(stderr, "Card %d dropped %d frame(s) (before timecode 0x%04x), inserting silence.\n",
                        card_index, dropped_frames, timecode);
+               card->metric_input_dropped_frames_error += dropped_frames;
 
                bool success;
                do {
@@ -587,6 +738,11 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode,
 
        card->last_timecode = timecode;
 
+       // Calculate jitter for this card here. We do it on arrival so that we
+       // make sure every frame counts, even the dropped ones -- and it will also
+       // make sure the jitter number is as recent as possible, should it change.
+       card->jitter_history.frame_arrived(video_frame.received_timestamp, frame_length, dropped_frames);
+
        PBOFrameAllocator::Userdata *userdata = (PBOFrameAllocator::Userdata *)video_frame.userdata;
 
        size_t cbcr_width, cbcr_height, cbcr_offset, y_offset;
@@ -715,6 +871,13 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode,
                        case PixelFormat_8BitBGRA: {
                                size_t field_start = video_offset + video_format.stride * field_start_line;
                                upload_texture(userdata->tex_rgba[field], video_format.width, video_format.height, video_format.stride, interlaced_stride, GL_BGRA, GL_UNSIGNED_INT_8_8_8_8_REV, field_start);
+                               // These could be asked to deliver mipmaps at any time.
+                               glBindTexture(GL_TEXTURE_2D, userdata->tex_rgba[field]);
+                               check_error();
+                               glGenerateMipmap(GL_TEXTURE_2D);
+                               check_error();
+                               glBindTexture(GL_TEXTURE_2D, 0);
+                               check_error();
                                break;
                        }
                        default:
@@ -864,6 +1027,10 @@ void Mixer::thread_func()
 
                now = steady_clock::now();
                double elapsed = duration<double>(now - start).count();
+
+               metric_frames_output_total = frame_num;
+               metric_frames_output_dropped = stats_dropped_frames;
+
                if (frame_num % 100 == 0) {
                        printf("%d frames (%d dropped) in %.3f seconds = %.1f fps (%.1f ms/frame)",
                                frame_num, stats_dropped_frames, elapsed, frame_num / elapsed,
@@ -894,12 +1061,16 @@ void Mixer::thread_func()
                                                long(limit.rlim_cur / 1048576),
                                                float(100.0 * (used.ru_maxrss * 1024.0) / limit.rlim_cur));
                                }
+                               metrics_memory_locked_limit_bytes = limit.rlim_cur;
                        } else {
                                printf(", using %ld MB memory (not locked)",
                                        long(used.ru_maxrss / 1024));
+                               metrics_memory_locked_limit_bytes = 0.0 / 0.0;
                        }
 
                        printf("\n");
+
+                       metrics_memory_used_bytes = used.ru_maxrss * 1024;
                }
 
 
@@ -933,7 +1104,7 @@ bool Mixer::input_card_is_master_clock(unsigned card_index, unsigned master_card
        return (card_index == master_card_index);
 }
 
-void Mixer::trim_queue(CaptureCard *card, unsigned card_index)
+void Mixer::trim_queue(CaptureCard *card, size_t safe_queue_length)
 {
        // Count the number of frames in the queue, including any frames
        // we dropped. It's hard to know exactly how we should deal with
@@ -945,18 +1116,17 @@ void Mixer::trim_queue(CaptureCard *card, unsigned card_index)
        for (const CaptureCard::NewFrame &frame : card->new_frames) {
                queue_length += frame.dropped_frames + 1;
        }
-       card->queue_length_policy.update_policy(queue_length);
 
        // If needed, drop frames until the queue is below the safe limit.
        // We prefer to drop from the head, because all else being equal,
        // we'd like more recent frames (less latency).
        unsigned dropped_frames = 0;
-       while (queue_length > card->queue_length_policy.get_safe_queue_length()) {
+       while (queue_length > safe_queue_length) {
                assert(!card->new_frames.empty());
                assert(queue_length > card->new_frames.front().dropped_frames);
                queue_length -= card->new_frames.front().dropped_frames;
 
-               if (queue_length <= card->queue_length_policy.get_safe_queue_length()) {
+               if (queue_length <= safe_queue_length) {
                        // No need to drop anything.
                        break;
                }
@@ -967,6 +1137,9 @@ void Mixer::trim_queue(CaptureCard *card, unsigned card_index)
                ++dropped_frames;
        }
 
+       card->metric_input_dropped_frames_jitter += dropped_frames;
+       card->metric_input_queue_length_frames = queue_length;
+
 #if 0
        if (dropped_frames > 0) {
                fprintf(stderr, "Card %u dropped %u frame(s) to keep latency down.\n",
@@ -1000,26 +1173,15 @@ start:
                // and then restart.
                assert(cards[master_card_index].capture->get_disconnected());
                handle_hotplugged_cards();
+               lock.unlock();
                goto start;
        }
 
-       if (!master_card_is_output) {
-               output_frame_info.frame_timestamp =
-                       cards[master_card_index].new_frames.front().received_timestamp;
-       }
-
        for (unsigned card_index = 0; card_index < num_cards + num_video_inputs; ++card_index) {
                CaptureCard *card = &cards[card_index];
-               if (input_card_is_master_clock(card_index, master_card_index)) {
-                       // We don't use the queue length policy for the master card,
-                       // but we will if it stops being the master. Thus, clear out
-                       // the policy in case we switch in the future.
-                       card->queue_length_policy.reset(card_index);
-                       assert(!card->new_frames.empty());
+               if (card->new_frames.empty()) {  // Starvation.
+                       ++card->metric_input_duped_frames;
                } else {
-                       trim_queue(card, card_index);
-               }
-               if (!card->new_frames.empty()) {
                        new_frames[card_index] = move(card->new_frames.front());
                        has_new_frame[card_index] = true;
                        card->new_frames.pop_front();
@@ -1028,10 +1190,31 @@ start:
        }
 
        if (!master_card_is_output) {
+               output_frame_info.frame_timestamp = new_frames[master_card_index].received_timestamp;
                output_frame_info.dropped_frames = new_frames[master_card_index].dropped_frames;
                output_frame_info.frame_duration = new_frames[master_card_index].length;
        }
 
+       if (!output_frame_info.is_preroll) {
+               output_jitter_history.frame_arrived(output_frame_info.frame_timestamp, output_frame_info.frame_duration, output_frame_info.dropped_frames);
+       }
+
+       for (unsigned card_index = 0; card_index < num_cards + num_video_inputs; ++card_index) {
+               CaptureCard *card = &cards[card_index];
+               if (has_new_frame[card_index] &&
+                   !input_card_is_master_clock(card_index, master_card_index) &&
+                   !output_frame_info.is_preroll) {
+                       card->queue_length_policy.update_policy(
+                               output_frame_info.frame_timestamp,
+                               card->jitter_history.get_expected_next_frame(),
+                               output_frame_info.frame_duration,
+                               card->jitter_history.estimate_max_jitter(),
+                               output_jitter_history.estimate_max_jitter());
+                       trim_queue(card, min<int>(global_flags.max_input_queue_frames,
+                                                 card->queue_length_policy.get_safe_queue_length()));
+               }
+       }
+
        // This might get off by a fractional sample when changing master card
        // between ones with different frame rates, but that's fine.
        int num_samples_times_timebase = OUTPUT_FREQUENCY * output_frame_info.frame_duration + fractional_samples;
@@ -1151,7 +1334,7 @@ void Mixer::render_one_frame(int64_t duration)
        // The theme can't (or at least shouldn't!) call connect_signal() on
        // each FFmpeg input, so we'll do it here.
        for (const pair<LiveInputWrapper *, FFmpegCapture *> &conn : theme->get_signal_connections()) {
-               conn.first->connect_signal_raw(conn.second->get_card_index());
+               conn.first->connect_signal_raw(conn.second->get_card_index(), input_state);
        }
 
        // If HDMI/SDI output is active and the user has requested auto mode,