MJPEGEncoder::MJPEGEncoder(HTTPD *httpd, const string &va_display)
: httpd(httpd)
{
- encoder_thread = thread(&MJPEGEncoder::encoder_thread_func, this);
-
// Set up the mux. We don't use the Mux wrapper, because it's geared towards
// a situation with only one video stream (and possibly one audio stream)
// with known width/height, and we don't need the extra functionality it provides.
fprintf(stderr, "Could not initialize VA-API for MJPEG encoding: %s. JPEGs will be encoded in software if needed.\n", error.c_str());
}
+ encoder_thread = thread(&MJPEGEncoder::encoder_thread_func, this);
+ if (va_dpy != nullptr) {
+ va_receiver_thread = thread(&MJPEGEncoder::va_receiver_thread_func, this);
+ }
+
running = true;
}
should_quit = true;
any_frames_to_be_encoded.notify_all();
encoder_thread.join();
+ if (va_dpy != nullptr) {
+ va_receiver_thread.join();
+ }
}
unique_ptr<VADisplayWithCleanup> MJPEGEncoder::try_open_va(const string &va_display, string *error, VAConfigID *config_id)
}
lock_guard<mutex> lock(mu);
+ if (frames_to_be_encoded.size() + frames_encoding.size() > 10) {
+ fprintf(stderr, "WARNING: MJPEG encoding doesn't keep up, discarding frame.\n");
+ return;
+ }
frames_to_be_encoded.push(QueuedFrame{ pts, card_index, frame, video_format, y_offset, cbcr_offset });
any_frames_to_be_encoded.notify_all();
}
posix_memalign((void **)&tmp_cb, 4096, 4096 * 8);
posix_memalign((void **)&tmp_cr, 4096, 4096 * 8);
- unique_lock<mutex> lock(mu);
for (;;) {
- any_frames_to_be_encoded.wait(lock, [this] { return !frames_to_be_encoded.empty() || should_quit; });
- if (should_quit) break;
- QueuedFrame qf = move(frames_to_be_encoded.front());
- frames_to_be_encoded.pop();
-
- vector<uint8_t> jpeg = encode_jpeg(qf);
-
- AVPacket pkt;
- memset(&pkt, 0, sizeof(pkt));
- pkt.buf = nullptr;
- pkt.data = &jpeg[0];
- pkt.size = jpeg.size();
- pkt.stream_index = qf.card_index;
- pkt.flags = AV_PKT_FLAG_KEY;
- pkt.pts = pkt.dts = qf.pts;
-
- if (av_write_frame(avctx.get(), &pkt) < 0) {
- fprintf(stderr, "av_write_frame() failed\n");
- exit(1);
+ QueuedFrame qf;
+ {
+ unique_lock<mutex> lock(mu);
+ any_frames_to_be_encoded.wait(lock, [this] { return !frames_to_be_encoded.empty() || should_quit; });
+ if (should_quit) break;
+ qf = move(frames_to_be_encoded.front());
+ frames_to_be_encoded.pop();
+ }
+
+ if (va_dpy != nullptr) {
+ // Will call back in the receiver thread.
+ encode_jpeg_va(move(qf));
+ } else {
+ // Encode synchronously, in the same thread.
+ vector<uint8_t> jpeg = encode_jpeg_libjpeg(qf);
+ write_mjpeg_packet(qf.pts, qf.card_index, jpeg);
}
}
free(tmp_cr);
}
+void MJPEGEncoder::write_mjpeg_packet(int64_t pts, unsigned card_index, const vector<uint8_t> &jpeg)
+{
+ AVPacket pkt;
+ memset(&pkt, 0, sizeof(pkt));
+ pkt.buf = nullptr;
+ pkt.data = const_cast<uint8_t *>(&jpeg[0]);
+ pkt.size = jpeg.size();
+ pkt.stream_index = card_index;
+ pkt.flags = AV_PKT_FLAG_KEY;
+ pkt.pts = pkt.dts = pts;
+
+ if (av_write_frame(avctx.get(), &pkt) < 0) {
+ fprintf(stderr, "av_write_frame() failed\n");
+ exit(1);
+ }
+}
+
class VABufferDestroyer {
public:
VABufferDestroyer(VADisplay dpy, VABufferID buf)
return ret;
}
-vector<uint8_t> MJPEGEncoder::encode_jpeg(const QueuedFrame &qf)
-{
- if (va_dpy != nullptr) {
- return encode_jpeg_va(qf);
- } else {
- return encode_jpeg_libjpeg(qf);
- }
-}
-
-vector<uint8_t> MJPEGEncoder::encode_jpeg_va(const QueuedFrame &qf)
+void MJPEGEncoder::encode_jpeg_va(QueuedFrame &&qf)
{
unsigned width = qf.video_format.width;
unsigned height = qf.video_format.height;
va_status = vaEndPicture(va_dpy->va_dpy, resources.context);
CHECK_VASTATUS(va_status, "vaEndPicture");
- va_status = vaSyncSurface(va_dpy->va_dpy, resources.surface);
- CHECK_VASTATUS(va_status, "vaSyncSurface");
+ qf.resources = move(resources);
+ qf.resource_releaser = move(release);
- VACodedBufferSegment *segment;
- va_status = vaMapBuffer(va_dpy->va_dpy, resources.data_buffer, (void **)&segment);
- CHECK_VASTATUS(va_status, "vaMapBuffer");
+ lock_guard<mutex> lock(mu);
+ frames_encoding.push(move(qf));
+ any_frames_encoding.notify_all();
+}
- const char *coded_buf = reinterpret_cast<char *>(segment->buf);
- vector<uint8_t> jpeg(coded_buf, coded_buf + segment->size);
+void MJPEGEncoder::va_receiver_thread_func()
+{
+ pthread_setname_np(pthread_self(), "MJPEG_Receive");
+ for (;;) {
+ QueuedFrame qf;
+ {
+ unique_lock<mutex> lock(mu);
+ any_frames_encoding.wait(lock, [this] { return !frames_encoding.empty() || should_quit; });
+ if (should_quit) return;
+ qf = move(frames_encoding.front());
+ frames_encoding.pop();
+ }
- va_status = vaUnmapBuffer(va_dpy->va_dpy, resources.data_buffer);
- CHECK_VASTATUS(va_status, "vaUnmapBuffer");
+ VAStatus va_status = vaSyncSurface(va_dpy->va_dpy, qf.resources.surface);
+ CHECK_VASTATUS(va_status, "vaSyncSurface");
- return jpeg;
+ VACodedBufferSegment *segment;
+ va_status = vaMapBuffer(va_dpy->va_dpy, qf.resources.data_buffer, (void **)&segment);
+ CHECK_VASTATUS(va_status, "vaMapBuffer");
+
+ const char *coded_buf = reinterpret_cast<char *>(segment->buf);
+ vector<uint8_t> jpeg(coded_buf, coded_buf + segment->size);
+
+ va_status = vaUnmapBuffer(va_dpy->va_dpy, qf.resources.data_buffer);
+ CHECK_VASTATUS(va_status, "vaUnmapBuffer");
+
+ write_mjpeg_packet(qf.pts, qf.card_index, jpeg);
+ }
}
vector<uint8_t> MJPEGEncoder::encode_jpeg_libjpeg(const QueuedFrame &qf)
private:
static constexpr int quality = 90;
+ struct VAResources {
+ unsigned width, height;
+ VASurfaceID surface;
+ VAContextID context;
+ VABufferID data_buffer;
+ };
+
+ // RAII wrapper to release VAResources on return (even on error).
+ class ReleaseVAResources {
+ public:
+ ReleaseVAResources() : committed(true) {}
+
+ ReleaseVAResources(MJPEGEncoder *mjpeg, const VAResources &resources)
+ : mjpeg(mjpeg), resources(resources) {}
+
+ ReleaseVAResources(ReleaseVAResources &) = delete;
+
+ ReleaseVAResources(ReleaseVAResources &&other)
+ : mjpeg(other.mjpeg), resources(other.resources), committed(other.committed) {
+ other.commit();
+ }
+
+ ReleaseVAResources &operator= (ReleaseVAResources &) = delete;
+
+ ReleaseVAResources &operator= (ReleaseVAResources &&other) {
+ if (!committed) {
+ mjpeg->release_va_resources(resources);
+ }
+ mjpeg = other.mjpeg;
+ resources = std::move(other.resources);
+ committed = other.committed;
+ other.commit();
+ return *this;
+ }
+
+ ~ReleaseVAResources()
+ {
+ if (!committed) {
+ mjpeg->release_va_resources(resources);
+ }
+ }
+
+ void commit() { committed = true; }
+
+ private:
+ MJPEGEncoder *mjpeg = nullptr;
+ VAResources resources;
+ bool committed = false;
+ };
+
struct QueuedFrame {
int64_t pts;
unsigned card_index;
RefCountedFrame frame;
bmusb::VideoFormat video_format;
size_t y_offset, cbcr_offset;
+
+ // Only for frames in the process of being encoded by VA-API.
+ VAResources resources;
+ ReleaseVAResources resource_releaser;
};
void encoder_thread_func();
- std::vector<uint8_t> encode_jpeg(const QueuedFrame &qf);
- std::vector<uint8_t> encode_jpeg_va(const QueuedFrame &qf);
+ void va_receiver_thread_func();
+ void encode_jpeg_va(QueuedFrame &&qf);
std::vector<uint8_t> encode_jpeg_libjpeg(const QueuedFrame &qf);
+ void write_mjpeg_packet(int64_t pts, unsigned card_index, const std::vector<uint8_t> &jpeg);
void init_jpeg_422(unsigned width, unsigned height, VectorDestinationManager *dest, jpeg_compress_struct *cinfo);
std::vector<uint8_t> get_jpeg_header(unsigned width, unsigned height, jpeg_compress_struct *cinfo);
static int write_packet2_thunk(void *opaque, uint8_t *buf, int buf_size, AVIODataMarkerType type, int64_t time);
int write_packet2(uint8_t *buf, int buf_size, AVIODataMarkerType type, int64_t time);
- std::thread encoder_thread;
+ std::thread encoder_thread, va_receiver_thread;
std::mutex mu;
std::queue<QueuedFrame> frames_to_be_encoded; // Under mu.
- std::condition_variable any_frames_to_be_encoded;
+ std::condition_variable any_frames_to_be_encoded; // Governs changes in both frames_to_be_encoded and frames_under_encoding
- std::queue<QueuedFrame> frames_encoding; // Under mu.
+ std::queue<QueuedFrame> frames_encoding; // Under mu. Used for VA-API only.
std::condition_variable any_frames_encoding;
AVFormatContextWithCloser avctx;
std::map<std::pair<unsigned, unsigned>, VAData> va_data_for_resolution;
VAData get_va_data_for_resolution(unsigned width, unsigned height);
- struct VAResources {
- unsigned width, height;
- VASurfaceID surface;
- VAContextID context;
- VABufferID data_buffer;
- };
std::list<VAResources> va_resources_freelist;
std::mutex va_resources_mutex;
VAResources get_va_resources(unsigned width, unsigned height);
void release_va_resources(VAResources resources);
- // RAII wrapper to release VAResources on return (even on error).
- class ReleaseVAResources {
- public:
- ReleaseVAResources(MJPEGEncoder *mjpeg, const VAResources &resources)
- : mjpeg(mjpeg), resources(resources) {}
- ~ReleaseVAResources()
- {
- if (!committed) {
- mjpeg->release_va_resources(resources);
- }
- }
-
- void commit() { committed = true; }
-
- private:
- MJPEGEncoder * const mjpeg;
- const VAResources &resources;
- bool committed = false;
- };
-
static std::unique_ptr<VADisplayWithCleanup> try_open_va(const std::string &va_display, std::string *error, VAConfigID *config_id);
- uint8_t *tmp_y, *tmp_cbcr, *tmp_cb, *tmp_cr; // Private to the encoder thread.
+ uint8_t *tmp_y, *tmp_cbcr, *tmp_cb, *tmp_cr; // Private to the encoder thread. Used by the libjpeg backend only.
};
#endif // !defined(_MJPEG_ENCODER_H)