CXX=g++
-PKG_MODULES := Qt5Core Qt5Gui Qt5Widgets Qt5OpenGLExtensions Qt5OpenGL Qt5PrintSupport libjpeg movit
+PKG_MODULES := Qt5Core Qt5Gui Qt5Widgets Qt5OpenGLExtensions Qt5OpenGL Qt5PrintSupport libjpeg movit libmicrohttpd
CXXFLAGS ?= -O2 -g -Wall # Will be overridden by environment.
CXXFLAGS += -fPIC $(shell pkg-config --cflags $(PKG_MODULES)) -DMOVIT_SHADER_DIR=\"$(shell pkg-config --variable=shaderdir movit)\" -pthread
OBJS += $(OBJS_WITH_MOC)
OBJS += $(OBJS_WITH_MOC:.o=.moc.o)
-OBJS += ffmpeg_raii.o main.o player.o
+OBJS += ffmpeg_raii.o main.o player.o httpd.o mux.o metacube2.o
%.o: %.cpp
$(CXX) -MMD -MP $(CPPFLAGS) $(CXXFLAGS) -o $@ -c $<
#define MAX_STREAMS 16
#define CACHE_SIZE 1000 // In number of frames.
#define NUM_CAMERAS 4
+// 10485760 bytes = 10 MiB. HTTPD::answer_to_connection() passes this as the
+// block size to MHD_create_response_from_callback().
+#define MUX_BUFFER_SIZE 10485760
+
+#define DEFAULT_STREAM_MUX_NAME "nut" // Only for HTTP. Local dump guesses from LOCAL_DUMP_SUFFIX.
+// Port that main() hands to HTTPD::start().
+#define DEFAULT_HTTPD_PORT 9095
+// Brace-initializer list of { option, value } string pairs. Mux's constructor
+// copies every pair into an AVDictionary and passes it to avformat_write_header().
+#define MUX_OPTS { \
+ /* Make seekable .mov files, and keep MP4 muxer from using unlimited amounts of memory. */ \
+ { "movflags", "empty_moov+frag_keyframe+default_base_moof+skip_trailer" }, \
+ \
+ /* Make for somewhat less bursty stream output when using .mov. */ \
+ { "frag_duration", "125000" }, \
+ \
+ /* Keep nut muxer from using unlimited amounts of memory. */ \
+ { "write_index", "0" } \
+}
+
#endif // !defined(_DEFS_H)
--- /dev/null
+#include "httpd.h"
+
+#include <assert.h>
+#include <byteswap.h>
+#include <endian.h>
+#include <microhttpd.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/time.h>
+#include <time.h>
+#include <memory>
+extern "C" {
+#include <libavutil/avutil.h>
+}
+
+#include "defs.h"
+#include "metacube2.h"
+
+struct MHD_Connection;
+struct MHD_Response;
+
+using namespace std;
+
+// Nothing to do at construction time; the server is only brought up by start().
+HTTPD::HTTPD() = default;
+
+// Shuts the server down (a no-op if it was never started or is already stopped).
+HTTPD::~HTTPD()
+{
+	this->stop();
+}
+
+// Starts the libmicrohttpd daemon listening on <port>, routing every request
+// to answer_to_connection(). On failure a warning is printed to stderr and
+// <mhd> stays null, so a later stop() does nothing.
+void HTTPD::start(int port)
+{
+	const unsigned int daemon_flags =
+		MHD_USE_THREAD_PER_CONNECTION | MHD_USE_POLL_INTERNALLY | MHD_USE_DUAL_STACK;
+
+	mhd = MHD_start_daemon(daemon_flags,
+	                       port,
+	                       nullptr, nullptr,
+	                       &answer_to_connection_thunk, this,
+	                       MHD_OPTION_NOTIFY_COMPLETED, nullptr, this,
+	                       MHD_OPTION_END);
+	if (mhd != nullptr) {
+		return;
+	}
+	fprintf(stderr, "Warning: Could not open HTTP server. (Port already in use?)\n");
+}
+
+// Stops the server: quiesces the daemon, asks every connected stream to quit
+// (which wakes any reader blocked in Stream::reader_callback()), and then
+// shuts the daemon down. Safe to call when the server is not running.
+void HTTPD::stop()
+{
+	if (mhd == nullptr) {
+		return;
+	}
+
+	MHD_quiesce_daemon(mhd);
+	{
+		// <streams> is also mutated by answer_to_connection() and free_stream(),
+		// both of which take <streams_mutex>, so hold the same lock while
+		// walking the set.
+		lock_guard<mutex> lock(streams_mutex);
+		for (Stream *stream : streams) {
+			stream->stop();
+		}
+	}
+
+	// The lock must be released before this call; free_stream() takes
+	// <streams_mutex> and may run while connections are being torn down.
+	MHD_stop_daemon(mhd);
+	mhd = nullptr;
+}
+
+// Fans one chunk of muxed output out to every connected stream.
+// <keyframe> selects whether the chunk is tagged as a keyframe or as other
+// data; <time> and <timebase> are forwarded unchanged to each stream.
+void HTTPD::add_data(const char *buf, size_t size, bool keyframe, int64_t time, AVRational timebase)
+{
+	const Stream::DataType data_type =
+		keyframe ? Stream::DATA_TYPE_KEYFRAME : Stream::DATA_TYPE_OTHER;
+
+	unique_lock<mutex> lock(streams_mutex);
+	for (Stream *client : streams) {
+		client->add_data(buf, size, data_type, time, timebase);
+	}
+}
+
+// C-style trampoline registered with libmicrohttpd in start(): <cls> is the
+// HTTPD instance, and the call is forwarded verbatim to its member function.
+int HTTPD::answer_to_connection_thunk(void *cls, MHD_Connection *connection,
+	const char *url, const char *method,
+	const char *version, const char *upload_data,
+	size_t *upload_data_size, void **con_cls)
+{
+	HTTPD *self = static_cast<HTTPD *>(cls);
+	return self->answer_to_connection(
+		connection, url, method, version, upload_data, upload_data_size, con_cls);
+}
+
+// Handles one incoming request.
+//
+//  - If <url> matches a registered endpoint, its callback is run and the
+//    returned (content, content-type) pair is sent as a 200 response,
+//    with a permissive CORS header if the endpoint asks for it.
+//  - Unknown URLs under /channels/ get a plain-text 404.
+//  - Everything else is treated as a request for the live stream: a new
+//    Stream is created, primed with the stored header, registered in
+//    <streams> and served through a callback-driven response. URLs ending
+//    in ".metacube" get Metacube2 framing; all others get the raw stream.
+//
+// Returns whatever MHD_queue_response() returns.
+int HTTPD::answer_to_connection(MHD_Connection *connection,
+	const char *url, const char *method,
+	const char *version, const char *upload_data,
+	size_t *upload_data_size, void **con_cls)
+{
+	// Look the endpoint up once instead of once per use.
+	auto endpoint_it = endpoints.find(url);
+	if (endpoint_it != endpoints.end()) {
+		const Endpoint &endpoint = endpoint_it->second;
+		pair<string, string> contents_and_type = endpoint.callback();
+		MHD_Response *response = MHD_create_response_from_buffer(
+			contents_and_type.first.size(), &contents_and_type.first[0], MHD_RESPMEM_MUST_COPY);
+		MHD_add_response_header(response, "Content-type", contents_and_type.second.c_str());
+		if (endpoint.cors_policy == ALLOW_ALL_ORIGINS) {
+			MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
+		}
+		int ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
+		MHD_destroy_response(response);  // Only decreases the refcount; actual free is after the request is done.
+		return ret;
+	}
+
+	// Small hack; reject unknown /channels/foo.
+	static const char channels_prefix[] = "/channels/";
+	if (strncmp(url, channels_prefix, strlen(channels_prefix)) == 0) {
+		string contents = "Not found.";
+		MHD_Response *response = MHD_create_response_from_buffer(
+			contents.size(), &contents[0], MHD_RESPMEM_MUST_COPY);
+		MHD_add_response_header(response, "Content-type", "text/plain");
+		int ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
+		MHD_destroy_response(response);  // Only decreases the refcount; actual free is after the request is done.
+		return ret;
+	}
+
+	// See if the URL ends in “.metacube”. Compare the actual tail of the
+	// string: searching for the first occurrence would miss URLs that contain
+	// the suffix more than once, and would compute an address before the
+	// start of the string for URLs shorter than the suffix.
+	static const char metacube_suffix[] = ".metacube";
+	const size_t url_len = strlen(url);
+	const size_t suffix_len = strlen(metacube_suffix);
+	const bool wants_metacube =
+		url_len >= suffix_len &&
+		strcmp(url + (url_len - suffix_len), metacube_suffix) == 0;
+	const HTTPD::Stream::Framing framing =
+		wants_metacube ? HTTPD::Stream::FRAMING_METACUBE : HTTPD::Stream::FRAMING_RAW;
+
+	// The stream is freed by free_stream(), which is registered below as the
+	// response's cleanup callback.
+	HTTPD::Stream *stream = new HTTPD::Stream(this, framing);
+	stream->add_data(header.data(), header.size(), Stream::DATA_TYPE_HEADER, AV_NOPTS_VALUE, AVRational{ 1, 0 });
+	{
+		unique_lock<mutex> lock(streams_mutex);
+		streams.insert(stream);
+	}
+	++metric_num_connected_clients;
+	*con_cls = stream;
+
+	// Does not strictly have to be equal to MUX_BUFFER_SIZE.
+	MHD_Response *response = MHD_create_response_from_callback(
+		(size_t)-1, MUX_BUFFER_SIZE, &HTTPD::Stream::reader_callback_thunk, stream, &HTTPD::free_stream);
+	// TODO: Content-type?
+	if (framing == HTTPD::Stream::FRAMING_METACUBE) {
+		MHD_add_response_header(response, "Content-encoding", "metacube");
+	}
+
+	int ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
+	MHD_destroy_response(response);  // Only decreases the refcount; actual free is after the request is done.
+
+	return ret;
+}
+
+// Cleanup callback registered with MHD_create_response_from_callback() in
+// answer_to_connection(). <cls> is the Stream created for the connection;
+// it is unregistered from its parent HTTPD and destroyed, and the
+// connected-clients metric is decremented.
+void HTTPD::free_stream(void *cls)
+{
+	HTTPD::Stream *stream = (HTTPD::Stream *)cls;
+	HTTPD *httpd = stream->get_parent();
+	{
+		unique_lock<mutex> lock(httpd->streams_mutex);
+		// Unregister first, so the pointer is never used as a lookup key
+		// after the object it refers to has been destroyed. The delete stays
+		// under the lock so add_data() cannot be mid-call on this stream.
+		httpd->streams.erase(stream);
+		delete stream;
+	}
+	--httpd->metric_num_connected_clients;
+}
+
+// C-style trampoline for libmicrohttpd's content reader: <cls> is the Stream
+// whose reader_callback() should supply the data.
+ssize_t HTTPD::Stream::reader_callback_thunk(void *cls, uint64_t pos, char *buf, size_t max)
+{
+	return static_cast<HTTPD::Stream *>(cls)->reader_callback(pos, buf, max);
+}
+
+// Blocks until there is queued data (or stop() has been called), then copies
+// as much queued data as fits into <buf>, at most <max> bytes.
+//
+// Returns the number of bytes copied, or 0 if the stream has been told to
+// quit. <pos> is not used. A partially consumed front chunk is remembered in
+// <used_of_buffered_data> so the next call continues where this one stopped.
+ssize_t HTTPD::Stream::reader_callback(uint64_t pos, char *buf, size_t max)
+{
+	unique_lock<mutex> lock(buffer_mutex);
+	while (!should_quit && buffered_data.empty()) {
+		has_buffered_data.wait(lock);
+	}
+	if (should_quit) {
+		return 0;
+	}
+
+	ssize_t bytes_copied = 0;
+	while (max > 0 && !buffered_data.empty()) {
+		const string &chunk = buffered_data.front();
+		assert(chunk.size() > used_of_buffered_data);
+
+		const size_t available = chunk.size() - used_of_buffered_data;
+		const size_t to_copy = (available <= max) ? available : max;
+
+		memcpy(buf, chunk.data() + used_of_buffered_data, to_copy);
+		buf += to_copy;
+		bytes_copied += to_copy;
+		max -= to_copy;
+
+		if (to_copy == available) {
+			// The (rest of the) chunk is fully consumed; drop it.
+			buffered_data.pop_front();
+			used_of_buffered_data = 0;
+		} else {
+			// Output buffer is full; remember how far into the chunk we got.
+			used_of_buffered_data += to_copy;
+		}
+	}
+
+	return bytes_copied;
+}
+
+// Queues one chunk of data for this client and wakes the reader.
+//
+// Empty chunks are ignored, and DATA_TYPE_OTHER chunks are dropped until the
+// first keyframe has been seen. With raw framing the chunk is queued as-is.
+// With Metacube2 framing the chunk is preceded by a block header, and
+//  - header and keyframe chunks with a valid <time> are additionally
+//    preceded by a metadata block carrying the pts and <timebase>;
+//  - keyframe chunks are followed by a metadata block carrying the current
+//    wall-clock time.
+void HTTPD::Stream::add_data(const char *buf, size_t buf_size, HTTPD::Stream::DataType data_type, int64_t time, AVRational timebase)
+{
+	if (buf_size == 0) {
+		return;
+	}
+	if (data_type == DATA_TYPE_OTHER && !seen_keyframe) {
+		// Start sending only once we see a keyframe.
+		return;
+	}
+	if (data_type == DATA_TYPE_KEYFRAME) {
+		seen_keyframe = true;
+	}
+
+	unique_lock<mutex> lock(buffer_mutex);
+
+	// Builds a Metacube2 block header announcing <payload_size> bytes with the
+	// given flags, and appends it to the queue.
+	auto queue_block_header = [this](size_t payload_size, int block_flags) {
+		metacube2_block_header hdr;
+		memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
+		hdr.size = htonl(payload_size);
+		hdr.flags = htons(block_flags);
+		hdr.csum = htons(metacube2_compute_crc(&hdr));
+		buffered_data.emplace_back((char *)&hdr, sizeof(hdr));
+	};
+
+	const bool use_metacube = (framing == FRAMING_METACUBE);
+
+	if (use_metacube) {
+		int flags = 0;
+		switch (data_type) {
+		case DATA_TYPE_HEADER:
+			flags |= METACUBE_FLAGS_HEADER;
+			break;
+		case DATA_TYPE_OTHER:
+			flags |= METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START;
+			break;
+		case DATA_TYPE_KEYFRAME:
+			break;
+		}
+
+		// If we're about to send a keyframe, send a pts metadata block
+		// to mark its time.
+		const bool suitable_for_start = (flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) == 0;
+		if (suitable_for_start && time != AV_NOPTS_VALUE) {
+			metacube2_pts_packet packet;
+			packet.type = htobe64(METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS);
+			packet.pts = htobe64(time);
+			packet.timebase_num = htobe64(timebase.num);
+			packet.timebase_den = htobe64(timebase.den);
+
+			queue_block_header(sizeof(packet), METACUBE_FLAGS_METADATA);
+			buffered_data.emplace_back((char *)&packet, sizeof(packet));
+		}
+
+		queue_block_header(buf_size, flags);
+	}
+	buffered_data.emplace_back(buf, buf_size);
+
+	// Send a Metacube2 timestamp every keyframe.
+	if (use_metacube && data_type == DATA_TYPE_KEYFRAME) {
+		timespec now;
+		clock_gettime(CLOCK_REALTIME, &now);
+
+		metacube2_timestamp_packet packet;
+		packet.type = htobe64(METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP);
+		packet.tv_sec = htobe64(now.tv_sec);
+		packet.tv_nsec = htobe64(now.tv_nsec);
+
+		queue_block_header(sizeof(packet), METACUBE_FLAGS_METADATA);
+		buffered_data.emplace_back((char *)&packet, sizeof(packet));
+	}
+
+	has_buffered_data.notify_all();
+}
+
+// Tells the stream to shut down and wakes any reader waiting for data, so
+// that reader_callback() returns instead of blocking.
+void HTTPD::Stream::stop()
+{
+	lock_guard<mutex> lock(buffer_mutex);
+	should_quit = true;
+	has_buffered_data.notify_all();
+}
--- /dev/null
+#ifndef _HTTPD_H
+#define _HTTPD_H
+
+// A class dealing with stream output to HTTP.
+
+#include <stddef.h>
+#include <stdint.h>
+#include <sys/types.h>
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <functional>
+#include <mutex>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+extern "C" {
+#include <libavutil/rational.h>
+}
+
+struct MHD_Connection;
+struct MHD_Daemon;
+
+// HTTP server built on libmicrohttpd. It serves two kinds of requests:
+// registered endpoints (a callback produces the whole response), and the
+// live stream, where data pushed in through add_data() is fanned out to
+// every connected client, optionally wrapped in Metacube2 framing.
+class HTTPD {
+public:
+	// Returns a pair of content and content-type.
+	using EndpointCallback = std::function<std::pair<std::string, std::string>()>;
+
+	HTTPD();
+	~HTTPD();
+
+	// Sets the stream header that is sent first to every new stream client.
+	// Should be called before start().
+	void set_header(const std::string &data) {
+		header = data;
+	}
+
+	// Should be called before start() (due to threading issues).
+	enum CORSPolicy {
+		NO_CORS_POLICY,
+		ALLOW_ALL_ORIGINS
+	};
+	// Registers <callback> to answer requests for exactly <url>.
+	void add_endpoint(const std::string &url, const EndpointCallback &callback, CORSPolicy cors_policy) {
+		endpoints[url] = Endpoint{ callback, cors_policy };
+	}
+
+	// Starts listening on <port>; prints a warning to stderr on failure.
+	void start(int port);
+	// Stops all streams and shuts the server down. No-op if not running.
+	void stop();
+	// Pushes one chunk of stream data to every connected client.
+	void add_data(const char *buf, size_t size, bool keyframe, int64_t time, AVRational timebase);
+	int64_t get_num_connected_clients() const {
+		return metric_num_connected_clients.load();
+	}
+
+private:
+	// Static trampoline handed to libmicrohttpd; <cls> is the HTTPD instance.
+	static int answer_to_connection_thunk(void *cls, MHD_Connection *connection,
+	                                      const char *url, const char *method,
+	                                      const char *version, const char *upload_data,
+	                                      size_t *upload_data_size, void **con_cls);
+
+	int answer_to_connection(MHD_Connection *connection,
+	                         const char *url, const char *method,
+	                         const char *version, const char *upload_data,
+	                         size_t *upload_data_size, void **con_cls);
+
+	// Cleanup callback for a stream response; <cls> is the Stream to destroy.
+	static void free_stream(void *cls);
+
+	// Per-client queue of outgoing data. add_data() appends to it and
+	// reader_callback() drains it.
+	class Stream {
+	public:
+		enum Framing {
+			FRAMING_RAW,
+			FRAMING_METACUBE
+		};
+		Stream(HTTPD *parent, Framing framing) : parent(parent), framing(framing) {}
+
+		static ssize_t reader_callback_thunk(void *cls, uint64_t pos, char *buf, size_t max);
+		ssize_t reader_callback(uint64_t pos, char *buf, size_t max);
+
+		enum DataType {
+			DATA_TYPE_HEADER,
+			DATA_TYPE_KEYFRAME,
+			DATA_TYPE_OTHER
+		};
+		void add_data(const char *buf, size_t size, DataType data_type, int64_t time, AVRational timebase);
+		void stop();
+		HTTPD *get_parent() const { return parent; }
+
+	private:
+		HTTPD *parent;
+		Framing framing;
+
+		std::mutex buffer_mutex;
+		bool should_quit = false;  // Under <buffer_mutex>.
+		std::condition_variable has_buffered_data;
+		std::deque<std::string> buffered_data;  // Protected by <buffer_mutex>.
+		size_t used_of_buffered_data = 0;  // How many bytes of the first element of <buffered_data> that is already used. Protected by <buffer_mutex>.
+
+		// Whether a keyframe has been queued yet; non-keyframe data is dropped
+		// until then. Read and written by add_data() before it takes <buffer_mutex>.
+		bool seen_keyframe = false;
+	};
+
+	MHD_Daemon *mhd = nullptr;
+	std::mutex streams_mutex;
+	std::set<Stream *> streams;  // Not owned.
+	struct Endpoint {
+		EndpointCallback callback;
+		CORSPolicy cors_policy;
+	};
+	std::unordered_map<std::string, Endpoint> endpoints;
+	std::string header;
+
+	// Metrics.
+	std::atomic<int64_t> metric_num_connected_clients{0};
+};
+
+#endif // !defined(_HTTPD_H)
#include "defs.h"
#include "mainwindow.h"
#include "ffmpeg_raii.h"
+#include "httpd.h"
#include "player.h"
#include "post_to_main_thread.h"
#include "ui_mainwindow.h"
mutex frame_mu;
vector<int64_t> frames[MAX_STREAMS];
QGLWidget *global_share_widget;
+HTTPD *global_httpd;
int record_thread_func();
int main(int argc, char **argv)
{
avformat_network_init();
+ global_httpd = new HTTPD;
+ global_httpd->start(DEFAULT_HTTPD_PORT);
QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
--- /dev/null
+/*
+ * Implementation of Metacube2 utility functions.
+ *
+ * Note: This file is meant to compile as both C and C++, for easier inclusion
+ * in other projects.
+ */
+
+#include "metacube2.h"
+
+#include <byteswap.h>
+#include <netinet/in.h>
+
+/*
+ * https://www.ece.cmu.edu/~koopman/pubs/KoopmanCRCWebinar9May2012.pdf
+ * recommends this for messages as short as ours (see table at page 34).
+ */
+#define METACUBE2_CRC_POLYNOMIAL 0x8FDB
+
+/* Semi-random starting value to make sure all-zero won't pass. */
+#define METACUBE2_CRC_START 0x1234
+
+/*
+ * Computes the 16-bit checksum stored in a Metacube2 block header.
+ *
+ * The checksum covers the size and flags fields exactly as they are stored
+ * in <hdr> (i.e., already in network byte order), read as one contiguous run
+ * of bytes starting at hdr->size. The result is returned in host byte order
+ * and is bit-inverted when METACUBE_FLAGS_METADATA is set in the flags.
+ *
+ * This code is based on code generated by pycrc.
+ */
+uint16_t metacube2_compute_crc(const struct metacube2_block_header *hdr)
+{
+	static const int data_len = sizeof(hdr->size) + sizeof(hdr->flags);
+	/* Keep the pointee const; the header is only ever read here. */
+	const uint8_t *data = (const uint8_t *)&hdr->size;
+	uint16_t crc = METACUBE2_CRC_START;
+	int i, j;
+
+	/* Feed the message in, one bit at a time, most significant bit first. */
+	for (i = 0; i < data_len; ++i) {
+		uint8_t c = data[i];
+		for (j = 0; j < 8; j++) {
+			int bit = crc & 0x8000;
+			crc = (crc << 1) | ((c >> (7 - j)) & 0x01);
+			if (bit) {
+				crc ^= METACUBE2_CRC_POLYNOMIAL;
+			}
+		}
+	}
+
+	/* Finalize: shift in 16 zero bits. */
+	for (i = 0; i < 16; i++) {
+		int bit = crc & 0x8000;
+		crc = crc << 1;
+		if (bit) {
+			crc ^= METACUBE2_CRC_POLYNOMIAL;
+		}
+	}
+
+	/*
+	 * Invert the checksum for metadata packets, so that clients that
+	 * don't understand metadata will ignore it as broken. There will
+	 * probably be logging, but apart from that, it's harmless.
+	 */
+	if (ntohs(hdr->flags) & METACUBE_FLAGS_METADATA) {
+		crc ^= 0xffff;
+	}
+
+	return crc;
+}
--- /dev/null
+#ifndef _METACUBE2_H
+#define _METACUBE2_H
+
+/*
+ * Definitions for the Metacube2 protocol, used to communicate with Cubemap.
+ *
+ * Note: This file is meant to compile as both C and C++, for easier inclusion
+ * in other projects.
+ */
+
+#include <stdint.h>
+
+#define METACUBE2_SYNC "cube!map" /* 8 bytes long. */
+#define METACUBE_FLAGS_HEADER 0x1
+#define METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START 0x2
+
+/*
+ * Metadata packets; should not be counted as data, but rather
+ * parsed (or ignored if you don't understand them).
+ *
+ * Metadata packets start with a uint64_t (network byte order)
+ * that describe the type; the rest is defined by the type.
+ */
+#define METACUBE_FLAGS_METADATA 0x4
+
+/*
+ * Header placed in front of every Metacube2 block.
+ *
+ * metacube2_compute_crc() checksums size and flags as a single run of bytes
+ * starting at size, so these two fields must stay adjacent and in this order.
+ */
+struct metacube2_block_header {
+ char sync[8]; /* METACUBE2_SYNC */
+ uint32_t size; /* Network byte order. Does not include header. */
+ uint16_t flags; /* Network byte order. METACUBE_FLAGS_*. */
+ uint16_t csum; /* Network byte order. CRC16 of size and flags.
+ If METACUBE_FLAGS_METADATA is set, inverted
+ so that older clients will ignore it as broken. */
+};
+
+uint16_t metacube2_compute_crc(const struct metacube2_block_header *hdr);
+
+/*
+ * Set by the encoder, and can be measured for latency purposes (e.g., if the
+ * network can't keep up, the latency will tend to increase).
+ */
+#define METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP 0x1
+
+/* Payload of a METACUBE_FLAGS_METADATA block carrying the encoder's wall-clock time. */
+struct metacube2_timestamp_packet {
+ uint64_t type; /* METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP, in network byte order. */
+
+ /*
+ * Time since the UTC epoch. Basically a struct timespec.
+ * Both are in network byte order.
+ */
+ uint64_t tv_sec;
+ uint64_t tv_nsec;
+};
+
+/*
+ * Sent before a block to mark its presentation timestamp (ie., counts
+ * only for the next Metacube block). Used so that the reflector can know
+ * the length (in seconds) of fragments.
+ */
+#define METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS 0x2
+
+/* Payload of a METACUBE_FLAGS_METADATA block announcing the pts of the block that follows. */
+struct metacube2_pts_packet {
+ uint64_t type; /* METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS, in network byte order. */
+
+ /* The timestamp of the first packet in the next block, in network byte order. */
+ int64_t pts;
+
+ /* Timebase "pts" is expressed in, as a fraction. Network byte order. */
+ uint64_t timebase_num, timebase_den;
+};
+
+#endif /* !defined(_METACUBE2_H) */
--- /dev/null
+#include "mux.h"
+
+#include <assert.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <algorithm>
+#include <mutex>
+#include <string>
+#include <utility>
+#include <vector>
+
+extern "C" {
+#include <libavformat/avio.h>
+#include <libavutil/avutil.h>
+#include <libavutil/dict.h>
+#include <libavutil/mathematics.h>
+#include <libavutil/mem.h>
+#include <libavutil/pixfmt.h>
+#include <libavutil/rational.h>
+}
+
+#include "defs.h"
+#include "timebase.h"
+
+using namespace std;
+
+// Strict ordering for queued packets, used to sort the queue when the mux is
+// unplugged. Packets are ordered by dts (falling back to pts when a packet
+// has no dts), with pts as the tie-breaker. Timestamps are compared in each
+// packet's own stream timebase, looked up through <ctx>.
+struct PacketBefore {
+	PacketBefore(const AVFormatContext *ctx) : ctx(ctx) {}
+
+	bool operator() (const Mux::QueuedPacket &a_qp, const Mux::QueuedPacket &b_qp) const {
+		const AVPacket *lhs = a_qp.pkt;
+		const AVPacket *rhs = b_qp.pkt;
+
+		const int64_t lhs_dts = (lhs->dts != AV_NOPTS_VALUE) ? lhs->dts : lhs->pts;
+		const int64_t rhs_dts = (rhs->dts != AV_NOPTS_VALUE) ? rhs->dts : rhs->pts;
+		const AVRational lhs_timebase = ctx->streams[lhs->stream_index]->time_base;
+		const AVRational rhs_timebase = ctx->streams[rhs->stream_index]->time_base;
+
+		const int dts_order = av_compare_ts(lhs_dts, lhs_timebase, rhs_dts, rhs_timebase);
+		if (dts_order == 0) {
+			// Same decode time; let the presentation time decide.
+			return av_compare_ts(lhs->pts, lhs_timebase, rhs->pts, rhs_timebase) < 0;
+		}
+		return dts_order < 0;
+	}
+
+	const AVFormatContext * const ctx;
+};
+
+// Sets up a video stream (index 0) and an audio stream (index 1) on <avctx>,
+// writes the container header using the options in MUX_OPTS, and, for
+// WRITE_BACKGROUND, starts the writer thread.
+//
+//  - <video_codec> selects H.264 or raw NV12; <video_extradata>, if non-empty,
+//    is copied into the video stream's codec parameters.
+//  - <audio_codecpar> is copied into the audio stream's codec parameters.
+//  - Both streams get the timebase 1/<time_base>.
+//  - Every element of <metrics> has the size of the written header added to
+//    its written-bytes counter.
+//
+// Any failure is fatal: a message is printed to stderr and the process exits.
+Mux::Mux(AVFormatContext *avctx, int width, int height, Codec video_codec, const string &video_extradata, const AVCodecParameters *audio_codecpar, int time_base, std::function<void(int64_t)> write_callback, WriteStrategy write_strategy, const vector<MuxMetrics *> &metrics)
+	: write_strategy(write_strategy), avctx(avctx), write_callback(write_callback), metrics(metrics)
+{
+	avstream_video = avformat_new_stream(avctx, nullptr);
+	if (avstream_video == nullptr) {
+		fprintf(stderr, "avformat_new_stream() failed\n");
+		exit(1);
+	}
+	avstream_video->time_base = AVRational{1, time_base};
+	avstream_video->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
+	if (video_codec == CODEC_H264) {
+		avstream_video->codecpar->codec_id = AV_CODEC_ID_H264;
+	} else {
+		assert(video_codec == CODEC_NV12);
+		avstream_video->codecpar->codec_id = AV_CODEC_ID_RAWVIDEO;
+		avstream_video->codecpar->codec_tag = avcodec_pix_fmt_to_codec_tag(AV_PIX_FMT_NV12);
+	}
+	avstream_video->codecpar->width = width;
+	avstream_video->codecpar->height = height;
+
+	// Colorspace details. Closely correspond to settings in EffectChain_finalize,
+	// as noted in each comment.
+	// Note that the H.264 stream also contains this information and depending on the
+	// mux, this might simply get ignored. See sps_rbsp().
+	// Note that there's no way to change this per-frame as the H.264 stream
+	// would like to be able to.
+	avstream_video->codecpar->color_primaries = AVCOL_PRI_BT709;  // RGB colorspace (inout_format.color_space).
+	avstream_video->codecpar->color_trc = AVCOL_TRC_IEC61966_2_1;  // Gamma curve (inout_format.gamma_curve).
+	// YUV colorspace (output_ycbcr_format.luma_coefficients).
+	avstream_video->codecpar->color_space = AVCOL_SPC_BT709;
+	avstream_video->codecpar->color_range = AVCOL_RANGE_MPEG;  // Full vs. limited range (output_ycbcr_format.full_range).
+	avstream_video->codecpar->chroma_location = AVCHROMA_LOC_LEFT;  // Chroma sample location. See chroma_offset_0[] in Mixer::subsample_chroma().
+	avstream_video->codecpar->field_order = AV_FIELD_PROGRESSIVE;
+
+	if (!video_extradata.empty()) {
+		// libavcodec requires extradata buffers to be followed by
+		// AV_INPUT_BUFFER_PADDING_SIZE zeroed bytes, so allocate zeroed
+		// memory with room for the padding.
+		uint8_t *extradata = (uint8_t *)av_mallocz(video_extradata.size() + AV_INPUT_BUFFER_PADDING_SIZE);
+		if (extradata == nullptr) {
+			fprintf(stderr, "av_mallocz() failed\n");
+			exit(1);
+		}
+		memcpy(extradata, video_extradata.data(), video_extradata.size());
+		avstream_video->codecpar->extradata = extradata;
+		avstream_video->codecpar->extradata_size = video_extradata.size();
+	}
+
+	avstream_audio = avformat_new_stream(avctx, nullptr);
+	if (avstream_audio == nullptr) {
+		fprintf(stderr, "avformat_new_stream() failed\n");
+		exit(1);
+	}
+	avstream_audio->time_base = AVRational{1, time_base};
+	if (avcodec_parameters_copy(avstream_audio->codecpar, audio_codecpar) < 0) {
+		fprintf(stderr, "avcodec_parameters_copy() failed\n");
+		exit(1);
+	}
+
+	AVDictionary *options = nullptr;
+	const vector<pair<string, string>> opts = MUX_OPTS;
+	for (const pair<string, string> &opt : opts) {
+		av_dict_set(&options, opt.first.c_str(), opt.second.c_str(), 0);
+	}
+	const int header_ret = avformat_write_header(avctx, &options);
+	// avformat_write_header() leaves the options it did not consume in the
+	// dictionary (MUX_OPTS holds options for several muxers, so there will
+	// normally be some); free them so they are not leaked.
+	av_dict_free(&options);
+	if (header_ret < 0) {
+		fprintf(stderr, "avformat_write_header() failed\n");
+		exit(1);
+	}
+	for (MuxMetrics *metric : metrics) {
+		metric->metric_written_bytes += avctx->pb->pos;
+	}
+
+	// Make sure the header is written before the constructor exits.
+	avio_flush(avctx->pb);
+
+	if (write_strategy == WRITE_BACKGROUND) {
+		writer_thread = thread(&Mux::thread_func, this);
+	}
+}
+
+// Finishes the output: stops the writer thread (if any) after it has drained
+// the queue, writes the container trailer, accounts the trailer bytes in the
+// metrics, closes the I/O context unless it is custom I/O or the format has
+// no file, and frees the format context. The mux must be unplugged.
+Mux::~Mux()
+{
+	assert(plug_count == 0);
+	if (write_strategy == WRITE_BACKGROUND) {
+		{
+			// The flag is part of the condition the writer thread waits on, so
+			// it must change while <mu> is held. Otherwise the thread could
+			// test the condition, miss the notification below and then block
+			// forever, which would hang join().
+			lock_guard<mutex> lock(mu);
+			writer_thread_should_quit = true;
+		}
+		packet_queue_ready.notify_all();
+		writer_thread.join();
+	}
+	int64_t old_pos = avctx->pb->pos;
+	av_write_trailer(avctx);
+	for (MuxMetrics *metric : metrics) {
+		metric->metric_written_bytes += avctx->pb->pos - old_pos;
+	}
+
+	if (!(avctx->oformat->flags & AVFMT_NOFILE) &&
+	    !(avctx->flags & AVFMT_FLAG_CUSTOM_IO)) {
+		avio_closep(&avctx->pb);
+	}
+	avformat_free_context(avctx);
+}
+
+// Submits one packet to the mux.
+//
+// <pts>, <dts> and the packet's duration are given in <timebase> and are
+// rescaled to the timebase of the target stream. The target is the packet's
+// own stream index unless <stream_index_override> is not -1; only index 0
+// (video) and 1 (audio) are valid. <pkt> itself is not modified.
+//
+// With WRITE_BACKGROUND, or while the mux is plugged, a clone of the packet
+// is queued; otherwise it is written immediately. Failures are fatal.
+void Mux::add_packet(const AVPacket &pkt, int64_t pts, int64_t dts, AVRational timebase, int stream_index_override)
+{
+	AVPacket pkt_copy;
+	av_init_packet(&pkt_copy);
+	if (av_packet_ref(&pkt_copy, &pkt) < 0) {
+		fprintf(stderr, "av_packet_ref() failed\n");
+		exit(1);
+	}
+	if (stream_index_override != -1) {
+		pkt_copy.stream_index = stream_index_override;
+	}
+
+	const AVStream *target_stream = nullptr;
+	if (pkt_copy.stream_index == 0) {
+		target_stream = avstream_video;
+	} else if (pkt_copy.stream_index == 1) {
+		target_stream = avstream_audio;
+	}
+	if (target_stream != nullptr) {
+		pkt_copy.pts = av_rescale_q(pts, timebase, target_stream->time_base);
+		pkt_copy.dts = av_rescale_q(dts, timebase, target_stream->time_base);
+		pkt_copy.duration = av_rescale_q(pkt.duration, timebase, target_stream->time_base);
+	} else {
+		assert(false);
+	}
+
+	{
+		lock_guard<mutex> lock(mu);
+		const bool background = (write_strategy == WriteStrategy::WRITE_BACKGROUND);
+		if (background || plug_count > 0) {
+			// The queue outlives pkt_copy, so it needs its own reference.
+			AVPacket *queued = av_packet_clone(&pkt_copy);
+			if (queued == nullptr) {
+				fprintf(stderr, "av_packet_clone() failed\n");
+				exit(1);
+			}
+			packet_queue.push_back(QueuedPacket{ queued, pts });
+			// Only the background writer waits on the condition variable, and
+			// it must not be woken while plugged.
+			if (background && plug_count == 0) packet_queue_ready.notify_all();
+		} else {
+			write_packet_or_die(pkt_copy, pts);
+		}
+	}
+
+	av_packet_unref(&pkt_copy);
+}
+
+// Writes <pkt> to the output and flushes it, exiting the process on failure.
+// Updates every registered metric with the packet's payload size (per stream)
+// and with the number of bytes the output position advanced. For video
+// packets (stream 0), <write_callback> is then invoked with <unscaled_pts>,
+// if a callback was given.
+void Mux::write_packet_or_die(const AVPacket &pkt, int64_t unscaled_pts)
+{
+	for (MuxMetrics *metric : metrics) {
+		switch (pkt.stream_index) {
+		case 0:
+			metric->metric_video_bytes += pkt.size;
+			break;
+		case 1:
+			metric->metric_audio_bytes += pkt.size;
+			break;
+		default:
+			assert(false);
+		}
+	}
+
+	const int64_t pos_before = avctx->pb->pos;
+	const int write_ret = av_interleaved_write_frame(avctx, const_cast<AVPacket *>(&pkt));
+	if (write_ret < 0) {
+		fprintf(stderr, "av_interleaved_write_frame() failed\n");
+		exit(1);
+	}
+	avio_flush(avctx->pb);
+
+	const int64_t bytes_written = avctx->pb->pos - pos_before;
+	for (MuxMetrics *metric : metrics) {
+		metric->metric_written_bytes += bytes_written;
+	}
+
+	const bool is_video = (pkt.stream_index == 0);
+	if (is_video && write_callback != nullptr) {
+		write_callback(unscaled_pts);
+	}
+}
+
+// Increments the plug count; while it is non-zero, packets are queued
+// instead of being written.
+void Mux::plug()
+{
+	unique_lock<mutex> lock(mu);
+	plug_count += 1;
+}
+
+// Decrements the plug count. When it reaches zero, the queued packets are
+// sorted into timestamp order and then either handed to the writer thread
+// (WRITE_BACKGROUND) or written out right here (WRITE_FOREGROUND).
+void Mux::unplug()
+{
+	lock_guard<mutex> lock(mu);
+	--plug_count;
+	if (plug_count > 0) {
+		// Still plugged by someone else.
+		return;
+	}
+	assert(plug_count >= 0);
+
+	sort(packet_queue.begin(), packet_queue.end(), PacketBefore(avctx));
+
+	if (write_strategy != WRITE_BACKGROUND) {
+		for (QueuedPacket &queued : packet_queue) {
+			write_packet_or_die(*queued.pkt, queued.unscaled_pts);
+			av_packet_free(&queued.pkt);
+		}
+		packet_queue.clear();
+		return;
+	}
+
+	packet_queue_ready.notify_all();
+}
+
+// Body of the background writer thread. Sleeps until there are queued
+// packets and the mux is unplugged (or until shutdown is requested), takes
+// the whole queue in one go, and writes the packets with <mu> released.
+// Exits once shutdown has been requested and the queue is empty.
+void Mux::thread_func()
+{
+	const auto should_wake = [this]() {
+		return writer_thread_should_quit || (!packet_queue.empty() && plug_count == 0);
+	};
+
+	unique_lock<mutex> lock(mu);
+	while (true) {
+		packet_queue_ready.wait(lock, should_wake);
+		if (writer_thread_should_quit && packet_queue.empty()) {
+			// All done.
+			break;
+		}
+
+		assert(!packet_queue.empty() && plug_count == 0);
+		vector<QueuedPacket> batch;
+		batch.swap(packet_queue);
+
+		// Do the actual writing without holding the mutex.
+		lock.unlock();
+		for (QueuedPacket &queued : batch) {
+			write_packet_or_die(*queued.pkt, queued.unscaled_pts);
+			av_packet_free(&queued.pkt);
+		}
+		lock.lock();
+	}
+}
+
+// Currently a no-op: <labels> is ignored and nothing is registered anywhere.
+void MuxMetrics::init(const vector<pair<string, string>> &labels)
+{
+ // TODO: See if we want to reintroduce these.
+}
--- /dev/null
+#ifndef _MUX_H
+#define _MUX_H 1
+
+// Wrapper around an AVFormat mux.
+
+extern "C" {
+#include <libavcodec/avcodec.h>
+#include <libavformat/avformat.h>
+}
+
+#include <sys/types.h>
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <mutex>
+#include <string>
+#include <utility>
+#include <thread>
+#include <vector>
+
+#include "timebase.h"
+
+// Byte counters that a Mux adds to as it writes.
+struct MuxMetrics {
+	// “written” will usually be equal video + audio + mux overhead,
+	// except that there could be buffered packets that count in audio or video
+	// but not yet in written.
+	std::atomic<int64_t> metric_video_bytes{0};
+	std::atomic<int64_t> metric_audio_bytes{0};
+	std::atomic<int64_t> metric_written_bytes{0};
+
+	// Meant to register the counters under <labels>; the definition that
+	// accompanies this header is currently an empty stub.
+	void init(const std::vector<std::pair<std::string, std::string>> &labels);
+
+	// Sets all three counters back to zero (one at a time, not as a unit).
+	void reset()
+	{
+		metric_video_bytes.store(0);
+		metric_audio_bytes.store(0);
+		metric_written_bytes.store(0);
+	}
+};
+
+// Owns an AVFormatContext with one video stream (index 0) and one audio
+// stream (index 1), and writes packets to it either directly or through a
+// background writer thread.
+class Mux {
+public:
+ enum Codec {
+ CODEC_H264,
+ CODEC_NV12, // Uncompressed 4:2:0.
+ };
+ enum WriteStrategy {
+ // add_packet() will write the packet immediately, unless plugged.
+ WRITE_FOREGROUND,
+
+ // All writes will happen on a separate thread, so add_packet()
+ // won't block. Use this if writing to a file and you might be
+ // holding a mutex (because blocking I/O with a mutex held is
+ // not good). Note that this will clone every packet, so it has
+ // higher overhead.
+ WRITE_BACKGROUND,
+ };
+
+ // Takes ownership of avctx. <write_callback> will be called every time
+ // a write has been made to the video stream (id 0), with the pts of
+ // the just-written frame. (write_callback can be nullptr.)
+ // Does not take ownership of <metrics>; elements in there, if any,
+ // will be added to.
+ Mux(AVFormatContext *avctx, int width, int height, Codec video_codec, const std::string &video_extradata, const AVCodecParameters *audio_codecpar, int time_base, std::function<void(int64_t)> write_callback, WriteStrategy write_strategy, const std::vector<MuxMetrics *> &metrics);
+ ~Mux();
+ // Adds one packet. <pts>, <dts> and the packet's duration are in <timebase>
+ // and get rescaled to the target stream's timebase. The packet goes to its
+ // own stream index unless <stream_index_override> is something other than -1.
+ void add_packet(const AVPacket &pkt, int64_t pts, int64_t dts, AVRational timebase = { 1, TIMEBASE }, int stream_index_override = -1);
+
+ // As long as the mux is plugged, it will not actually write anything to disk,
+ // just queue the packets. Once it is unplugged, the packets are reordered by
+ // dts (or pts, for packets without dts), with pts as tie-breaker, and written.
+ // This is primarily useful if you might have two different encoders
+ // writing to the mux at the same time (because one is shutting down), so that
+ // pts might otherwise come out-of-order.
+ //
+ // You can plug and unplug multiple times; only when the plug count reaches zero,
+ // something will actually happen.
+ void plug();
+ void unplug();
+
+private:
+ // If write_strategy == WRITE_FOREGROUND, must be called with <mu> held.
+ void write_packet_or_die(const AVPacket &pkt, int64_t unscaled_pts);
+ // Main loop of the background writer thread.
+ void thread_func();
+
+ WriteStrategy write_strategy;
+
+ std::mutex mu;
+
+ // These are only in use if write_strategy == WRITE_BACKGROUND.
+ std::atomic<bool> writer_thread_should_quit{false};
+ std::thread writer_thread;
+
+ AVFormatContext *avctx; // Protected by <mu>, iff write_strategy == WRITE_BACKGROUND.
+ int plug_count = 0; // Protected by <mu>.
+
+ // Protected by <mu>. If write_strategy == WRITE_FOREGROUND,
+ // this is only in use when plugging.
+ struct QueuedPacket {
+ AVPacket *pkt;
+ int64_t unscaled_pts;
+ };
+ std::vector<QueuedPacket> packet_queue;
+ // Signalled when the writer thread may have work to do or should quit.
+ std::condition_variable packet_queue_ready;
+
+ AVStream *avstream_video, *avstream_audio;
+
+ std::function<void(int64_t)> write_callback;
+ std::vector<MuxMetrics *> metrics;
+
+ // The comparator used by unplug() needs access to QueuedPacket.
+ friend struct PacketBefore;
+};
+
+#endif // !defined(_MUX_H)
--- /dev/null
+#ifndef _TIMEBASE_H
+#define _TIMEBASE_H 1
+
+// Common timebase that allows us to represent one frame exactly in all the
+// relevant frame rates:
+//
+// Timebase: 1/120000
+// Frame at 50fps: 2400/120000
+// Frame at 60fps: 2000/120000
+// Frame at 59.94fps: 2002/120000
+// Frame at 23.976fps: 5005/120000
+//
+// If we also wanted to represent one sample at 48000 Hz, we'd need
+// to go to 300000. Also supporting one sample at 44100 Hz would mean
+// going to 44100000; probably a bit excessive.
+#define TIMEBASE 120000
+
+// Some muxes, like MP4 (or at least avformat's implementation of it),
+// are not too fond of values above 2^31. At timebase 120000, that's only
+// about five hours or so, so we define a coarser timebase that doesn't
+// get 59.94 precisely (so there will be a marginal amount of pts jitter),
+// but can do at least 50 and 60 precisely, and months of streaming.
+#define COARSE_TIMEBASE 300
+
+#endif // !defined(_TIMEBASE_H)