From 0d0e637698dc347c2fec4746affbcf02d51a31f8 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sat, 18 Aug 2018 21:48:39 +0200 Subject: [PATCH] Import a bunch of http/mux code from Nageru. --- Makefile | 4 +- defs.h | 15 +++ httpd.cpp | 264 ++++++++++++++++++++++++++++++++++++++++++++++++++ httpd.h | 115 ++++++++++++++++++++++ main.cpp | 4 + metacube2.cpp | 60 ++++++++++++ metacube2.h | 71 ++++++++++++++ mux.cpp | 261 +++++++++++++++++++++++++++++++++++++++++++++++++ mux.h | 111 +++++++++++++++++++++ timebase.h | 25 +++++ 10 files changed, 928 insertions(+), 2 deletions(-) create mode 100644 httpd.cpp create mode 100644 httpd.h create mode 100644 metacube2.cpp create mode 100644 metacube2.h create mode 100644 mux.cpp create mode 100644 mux.h create mode 100644 timebase.h diff --git a/Makefile b/Makefile index 7ae9a5c..305516e 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ CXX=g++ -PKG_MODULES := Qt5Core Qt5Gui Qt5Widgets Qt5OpenGLExtensions Qt5OpenGL Qt5PrintSupport libjpeg movit +PKG_MODULES := Qt5Core Qt5Gui Qt5Widgets Qt5OpenGLExtensions Qt5OpenGL Qt5PrintSupport libjpeg movit libmicrohttpd CXXFLAGS ?= -O2 -g -Wall # Will be overridden by environment. CXXFLAGS += -fPIC $(shell pkg-config --cflags $(PKG_MODULES)) -DMOVIT_SHADER_DIR=\"$(shell pkg-config --variable=shaderdir movit)\" -pthread @@ -10,7 +10,7 @@ OBJS_WITH_MOC = mainwindow.o jpeg_frame_view.o clip_list.o OBJS += $(OBJS_WITH_MOC) OBJS += $(OBJS_WITH_MOC:.o=.moc.o) -OBJS += ffmpeg_raii.o main.o player.o +OBJS += ffmpeg_raii.o main.o player.o httpd.o mux.o metacube2.o %.o: %.cpp $(CXX) -MMD -MP $(CPPFLAGS) $(CXXFLAGS) -o $@ -c $< diff --git a/defs.h b/defs.h index 9d11750..78ee07f 100644 --- a/defs.h +++ b/defs.h @@ -4,5 +4,20 @@ #define MAX_STREAMS 16 #define CACHE_SIZE 1000 // In number of frames. #define NUM_CAMERAS 4 +#define MUX_BUFFER_SIZE 10485760 + +#define DEFAULT_STREAM_MUX_NAME "nut" // Only for HTTP. Local dump guesses from LOCAL_DUMP_SUFFIX. +#define DEFAULT_HTTPD_PORT 9095 +#define MUX_OPTS { \ + /* Make seekable .mov files, and keep MP4 muxer from using unlimited amounts of memory. */ \ + { "movflags", "empty_moov+frag_keyframe+default_base_moof+skip_trailer" }, \ + \ + /* Make for somewhat less bursty stream output when using .mov. */ \ + { "frag_duration", "125000" }, \ + \ + /* Keep nut muxer from using unlimited amounts of memory. */ \ + { "write_index", "0" } \ +} + #endif // !defined(_DEFS_H) diff --git a/httpd.cpp b/httpd.cpp new file mode 100644 index 0000000..fddc45a --- /dev/null +++ b/httpd.cpp @@ -0,0 +1,264 @@ +#include "httpd.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +extern "C" { +#include +} + +#include "defs.h" +#include "metacube2.h" + +struct MHD_Connection; +struct MHD_Response; + +using namespace std; + +HTTPD::HTTPD() +{ +} + +HTTPD::~HTTPD() +{ + stop(); +} + +void HTTPD::start(int port) +{ + mhd = MHD_start_daemon(MHD_USE_THREAD_PER_CONNECTION | MHD_USE_POLL_INTERNALLY | MHD_USE_DUAL_STACK, + port, + nullptr, nullptr, + &answer_to_connection_thunk, this, + MHD_OPTION_NOTIFY_COMPLETED, nullptr, this, + MHD_OPTION_END); + if (mhd == nullptr) { + fprintf(stderr, "Warning: Could not open HTTP server. (Port already in use?)\n"); + } +} + +void HTTPD::stop() +{ + if (mhd) { + MHD_quiesce_daemon(mhd); + for (Stream *stream : streams) { + stream->stop(); + } + MHD_stop_daemon(mhd); + mhd = nullptr; + } +} + +void HTTPD::add_data(const char *buf, size_t size, bool keyframe, int64_t time, AVRational timebase) +{ + unique_lock lock(streams_mutex); + for (Stream *stream : streams) { + stream->add_data(buf, size, keyframe ? Stream::DATA_TYPE_KEYFRAME : Stream::DATA_TYPE_OTHER, time, timebase); + } +} + +int HTTPD::answer_to_connection_thunk(void *cls, MHD_Connection *connection, + const char *url, const char *method, + const char *version, const char *upload_data, + size_t *upload_data_size, void **con_cls) +{ + HTTPD *httpd = (HTTPD *)cls; + return httpd->answer_to_connection(connection, url, method, version, upload_data, upload_data_size, con_cls); +} + +int HTTPD::answer_to_connection(MHD_Connection *connection, + const char *url, const char *method, + const char *version, const char *upload_data, + size_t *upload_data_size, void **con_cls) +{ + // See if the URL ends in “.metacube”. + HTTPD::Stream::Framing framing; + if (strstr(url, ".metacube") == url + strlen(url) - strlen(".metacube")) { + framing = HTTPD::Stream::FRAMING_METACUBE; + } else { + framing = HTTPD::Stream::FRAMING_RAW; + } + + if (endpoints.count(url)) { + pair contents_and_type = endpoints[url].callback(); + MHD_Response *response = MHD_create_response_from_buffer( + contents_and_type.first.size(), &contents_and_type.first[0], MHD_RESPMEM_MUST_COPY); + MHD_add_response_header(response, "Content-type", contents_and_type.second.c_str()); + if (endpoints[url].cors_policy == ALLOW_ALL_ORIGINS) { + MHD_add_response_header(response, "Access-Control-Allow-Origin", "*"); + } + int ret = MHD_queue_response(connection, MHD_HTTP_OK, response); + MHD_destroy_response(response); // Only decreases the refcount; actual free is after the request is done. + return ret; + } + + // Small hack; reject unknown /channels/foo. + if (string(url).find("/channels/") == 0) { + string contents = "Not found."; + MHD_Response *response = MHD_create_response_from_buffer( + contents.size(), &contents[0], MHD_RESPMEM_MUST_COPY); + MHD_add_response_header(response, "Content-type", "text/plain"); + int ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response); + MHD_destroy_response(response); // Only decreases the refcount; actual free is after the request is done. + return ret; + } + + HTTPD::Stream *stream = new HTTPD::Stream(this, framing); + stream->add_data(header.data(), header.size(), Stream::DATA_TYPE_HEADER, AV_NOPTS_VALUE, AVRational{ 1, 0 }); + { + unique_lock lock(streams_mutex); + streams.insert(stream); + } + ++metric_num_connected_clients; + *con_cls = stream; + + // Does not strictly have to be equal to MUX_BUFFER_SIZE. + MHD_Response *response = MHD_create_response_from_callback( + (size_t)-1, MUX_BUFFER_SIZE, &HTTPD::Stream::reader_callback_thunk, stream, &HTTPD::free_stream); + // TODO: Content-type? + if (framing == HTTPD::Stream::FRAMING_METACUBE) { + MHD_add_response_header(response, "Content-encoding", "metacube"); + } + + int ret = MHD_queue_response(connection, MHD_HTTP_OK, response); + MHD_destroy_response(response); // Only decreases the refcount; actual free is after the request is done. + + return ret; +} + +void HTTPD::free_stream(void *cls) +{ + HTTPD::Stream *stream = (HTTPD::Stream *)cls; + HTTPD *httpd = stream->get_parent(); + { + unique_lock lock(httpd->streams_mutex); + delete stream; + httpd->streams.erase(stream); + } + --httpd->metric_num_connected_clients; +} + +ssize_t HTTPD::Stream::reader_callback_thunk(void *cls, uint64_t pos, char *buf, size_t max) +{ + HTTPD::Stream *stream = (HTTPD::Stream *)cls; + return stream->reader_callback(pos, buf, max); +} + +ssize_t HTTPD::Stream::reader_callback(uint64_t pos, char *buf, size_t max) +{ + unique_lock lock(buffer_mutex); + has_buffered_data.wait(lock, [this]{ return should_quit || !buffered_data.empty(); }); + if (should_quit) { + return 0; + } + + ssize_t ret = 0; + while (max > 0 && !buffered_data.empty()) { + const string &s = buffered_data.front(); + assert(s.size() > used_of_buffered_data); + size_t len = s.size() - used_of_buffered_data; + if (max >= len) { + // Consume the entire (rest of the) string. + memcpy(buf, s.data() + used_of_buffered_data, len); + buf += len; + ret += len; + max -= len; + buffered_data.pop_front(); + used_of_buffered_data = 0; + } else { + // We don't need the entire string; just use the first part of it. + memcpy(buf, s.data() + used_of_buffered_data, max); + buf += max; + used_of_buffered_data += max; + ret += max; + max = 0; + } + } + + return ret; +} + +void HTTPD::Stream::add_data(const char *buf, size_t buf_size, HTTPD::Stream::DataType data_type, int64_t time, AVRational timebase) +{ + if (buf_size == 0) { + return; + } + if (data_type == DATA_TYPE_KEYFRAME) { + seen_keyframe = true; + } else if (data_type == DATA_TYPE_OTHER && !seen_keyframe) { + // Start sending only once we see a keyframe. + return; + } + + unique_lock lock(buffer_mutex); + + if (framing == FRAMING_METACUBE) { + int flags = 0; + if (data_type == DATA_TYPE_HEADER) { + flags |= METACUBE_FLAGS_HEADER; + } else if (data_type == DATA_TYPE_OTHER) { + flags |= METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START; + } + + // If we're about to send a keyframe, send a pts metadata block + // to mark its time. + if ((flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) == 0 && time != AV_NOPTS_VALUE) { + metacube2_pts_packet packet; + packet.type = htobe64(METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS); + packet.pts = htobe64(time); + packet.timebase_num = htobe64(timebase.num); + packet.timebase_den = htobe64(timebase.den); + + metacube2_block_header hdr; + memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync)); + hdr.size = htonl(sizeof(packet)); + hdr.flags = htons(METACUBE_FLAGS_METADATA); + hdr.csum = htons(metacube2_compute_crc(&hdr)); + buffered_data.emplace_back((char *)&hdr, sizeof(hdr)); + buffered_data.emplace_back((char *)&packet, sizeof(packet)); + } + + metacube2_block_header hdr; + memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync)); + hdr.size = htonl(buf_size); + hdr.flags = htons(flags); + hdr.csum = htons(metacube2_compute_crc(&hdr)); + buffered_data.emplace_back((char *)&hdr, sizeof(hdr)); + } + buffered_data.emplace_back(buf, buf_size); + + // Send a Metacube2 timestamp every keyframe. + if (framing == FRAMING_METACUBE && data_type == DATA_TYPE_KEYFRAME) { + timespec now; + clock_gettime(CLOCK_REALTIME, &now); + + metacube2_timestamp_packet packet; + packet.type = htobe64(METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP); + packet.tv_sec = htobe64(now.tv_sec); + packet.tv_nsec = htobe64(now.tv_nsec); + + metacube2_block_header hdr; + memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync)); + hdr.size = htonl(sizeof(packet)); + hdr.flags = htons(METACUBE_FLAGS_METADATA); + hdr.csum = htons(metacube2_compute_crc(&hdr)); + buffered_data.emplace_back((char *)&hdr, sizeof(hdr)); + buffered_data.emplace_back((char *)&packet, sizeof(packet)); + } + + has_buffered_data.notify_all(); +} + +void HTTPD::Stream::stop() +{ + unique_lock lock(buffer_mutex); + should_quit = true; + has_buffered_data.notify_all(); +} diff --git a/httpd.h b/httpd.h new file mode 100644 index 0000000..57c649b --- /dev/null +++ b/httpd.h @@ -0,0 +1,115 @@ +#ifndef _HTTPD_H +#define _HTTPD_H + +// A class dealing with stream output to HTTP. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +extern "C" { +#include +} + +struct MHD_Connection; +struct MHD_Daemon; + +class HTTPD { +public: + // Returns a pair of content and content-type. + using EndpointCallback = std::function()>; + + HTTPD(); + ~HTTPD(); + + // Should be called before start(). + void set_header(const std::string &data) { + header = data; + } + + // Should be called before start() (due to threading issues). + enum CORSPolicy { + NO_CORS_POLICY, + ALLOW_ALL_ORIGINS + }; + void add_endpoint(const std::string &url, const EndpointCallback &callback, CORSPolicy cors_policy) { + endpoints[url] = Endpoint{ callback, cors_policy }; + } + + void start(int port); + void stop(); + void add_data(const char *buf, size_t size, bool keyframe, int64_t time, AVRational timebase); + int64_t get_num_connected_clients() const { + return metric_num_connected_clients.load(); + } + +private: + static int answer_to_connection_thunk(void *cls, MHD_Connection *connection, + const char *url, const char *method, + const char *version, const char *upload_data, + size_t *upload_data_size, void **con_cls); + + int answer_to_connection(MHD_Connection *connection, + const char *url, const char *method, + const char *version, const char *upload_data, + size_t *upload_data_size, void **con_cls); + + static void free_stream(void *cls); + + + class Stream { + public: + enum Framing { + FRAMING_RAW, + FRAMING_METACUBE + }; + Stream(HTTPD *parent, Framing framing) : parent(parent), framing(framing) {} + + static ssize_t reader_callback_thunk(void *cls, uint64_t pos, char *buf, size_t max); + ssize_t reader_callback(uint64_t pos, char *buf, size_t max); + + enum DataType { + DATA_TYPE_HEADER, + DATA_TYPE_KEYFRAME, + DATA_TYPE_OTHER + }; + void add_data(const char *buf, size_t size, DataType data_type, int64_t time, AVRational timebase); + void stop(); + HTTPD *get_parent() const { return parent; } + + private: + HTTPD *parent; + Framing framing; + + std::mutex buffer_mutex; + bool should_quit = false; // Under . + std::condition_variable has_buffered_data; + std::deque buffered_data; // Protected by . + size_t used_of_buffered_data = 0; // How many bytes of the first element of that is already used. Protected by . + size_t seen_keyframe = false; + }; + + MHD_Daemon *mhd = nullptr; + std::mutex streams_mutex; + std::set streams; // Not owned. + struct Endpoint { + EndpointCallback callback; + CORSPolicy cors_policy; + }; + std::unordered_map endpoints; + std::string header; + + // Metrics. + std::atomic metric_num_connected_clients{0}; +}; + +#endif // !defined(_HTTPD_H) diff --git a/main.cpp b/main.cpp index 2479715..e8c0dc4 100644 --- a/main.cpp +++ b/main.cpp @@ -20,6 +20,7 @@ extern "C" { #include "defs.h" #include "mainwindow.h" #include "ffmpeg_raii.h" +#include "httpd.h" #include "player.h" #include "post_to_main_thread.h" #include "ui_mainwindow.h" @@ -40,12 +41,15 @@ string filename_for_frame(unsigned stream_idx, int64_t pts) mutex frame_mu; vector frames[MAX_STREAMS]; QGLWidget *global_share_widget; +HTTPD *global_httpd; int record_thread_func(); int main(int argc, char **argv) { avformat_network_init(); + global_httpd = new HTTPD; + global_httpd->start(DEFAULT_HTTPD_PORT); QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true); diff --git a/metacube2.cpp b/metacube2.cpp new file mode 100644 index 0000000..6b68132 --- /dev/null +++ b/metacube2.cpp @@ -0,0 +1,60 @@ +/* + * Implementation of Metacube2 utility functions. + * + * Note: This file is meant to compile as both C and C++, for easier inclusion + * in other projects. + */ + +#include "metacube2.h" + +#include +#include + +/* + * https://www.ece.cmu.edu/~koopman/pubs/KoopmanCRCWebinar9May2012.pdf + * recommends this for messages as short as ours (see table at page 34). + */ +#define METACUBE2_CRC_POLYNOMIAL 0x8FDB + +/* Semi-random starting value to make sure all-zero won't pass. */ +#define METACUBE2_CRC_START 0x1234 + +/* This code is based on code generated by pycrc. */ +uint16_t metacube2_compute_crc(const struct metacube2_block_header *hdr) +{ + static const int data_len = sizeof(hdr->size) + sizeof(hdr->flags); + const uint8_t *data = (uint8_t *)&hdr->size; + uint16_t crc = METACUBE2_CRC_START; + int i, j; + + for (i = 0; i < data_len; ++i) { + uint8_t c = data[i]; + for (j = 0; j < 8; j++) { + int bit = crc & 0x8000; + crc = (crc << 1) | ((c >> (7 - j)) & 0x01); + if (bit) { + crc ^= METACUBE2_CRC_POLYNOMIAL; + } + } + } + + /* Finalize. */ + for (i = 0; i < 16; i++) { + int bit = crc & 0x8000; + crc = crc << 1; + if (bit) { + crc ^= METACUBE2_CRC_POLYNOMIAL; + } + } + + /* + * Invert the checksum for metadata packets, so that clients that + * don't understand metadata will ignore it as broken. There will + * probably be logging, but apart from that, it's harmless. + */ + if (ntohs(hdr->flags) & METACUBE_FLAGS_METADATA) { + crc ^= 0xffff; + } + + return crc; +} diff --git a/metacube2.h b/metacube2.h new file mode 100644 index 0000000..4f232c8 --- /dev/null +++ b/metacube2.h @@ -0,0 +1,71 @@ +#ifndef _METACUBE2_H +#define _METACUBE2_H + +/* + * Definitions for the Metacube2 protocol, used to communicate with Cubemap. + * + * Note: This file is meant to compile as both C and C++, for easier inclusion + * in other projects. + */ + +#include + +#define METACUBE2_SYNC "cube!map" /* 8 bytes long. */ +#define METACUBE_FLAGS_HEADER 0x1 +#define METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START 0x2 + +/* + * Metadata packets; should not be counted as data, but rather + * parsed (or ignored if you don't understand them). + * + * Metadata packets start with a uint64_t (network byte order) + * that describe the type; the rest is defined by the type. + */ +#define METACUBE_FLAGS_METADATA 0x4 + +struct metacube2_block_header { + char sync[8]; /* METACUBE2_SYNC */ + uint32_t size; /* Network byte order. Does not include header. */ + uint16_t flags; /* Network byte order. METACUBE_FLAGS_*. */ + uint16_t csum; /* Network byte order. CRC16 of size and flags. + If METACUBE_FLAGS_METADATA is set, inverted + so that older clients will ignore it as broken. */ +}; + +uint16_t metacube2_compute_crc(const struct metacube2_block_header *hdr); + +/* + * Set by the encoder, and can be measured for latency purposes (e.g., if the + * network can't keep up, the latency will tend to increase. + */ +#define METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP 0x1 + +struct metacube2_timestamp_packet { + uint64_t type; /* METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP, in network byte order. */ + + /* + * Time since the UTC epoch. Basically a struct timespec. + * Both are in network byte order. + */ + uint64_t tv_sec; + uint64_t tv_nsec; +}; + +/* + * Sent before a block to mark its presentation timestamp (ie., counts + * only for the next Metacube block). Used so that the reflector can know + * the length (in seconds) of fragments. + */ +#define METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS 0x2 + +struct metacube2_pts_packet { + uint64_t type; /* METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS, in network byte order. */ + + /* The timestamp of the first packet in the next block, in network byte order. */ + int64_t pts; + + /* Timebase "pts" is expressed in, as a fraction. Network byte order. */ + uint64_t timebase_num, timebase_den; +}; + +#endif /* !defined(_METACUBE_H) */ diff --git a/mux.cpp b/mux.cpp new file mode 100644 index 0000000..37bf321 --- /dev/null +++ b/mux.cpp @@ -0,0 +1,261 @@ +#include "mux.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +extern "C" { +#include +#include +#include +#include +#include +#include +#include +} + +#include "defs.h" +#include "timebase.h" + +using namespace std; + +struct PacketBefore { + PacketBefore(const AVFormatContext *ctx) : ctx(ctx) {} + + bool operator() (const Mux::QueuedPacket &a_qp, const Mux::QueuedPacket &b_qp) const { + const AVPacket *a = a_qp.pkt; + const AVPacket *b = b_qp.pkt; + int64_t a_dts = (a->dts == AV_NOPTS_VALUE ? a->pts : a->dts); + int64_t b_dts = (b->dts == AV_NOPTS_VALUE ? b->pts : b->dts); + AVRational a_timebase = ctx->streams[a->stream_index]->time_base; + AVRational b_timebase = ctx->streams[b->stream_index]->time_base; + if (av_compare_ts(a_dts, a_timebase, b_dts, b_timebase) != 0) { + return av_compare_ts(a_dts, a_timebase, b_dts, b_timebase) < 0; + } else { + return av_compare_ts(a->pts, a_timebase, b->pts, b_timebase) < 0; + } + } + + const AVFormatContext * const ctx; +}; + +Mux::Mux(AVFormatContext *avctx, int width, int height, Codec video_codec, const string &video_extradata, const AVCodecParameters *audio_codecpar, int time_base, std::function write_callback, WriteStrategy write_strategy, const vector &metrics) + : write_strategy(write_strategy), avctx(avctx), write_callback(write_callback), metrics(metrics) +{ + avstream_video = avformat_new_stream(avctx, nullptr); + if (avstream_video == nullptr) { + fprintf(stderr, "avformat_new_stream() failed\n"); + exit(1); + } + avstream_video->time_base = AVRational{1, time_base}; + avstream_video->codecpar->codec_type = AVMEDIA_TYPE_VIDEO; + if (video_codec == CODEC_H264) { + avstream_video->codecpar->codec_id = AV_CODEC_ID_H264; + } else { + assert(video_codec == CODEC_NV12); + avstream_video->codecpar->codec_id = AV_CODEC_ID_RAWVIDEO; + avstream_video->codecpar->codec_tag = avcodec_pix_fmt_to_codec_tag(AV_PIX_FMT_NV12); + } + avstream_video->codecpar->width = width; + avstream_video->codecpar->height = height; + + // Colorspace details. Closely correspond to settings in EffectChain_finalize, + // as noted in each comment. + // Note that the H.264 stream also contains this information and depending on the + // mux, this might simply get ignored. See sps_rbsp(). + // Note that there's no way to change this per-frame as the H.264 stream + // would like to be able to. + avstream_video->codecpar->color_primaries = AVCOL_PRI_BT709; // RGB colorspace (inout_format.color_space). + avstream_video->codecpar->color_trc = AVCOL_TRC_IEC61966_2_1; // Gamma curve (inout_format.gamma_curve). + // YUV colorspace (output_ycbcr_format.luma_coefficients). + avstream_video->codecpar->color_space = AVCOL_SPC_BT709; + avstream_video->codecpar->color_range = AVCOL_RANGE_MPEG; // Full vs. limited range (output_ycbcr_format.full_range). + avstream_video->codecpar->chroma_location = AVCHROMA_LOC_LEFT; // Chroma sample location. See chroma_offset_0[] in Mixer::subsample_chroma(). + avstream_video->codecpar->field_order = AV_FIELD_PROGRESSIVE; + + if (!video_extradata.empty()) { + avstream_video->codecpar->extradata = (uint8_t *)av_malloc(video_extradata.size()); + avstream_video->codecpar->extradata_size = video_extradata.size(); + memcpy(avstream_video->codecpar->extradata, video_extradata.data(), video_extradata.size()); + } + + avstream_audio = avformat_new_stream(avctx, nullptr); + if (avstream_audio == nullptr) { + fprintf(stderr, "avformat_new_stream() failed\n"); + exit(1); + } + avstream_audio->time_base = AVRational{1, time_base}; + if (avcodec_parameters_copy(avstream_audio->codecpar, audio_codecpar) < 0) { + fprintf(stderr, "avcodec_parameters_copy() failed\n"); + exit(1); + } + + AVDictionary *options = NULL; + vector> opts = MUX_OPTS; + for (pair opt : opts) { + av_dict_set(&options, opt.first.c_str(), opt.second.c_str(), 0); + } + if (avformat_write_header(avctx, &options) < 0) { + fprintf(stderr, "avformat_write_header() failed\n"); + exit(1); + } + for (MuxMetrics *metric : metrics) { + metric->metric_written_bytes += avctx->pb->pos; + } + + // Make sure the header is written before the constructor exits. + avio_flush(avctx->pb); + + if (write_strategy == WRITE_BACKGROUND) { + writer_thread = thread(&Mux::thread_func, this); + } +} + +Mux::~Mux() +{ + assert(plug_count == 0); + if (write_strategy == WRITE_BACKGROUND) { + writer_thread_should_quit = true; + packet_queue_ready.notify_all(); + writer_thread.join(); + } + int64_t old_pos = avctx->pb->pos; + av_write_trailer(avctx); + for (MuxMetrics *metric : metrics) { + metric->metric_written_bytes += avctx->pb->pos - old_pos; + } + + if (!(avctx->oformat->flags & AVFMT_NOFILE) && + !(avctx->flags & AVFMT_FLAG_CUSTOM_IO)) { + avio_closep(&avctx->pb); + } + avformat_free_context(avctx); +} + +void Mux::add_packet(const AVPacket &pkt, int64_t pts, int64_t dts, AVRational timebase, int stream_index_override) +{ + AVPacket pkt_copy; + av_init_packet(&pkt_copy); + if (av_packet_ref(&pkt_copy, &pkt) < 0) { + fprintf(stderr, "av_copy_packet() failed\n"); + exit(1); + } + if (stream_index_override != -1) { + pkt_copy.stream_index = stream_index_override; + } + if (pkt_copy.stream_index == 0) { + pkt_copy.pts = av_rescale_q(pts, timebase, avstream_video->time_base); + pkt_copy.dts = av_rescale_q(dts, timebase, avstream_video->time_base); + pkt_copy.duration = av_rescale_q(pkt.duration, timebase, avstream_video->time_base); + } else if (pkt_copy.stream_index == 1) { + pkt_copy.pts = av_rescale_q(pts, timebase, avstream_audio->time_base); + pkt_copy.dts = av_rescale_q(dts, timebase, avstream_audio->time_base); + pkt_copy.duration = av_rescale_q(pkt.duration, timebase, avstream_audio->time_base); + } else { + assert(false); + } + + { + lock_guard lock(mu); + if (write_strategy == WriteStrategy::WRITE_BACKGROUND) { + packet_queue.push_back(QueuedPacket{ av_packet_clone(&pkt_copy), pts }); + if (plug_count == 0) packet_queue_ready.notify_all(); + } else if (plug_count > 0) { + packet_queue.push_back(QueuedPacket{ av_packet_clone(&pkt_copy), pts }); + } else { + write_packet_or_die(pkt_copy, pts); + } + } + + av_packet_unref(&pkt_copy); +} + +void Mux::write_packet_or_die(const AVPacket &pkt, int64_t unscaled_pts) +{ + for (MuxMetrics *metric : metrics) { + if (pkt.stream_index == 0) { + metric->metric_video_bytes += pkt.size; + } else if (pkt.stream_index == 1) { + metric->metric_audio_bytes += pkt.size; + } else { + assert(false); + } + } + int64_t old_pos = avctx->pb->pos; + if (av_interleaved_write_frame(avctx, const_cast(&pkt)) < 0) { + fprintf(stderr, "av_interleaved_write_frame() failed\n"); + exit(1); + } + avio_flush(avctx->pb); + for (MuxMetrics *metric : metrics) { + metric->metric_written_bytes += avctx->pb->pos - old_pos; + } + + if (pkt.stream_index == 0 && write_callback != nullptr) { + write_callback(unscaled_pts); + } +} + +void Mux::plug() +{ + lock_guard lock(mu); + ++plug_count; +} + +void Mux::unplug() +{ + lock_guard lock(mu); + if (--plug_count > 0) { + return; + } + assert(plug_count >= 0); + + sort(packet_queue.begin(), packet_queue.end(), PacketBefore(avctx)); + + if (write_strategy == WRITE_BACKGROUND) { + packet_queue_ready.notify_all(); + } else { + for (QueuedPacket &qp : packet_queue) { + write_packet_or_die(*qp.pkt, qp.unscaled_pts); + av_packet_free(&qp.pkt); + } + packet_queue.clear(); + } +} + +void Mux::thread_func() +{ + unique_lock lock(mu); + for ( ;; ) { + packet_queue_ready.wait(lock, [this]() { + return writer_thread_should_quit || (!packet_queue.empty() && plug_count == 0); + }); + if (writer_thread_should_quit && packet_queue.empty()) { + // All done. + break; + } + + assert(!packet_queue.empty() && plug_count == 0); + vector packets; + swap(packets, packet_queue); + + lock.unlock(); + for (QueuedPacket &qp : packets) { + write_packet_or_die(*qp.pkt, qp.unscaled_pts); + av_packet_free(&qp.pkt); + } + lock.lock(); + } +} + +void MuxMetrics::init(const vector> &labels) +{ + // TODO: See if we want to reintroduce these. +} diff --git a/mux.h b/mux.h new file mode 100644 index 0000000..9614bff --- /dev/null +++ b/mux.h @@ -0,0 +1,111 @@ +#ifndef _MUX_H +#define _MUX_H 1 + +// Wrapper around an AVFormat mux. + +extern "C" { +#include +#include +} + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "timebase.h" + +struct MuxMetrics { + // “written” will usually be equal video + audio + mux overhead, + // except that there could be buffered packets that count in audio or video + // but not yet in written. + std::atomic metric_video_bytes{0}, metric_audio_bytes{0}, metric_written_bytes{0}; + + // Registers in global_metrics. + void init(const std::vector> &labels); + + void reset() + { + metric_video_bytes = 0; + metric_audio_bytes = 0; + metric_written_bytes = 0; + } +}; + +class Mux { +public: + enum Codec { + CODEC_H264, + CODEC_NV12, // Uncompressed 4:2:0. + }; + enum WriteStrategy { + // add_packet() will write the packet immediately, unless plugged. + WRITE_FOREGROUND, + + // All writes will happen on a separate thread, so add_packet() + // won't block. Use this if writing to a file and you might be + // holding a mutex (because blocking I/O with a mutex held is + // not good). Note that this will clone every packet, so it has + // higher overhead. + WRITE_BACKGROUND, + }; + + // Takes ownership of avctx. will be called every time + // a write has been made to the video stream (id 0), with the pts of + // the just-written frame. (write_callback can be nullptr.) + // Does not take ownership of ; elements in there, if any, + // will be added to. + Mux(AVFormatContext *avctx, int width, int height, Codec video_codec, const std::string &video_extradata, const AVCodecParameters *audio_codecpar, int time_base, std::function write_callback, WriteStrategy write_strategy, const std::vector &metrics); + ~Mux(); + void add_packet(const AVPacket &pkt, int64_t pts, int64_t dts, AVRational timebase = { 1, TIMEBASE }, int stream_index_override = -1); + + // As long as the mux is plugged, it will not actually write anything to disk, + // just queue the packets. Once it is unplugged, the packets are reordered by pts + // and written. This is primarily useful if you might have two different encoders + // writing to the mux at the same time (because one is shutting down), so that + // pts might otherwise come out-of-order. + // + // You can plug and unplug multiple times; only when the plug count reaches zero, + // something will actually happen. + void plug(); + void unplug(); + +private: + // If write_strategy == WRITE_FOREGORUND, Must be called with held. + void write_packet_or_die(const AVPacket &pkt, int64_t unscaled_pts); + void thread_func(); + + WriteStrategy write_strategy; + + std::mutex mu; + + // These are only in use if write_strategy == WRITE_BACKGROUND. + std::atomic writer_thread_should_quit{false}; + std::thread writer_thread; + + AVFormatContext *avctx; // Protected by , iff write_strategy == WRITE_BACKGROUND. + int plug_count = 0; // Protected by . + + // Protected by . If write_strategy == WRITE_FOREGROUND, + // this is only in use when plugging. + struct QueuedPacket { + AVPacket *pkt; + int64_t unscaled_pts; + }; + std::vector packet_queue; + std::condition_variable packet_queue_ready; + + AVStream *avstream_video, *avstream_audio; + + std::function write_callback; + std::vector metrics; + + friend struct PacketBefore; +}; + +#endif // !defined(_MUX_H) diff --git a/timebase.h b/timebase.h new file mode 100644 index 0000000..dbc4402 --- /dev/null +++ b/timebase.h @@ -0,0 +1,25 @@ +#ifndef _TIMEBASE_H +#define _TIMEBASE_H 1 + +// Common timebase that allows us to represent one frame exactly in all the +// relevant frame rates: +// +// Timebase: 1/120000 +// Frame at 50fps: 2400/120000 +// Frame at 60fps: 2000/120000 +// Frame at 59.94fps: 2002/120000 +// Frame at 23.976fps: 5005/120000 +// +// If we also wanted to represent one sample at 48000 Hz, we'd need +// to go to 300000. Also supporting one sample at 44100 Hz would mean +// going to 44100000; probably a bit excessive. +#define TIMEBASE 120000 + +// Some muxes, like MP4 (or at least avformat's implementation of it), +// are not too fond of values above 2^31. At timebase 120000, that's only +// about five hours or so, so we define a coarser timebase that doesn't +// get 59.94 precisely (so there will be a marginal amount of pts jitter), +// but can do at least 50 and 60 precisely, and months of streaming. +#define COARSE_TIMEBASE 300 + +#endif // !defined(_TIMEBASE_H) -- 2.39.2