]> git.sesse.net Git - nageru/commitdiff
Write video to disk on a background thread; reduces mutex contention a fair bit when...
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Thu, 29 Jun 2017 19:20:03 +0000 (21:20 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Thu, 29 Jun 2017 19:22:25 +0000 (21:22 +0200)
mux.cpp
mux.h
quicksync_encoder.cpp
video_encoder.cpp

diff --git a/mux.cpp b/mux.cpp
index d01c18038a893cfec7152d35d8e3b0f1131ed0fb..5c6a150edd80e80e1e2825e6ab1d012f1a33253f 100644 (file)
--- a/mux.cpp
+++ b/mux.cpp
@@ -31,7 +31,9 @@ using namespace std;
 struct PacketBefore {
        PacketBefore(const AVFormatContext *ctx) : ctx(ctx) {}
 
-       bool operator() (const AVPacket *a, const AVPacket *b) const {
+       bool operator() (const Mux::QueuedPacket &a_qp, const Mux::QueuedPacket &b_qp) const {
+               const AVPacket *a = a_qp.pkt;
+               const AVPacket *b = b_qp.pkt;
                int64_t a_dts = (a->dts == AV_NOPTS_VALUE ? a->pts : a->dts);
                int64_t b_dts = (b->dts == AV_NOPTS_VALUE ? b->pts : b->dts);
                AVRational a_timebase = ctx->streams[a->stream_index]->time_base;
@@ -46,8 +48,8 @@ struct PacketBefore {
        const AVFormatContext * const ctx;
 };
 
-Mux::Mux(AVFormatContext *avctx, int width, int height, Codec video_codec, const string &video_extradata, const AVCodecParameters *audio_codecpar, int time_base, std::function<void(int64_t)> write_callback, const vector<MuxMetrics *> &metrics)
-       : avctx(avctx), write_callback(write_callback), metrics(metrics)
+Mux::Mux(AVFormatContext *avctx, int width, int height, Codec video_codec, const string &video_extradata, const AVCodecParameters *audio_codecpar, int time_base, std::function<void(int64_t)> write_callback, WriteStrategy write_strategy, const vector<MuxMetrics *> &metrics)
+       : write_strategy(write_strategy), avctx(avctx), write_callback(write_callback), metrics(metrics)
 {
        avstream_video = avformat_new_stream(avctx, nullptr);
        if (avstream_video == nullptr) {
@@ -116,10 +118,20 @@ Mux::Mux(AVFormatContext *avctx, int width, int height, Codec video_codec, const
 
        // Make sure the header is written before the constructor exits.
        avio_flush(avctx->pb);
+
+       if (write_strategy == WRITE_BACKGROUND) {
+               writer_thread = thread(&Mux::thread_func, this);
+       }
 }
 
 Mux::~Mux()
 {
+       assert(plug_count == 0);
+       if (write_strategy == WRITE_BACKGROUND) {
+               writer_thread_should_quit = true;
+               packet_queue_ready.notify_all();
+               writer_thread.join();
+       }
        int64_t old_pos = avctx->pb->pos;
        av_write_trailer(avctx);
        for (MuxMetrics *metric : metrics) {
@@ -154,25 +166,20 @@ void Mux::add_packet(const AVPacket &pkt, int64_t pts, int64_t dts)
 
        {
                lock_guard<mutex> lock(mu);
-               if (plug_count > 0) {
-                       plugged_packets.push_back(av_packet_clone(&pkt_copy));
+               if (write_strategy == WriteStrategy::WRITE_BACKGROUND) {
+                       packet_queue.push_back(QueuedPacket{ av_packet_clone(&pkt_copy), pts });
+                       if (plug_count == 0) packet_queue_ready.notify_all();
+               } else if (plug_count > 0) {
+                       packet_queue.push_back(QueuedPacket{ av_packet_clone(&pkt_copy), pts });
                } else {
-                       write_packet_or_die(pkt_copy);
+                       write_packet_or_die(pkt_copy, pts);
                }
        }
 
        av_packet_unref(&pkt_copy);
-
-       // Note: This will be wrong in the case of plugged packets, but that only happens
-       // for network streams, not for files, and write callbacks are only really relevant
-       // for files. (We don't want to do this from write_packet_or_die, as it only has
-       // the rescaled pts, which is unsuitable for callback.)
-       if (pkt.stream_index == 0 && write_callback != nullptr) {
-               write_callback(pts);
-       }
 }
 
-void Mux::write_packet_or_die(const AVPacket &pkt)
+void Mux::write_packet_or_die(const AVPacket &pkt, int64_t unscaled_pts)
 {
        for (MuxMetrics *metric : metrics) {
                if (pkt.stream_index == 0) {
@@ -192,6 +199,10 @@ void Mux::write_packet_or_die(const AVPacket &pkt)
        for (MuxMetrics *metric : metrics) {
                metric->metric_written_bytes += avctx->pb->pos - old_pos;
        }
+
+       if (pkt.stream_index == 0 && write_callback != nullptr) {
+               write_callback(unscaled_pts);
+       }
 }
 
 void Mux::plug()
@@ -208,13 +219,42 @@ void Mux::unplug()
        }
        assert(plug_count >= 0);
 
-       sort(plugged_packets.begin(), plugged_packets.end(), PacketBefore(avctx));
+       sort(packet_queue.begin(), packet_queue.end(), PacketBefore(avctx));
 
-       for (AVPacket *pkt : plugged_packets) {
-               write_packet_or_die(*pkt);
-               av_packet_free(&pkt);
+       if (write_strategy == WRITE_BACKGROUND) {
+               packet_queue_ready.notify_all();
+       } else {
+               for (QueuedPacket &qp : packet_queue) {
+                       write_packet_or_die(*qp.pkt, qp.unscaled_pts);
+                       av_packet_free(&qp.pkt);
+               }
+               packet_queue.clear();
+       }
+}
+
+void Mux::thread_func()
+{
+       unique_lock<mutex> lock(mu);
+       for ( ;; ) {
+               packet_queue_ready.wait(lock, [this]() {
+                       return writer_thread_should_quit || (!packet_queue.empty() && plug_count == 0);
+               });
+               if (writer_thread_should_quit && packet_queue.empty()) {
+                       // All done.
+                       break;
+               }
+
+               assert(!packet_queue.empty() && plug_count == 0);
+               vector<QueuedPacket> packets;
+               swap(packets, packet_queue);
+
+               lock.unlock();
+               for (QueuedPacket &qp : packets) {
+                       write_packet_or_die(*qp.pkt, qp.unscaled_pts);
+                       av_packet_free(&qp.pkt);
+               }
+               lock.lock();
        }
-       plugged_packets.clear();
 }
 
 void MuxMetrics::init(const vector<pair<string, string>> &labels)
diff --git a/mux.h b/mux.h
index c6c442100c58b18c481754747693b54c8571bcbc..5bf3e413014e7f0de6caf1e992e2747dc573fe98 100644 (file)
--- a/mux.h
+++ b/mux.h
@@ -10,10 +10,12 @@ extern "C" {
 
 #include <sys/types.h>
 #include <atomic>
+#include <condition_variable>
 #include <functional>
 #include <mutex>
 #include <string>
 #include <utility>
+#include <thread>
 #include <vector>
 
 struct MuxMetrics {
@@ -39,13 +41,24 @@ public:
                CODEC_H264,
                CODEC_NV12,  // Uncompressed 4:2:0.
        };
+       enum WriteStrategy {
+               // add_packet() will write the packet immediately, unless plugged.
+               WRITE_FOREGROUND,
+
+               // All writes will happen on a separate thread, so add_packet()
+               // won't block. Use this if writing to a file and you might be
+               // holding a mutex (because blocking I/O with a mutex held is
+               // not good). Note that this will clone every packet, so it has
+               // higher overhead.
+               WRITE_BACKGROUND,
+       };
 
        // Takes ownership of avctx. <write_callback> will be called every time
        // a write has been made to the video stream (id 0), with the pts of
        // the just-written frame. (write_callback can be nullptr.)
        // Does not take ownership of <metrics>; elements in there, if any,
        // will be added to.
-       Mux(AVFormatContext *avctx, int width, int height, Codec video_codec, const std::string &video_extradata, const AVCodecParameters *audio_codecpar, int time_base, std::function<void(int64_t)> write_callback, const std::vector<MuxMetrics *> &metrics);
+       Mux(AVFormatContext *avctx, int width, int height, Codec video_codec, const std::string &video_extradata, const AVCodecParameters *audio_codecpar, int time_base, std::function<void(int64_t)> write_callback, WriteStrategy write_strategy, const std::vector<MuxMetrics *> &metrics);
        ~Mux();
        void add_packet(const AVPacket &pkt, int64_t pts, int64_t dts);
 
@@ -61,17 +74,36 @@ public:
        void unplug();
 
 private:
-       void write_packet_or_die(const AVPacket &pkt);  // Must be called with <mu> held.
+       // If write_strategy == WRITE_FOREGORUND, Must be called with <mu> held.
+       void write_packet_or_die(const AVPacket &pkt, int64_t unscaled_pts);
+       void thread_func();
+
+       WriteStrategy write_strategy;
 
        std::mutex mu;
-       AVFormatContext *avctx;  // Protected by <mu>.
+
+       // These are only in use if write_strategy == WRITE_BACKGROUND.
+       std::atomic<bool> writer_thread_should_quit{false};
+       std::thread writer_thread;
+
+       AVFormatContext *avctx;  // Protected by <mu>, iff write_strategy == WRITE_BACKGROUND.
        int plug_count = 0;  // Protected by <mu>.
-       std::vector<AVPacket *> plugged_packets;  // Protected by <mu>.
+
+       // Protected by <mu>. If write_strategy == WRITE_FOREGROUND,
+       // this is only in use when plugging.
+       struct QueuedPacket {
+               AVPacket *pkt;
+               int64_t unscaled_pts;
+       };
+       std::vector<QueuedPacket> packet_queue;
+       std::condition_variable packet_queue_ready;
 
        AVStream *avstream_video, *avstream_audio;
 
        std::function<void(int64_t)> write_callback;
        std::vector<MuxMetrics *> metrics;
+
+       friend struct PacketBefore;
 };
 
 #endif  // !defined(_MUX_H)
index 6cab461a98fe37bbb87a5c0e301829fa31162fb2..88d0fb401d514ff7422e1239a007e7d206eda319 100644 (file)
@@ -1840,6 +1840,7 @@ void QuickSyncEncoderImpl::open_output_file(const std::string &filename)
        AVCodecParametersWithDeleter audio_codecpar = file_audio_encoder->get_codec_parameters();
        file_mux.reset(new Mux(avctx, frame_width, frame_height, Mux::CODEC_H264, video_extradata, audio_codecpar.get(), TIMEBASE,
                std::bind(&DiskSpaceEstimator::report_write, disk_space_estimator, filename, _1),
+               Mux::WRITE_BACKGROUND,
                { &current_file_mux_metrics, &total_mux_metrics }));
        metric_current_file_start_time_seconds = get_timestamp_for_metrics();
 
index ec864d490dcdedfb67d3077ecc354d2856969d3a..61a1305c2dbd1e986a9ab78eb452d90859879925 100644 (file)
@@ -189,7 +189,7 @@ void VideoEncoder::open_output_stream()
 
        int time_base = global_flags.stream_coarse_timebase ? COARSE_TIMEBASE : TIMEBASE;
        stream_mux.reset(new Mux(avctx, width, height, video_codec, video_extradata, stream_audio_encoder->get_codec_parameters().get(), time_base,
-               /*write_callback=*/nullptr, { &stream_mux_metrics }));
+               /*write_callback=*/nullptr, Mux::WRITE_FOREGROUND, { &stream_mux_metrics }));
        stream_mux_metrics.init({{ "destination", "http" }});
 }