#include <assert.h>
+#include <algorithm>
#include <mutex>
#include <string>
#include <vector>
}
{
- lock_guard<mutex> lock(ctx_mu);
- if (av_interleaved_write_frame(avctx, &pkt_copy) < 0) {
+ lock_guard<mutex> lock(mu);
+ if (plug_count > 0) {
+ plugged_packets.push_back(av_packet_clone(&pkt_copy));
+ } else if (av_interleaved_write_frame(avctx, &pkt_copy) < 0) {
fprintf(stderr, "av_interleaved_write_frame() failed\n");
exit(1);
}
av_packet_unref(&pkt_copy);
}
+
+void Mux::plug()
+{
+ lock_guard<mutex> lock(mu);
+ ++plug_count;
+}
+
+void Mux::unplug()
+{
+ lock_guard<mutex> lock(mu);
+ if (--plug_count > 0) {
+ return;
+ }
+ assert(plug_count >= 0);
+
+ sort(plugged_packets.begin(), plugged_packets.end(), [](const AVPacket *a, const AVPacket *b) {
+ int64_t a_dts = (a->dts == AV_NOPTS_VALUE ? a->pts : a->dts);
+ int64_t b_dts = (b->dts == AV_NOPTS_VALUE ? b->pts : b->dts);
+ if (a_dts != b_dts) {
+ return a_dts < b_dts;
+ } else {
+ return a->pts < b->pts;
+ }
+ });
+
+ for (AVPacket *pkt : plugged_packets) {
+ if (av_interleaved_write_frame(avctx, pkt) < 0) {
+ fprintf(stderr, "av_interleaved_write_frame() failed\n");
+ exit(1);
+ }
+ av_packet_free(&pkt);
+ }
+ plugged_packets.clear();
+}
}
#include <mutex>
+#include <vector>
class KeyFrameSignalReceiver {
public:
~Mux();
void add_packet(const AVPacket &pkt, int64_t pts, int64_t dts);
+ // As long as the mux is plugged, it will not actually write anything to disk,
+	// just queue the packets. Once it is unplugged, the packets are reordered by dts
+	// (with pts as tie-breaker) and written. This is primarily useful if you might have two different encoders
+ // writing to the mux at the same time (because one is shutting down), so that
+ // pts might otherwise come out-of-order.
+ //
+	// You can plug and unplug multiple times; only when the plug count reaches
+	// zero will the queued packets actually be written.
+ void plug();
+ void unplug();
+
private:
- std::mutex ctx_mu;
- AVFormatContext *avctx; // Protected by <ctx_mu>.
+ std::mutex mu;
+ AVFormatContext *avctx; // Protected by <mu>.
+ int plug_count = 0; // Protected by <mu>.
+ std::vector<AVPacket *> plugged_packets; // Protected by <mu>.
+
AVStream *avstream_video, *avstream_audio;
KeyFrameSignalReceiver *keyframe_signal_receiver;
};
bool begin_frame(GLuint *y_tex, GLuint *cbcr_tex);
RefCountedGLsync end_frame(int64_t pts, int64_t duration, const vector<RefCountedFrame> &input_frames);
void shutdown();
+ void release_gl_resources();
void set_stream_mux(Mux *mux)
{
stream_mux = mux;
VADisplay va_open_display(const string &va_display);
void va_close_display(VADisplay va_dpy);
int setup_encode();
- int release_encode();
+ void release_encode();
void update_ReferenceFrames(int frame_type);
int update_RefPicList(int frame_type);
bool is_shutdown = false;
+ bool has_released_gl_resources = false;
bool use_zerocopy;
int drm_fd = -1;
}
}
-int QuickSyncEncoderImpl::release_encode()
+void QuickSyncEncoderImpl::release_encode()
{
for (unsigned i = 0; i < SURFACE_NUM; i++) {
vaDestroyBuffer(va_dpy, gl_surfaces[i].coded_buf);
vaDestroySurfaces(va_dpy, &gl_surfaces[i].src_surface, 1);
vaDestroySurfaces(va_dpy, &gl_surfaces[i].ref_surface, 1);
+ }
+ vaDestroyContext(va_dpy, context_id);
+ vaDestroyConfig(va_dpy, config_id);
+}
+
+void QuickSyncEncoderImpl::release_gl_resources()
+{
+ assert(is_shutdown);
+ if (has_released_gl_resources) {
+ return;
+ }
+
+ for (unsigned i = 0; i < SURFACE_NUM; i++) {
if (!use_zerocopy) {
glBindBuffer(GL_PIXEL_PACK_BUFFER, gl_surfaces[i].pbo);
glUnmapBuffer(GL_PIXEL_PACK_BUFFER);
resource_pool->release_2d_texture(gl_surfaces[i].cbcr_tex);
}
- vaDestroyContext(va_dpy, context_id);
- vaDestroyConfig(va_dpy, config_id);
-
- return 0;
+ has_released_gl_resources = true;
}
int QuickSyncEncoderImpl::deinit_va()
QuickSyncEncoderImpl::~QuickSyncEncoderImpl()
{
shutdown();
+ release_gl_resources();
}
bool QuickSyncEncoderImpl::begin_frame(GLuint *y_tex, GLuint *cbcr_tex)
void add_audio(int64_t pts, std::vector<float> audio);
bool begin_frame(GLuint *y_tex, GLuint *cbcr_tex);
RefCountedGLsync end_frame(int64_t pts, int64_t duration, const std::vector<RefCountedFrame> &input_frames);
- void shutdown(); // Blocking.
+ void shutdown(); // Blocking. Does not require an OpenGL context.
+ void release_gl_resources(); // Requires an OpenGL context. Must be run after shutdown.
private:
std::unique_ptr<QuickSyncEncoderImpl> impl;
VideoEncoder::~VideoEncoder()
{
quicksync_encoder.reset(nullptr);
+ while (quicksync_encoders_in_shutdown.load() > 0) {
+ usleep(10000);
+ }
close_output_stream();
}
{
string filename = generate_local_dump_filename(frame);
printf("Starting new recording: %s\n", filename.c_str());
- quicksync_encoder->shutdown();
+
+ // Do the shutdown of the old encoder in a separate thread, since it can
+ // take some time (it needs to wait for all the frames in the queue to be
+ // done encoding, for one) and we are running on the main mixer thread.
+ // However, since this means both encoders could be sending packets at
+ // the same time, it means pts could come out of order to the stream mux,
+ // and we need to plug it until the shutdown is complete.
+ stream_mux->plug();
+ lock_guard<mutex> lock(qs_mu);
+ QuickSyncEncoder *old_encoder = quicksync_encoder.release(); // When we go C++14, we can use move capture instead.
+ thread([old_encoder, this]{
+ old_encoder->shutdown();
+ stream_mux->unplug();
+
+ // We cannot delete the encoder here, as this thread has no OpenGL context.
+ // We'll deal with it in begin_frame().
+ lock_guard<mutex> lock(qs_mu);
+ qs_needing_cleanup.emplace_back(old_encoder);
+ }).detach();
+
quicksync_encoder.reset(new QuickSyncEncoder(filename, resource_pool, surface, va_display, width, height, oformat, stream_audio_encoder.get(), x264_encoder.get()));
quicksync_encoder->set_stream_mux(stream_mux.get());
}
void VideoEncoder::add_audio(int64_t pts, std::vector<float> audio)
{
+ lock_guard<mutex> lock(qs_mu);
quicksync_encoder->add_audio(pts, audio);
}
bool VideoEncoder::begin_frame(GLuint *y_tex, GLuint *cbcr_tex)
{
+ lock_guard<mutex> lock(qs_mu);
+ qs_needing_cleanup.clear(); // Since we have an OpenGL context here, and are called regularly.
return quicksync_encoder->begin_frame(y_tex, cbcr_tex);
}
RefCountedGLsync VideoEncoder::end_frame(int64_t pts, int64_t duration, const std::vector<RefCountedFrame> &input_frames)
{
+ lock_guard<mutex> lock(qs_mu);
return quicksync_encoder->end_frame(pts, duration, input_frames);
}
#include <stdint.h>
#include <memory>
+#include <set>
#include <string>
#include <vector>
int write_packet(uint8_t *buf, int buf_size);
AVOutputFormat *oformat;
- std::unique_ptr<QuickSyncEncoder> quicksync_encoder;
+ std::mutex qs_mu;
+ std::unique_ptr<QuickSyncEncoder> quicksync_encoder; // Under <qs_mu>.
movit::ResourcePool *resource_pool;
QSurface *surface;
std::string va_display;
std::string stream_mux_header;
bool stream_mux_writing_keyframes = false;
+
+ std::atomic<int> quicksync_encoders_in_shutdown{0};
+
+	// Encoders that have been shut down, but still need release_gl_resources()
+	// to be called on them (or to be deleted) from some thread with an OpenGL context.
+ std::vector<std::unique_ptr<QuickSyncEncoder>> qs_needing_cleanup; // Under <qs_mu>.
};
#endif