git.sesse.net Git - nageru/commitdiff
Import a bunch of http/mux code from Nageru.
author	Steinar H. Gunderson <sgunderson@bigfoot.com>
	Sat, 18 Aug 2018 19:48:39 +0000 (21:48 +0200)
committer	Steinar H. Gunderson <sgunderson@bigfoot.com>
	Sat, 18 Aug 2018 19:48:39 +0000 (21:48 +0200)
Makefile
defs.h
httpd.cpp [new file with mode: 0644]
httpd.h [new file with mode: 0644]
main.cpp
metacube2.cpp [new file with mode: 0644]
metacube2.h [new file with mode: 0644]
mux.cpp [new file with mode: 0644]
mux.h [new file with mode: 0644]
timebase.h [new file with mode: 0644]

diff --git a/Makefile b/Makefile
index 7ae9a5cd93e8bca81093cfce5b23d7f28b6a50ba..305516e7c6c7529606364a6af4cf2827090d5a03 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,5 @@
 CXX=g++
-PKG_MODULES := Qt5Core Qt5Gui Qt5Widgets Qt5OpenGLExtensions Qt5OpenGL Qt5PrintSupport libjpeg movit
+PKG_MODULES := Qt5Core Qt5Gui Qt5Widgets Qt5OpenGLExtensions Qt5OpenGL Qt5PrintSupport libjpeg movit libmicrohttpd
 CXXFLAGS ?= -O2 -g -Wall  # Will be overridden by environment.
 CXXFLAGS += -fPIC $(shell pkg-config --cflags $(PKG_MODULES)) -DMOVIT_SHADER_DIR=\"$(shell pkg-config --variable=shaderdir movit)\" -pthread
 
@@ -10,7 +10,7 @@ OBJS_WITH_MOC = mainwindow.o jpeg_frame_view.o clip_list.o
 OBJS += $(OBJS_WITH_MOC)
 OBJS += $(OBJS_WITH_MOC:.o=.moc.o) 
 
-OBJS += ffmpeg_raii.o main.o player.o
+OBJS += ffmpeg_raii.o main.o player.o httpd.o mux.o metacube2.o
 
 %.o: %.cpp
        $(CXX) -MMD -MP $(CPPFLAGS) $(CXXFLAGS) -o $@ -c $<
diff --git a/defs.h b/defs.h
index 9d11750a43d86cb9564b88fd6bf0037e704ae19b..78ee07f1b3dd4f652a0a66098f4c96b65d67ef80 100644 (file)
--- a/defs.h
+++ b/defs.h
@@ -4,5 +4,20 @@
 #define MAX_STREAMS 16
 #define CACHE_SIZE 1000  // In number of frames.
 #define NUM_CAMERAS 4
+#define MUX_BUFFER_SIZE 10485760
+
+#define DEFAULT_STREAM_MUX_NAME "nut"  // Only for HTTP. Local dump guesses from LOCAL_DUMP_SUFFIX.
+#define DEFAULT_HTTPD_PORT 9095
+#define MUX_OPTS { \
+       /* Make seekable .mov files, and keep MP4 muxer from using unlimited amounts of memory. */ \
+       { "movflags", "empty_moov+frag_keyframe+default_base_moof+skip_trailer" }, \
+       \
+       /* Make for somewhat less bursty stream output when using .mov. */ \
+       { "frag_duration", "125000" }, \
+       \
+       /* Keep nut muxer from using unlimited amounts of memory. */ \
+       { "write_index", "0" } \
+}
+
 
 #endif  // !defined(_DEFS_H)
diff --git a/httpd.cpp b/httpd.cpp
new file mode 100644 (file)
index 0000000..fddc45a
--- /dev/null
+++ b/httpd.cpp
@@ -0,0 +1,264 @@
+#include "httpd.h"
+
+#include <assert.h>
+#include <byteswap.h>
+#include <endian.h>
+#include <microhttpd.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/time.h>
+#include <time.h>
+#include <memory>
+extern "C" {
+#include <libavutil/avutil.h>
+}
+
+#include "defs.h"
+#include "metacube2.h"
+
+struct MHD_Connection;
+struct MHD_Response;
+
+using namespace std;
+
+HTTPD::HTTPD()
+{
+}
+
+HTTPD::~HTTPD()
+{
+       stop();
+}
+
+void HTTPD::start(int port)
+{
+       mhd = MHD_start_daemon(MHD_USE_THREAD_PER_CONNECTION | MHD_USE_POLL_INTERNALLY | MHD_USE_DUAL_STACK,
+                              port,
+                              nullptr, nullptr,
+                              &answer_to_connection_thunk, this,
+                              MHD_OPTION_NOTIFY_COMPLETED, nullptr, this,
+                              MHD_OPTION_END);
+       if (mhd == nullptr) {
+               fprintf(stderr, "Warning: Could not open HTTP server. (Port already in use?)\n");
+       }
+}
+
+void HTTPD::stop()
+{
+       if (mhd) {
+               MHD_quiesce_daemon(mhd);
+               for (Stream *stream : streams) {
+                       stream->stop();
+               }
+               MHD_stop_daemon(mhd);
+               mhd = nullptr;
+       }
+}
+
+void HTTPD::add_data(const char *buf, size_t size, bool keyframe, int64_t time, AVRational timebase)
+{
+       unique_lock<mutex> lock(streams_mutex);
+       for (Stream *stream : streams) {
+               stream->add_data(buf, size, keyframe ? Stream::DATA_TYPE_KEYFRAME : Stream::DATA_TYPE_OTHER, time, timebase);
+       }
+}
+
+int HTTPD::answer_to_connection_thunk(void *cls, MHD_Connection *connection,
+                                      const char *url, const char *method,
+                                      const char *version, const char *upload_data,
+                                      size_t *upload_data_size, void **con_cls)
+{
+       HTTPD *httpd = (HTTPD *)cls;
+       return httpd->answer_to_connection(connection, url, method, version, upload_data, upload_data_size, con_cls);
+}
+
+int HTTPD::answer_to_connection(MHD_Connection *connection,
+                                const char *url, const char *method,
+                               const char *version, const char *upload_data,
+                               size_t *upload_data_size, void **con_cls)
+{
+       // See if the URL ends in “.metacube”.
+       HTTPD::Stream::Framing framing;
+       if (strstr(url, ".metacube") == url + strlen(url) - strlen(".metacube")) {
+               framing = HTTPD::Stream::FRAMING_METACUBE;
+       } else {
+               framing = HTTPD::Stream::FRAMING_RAW;
+       }
+
+       if (endpoints.count(url)) {
+               pair<string, string> contents_and_type = endpoints[url].callback();
+               MHD_Response *response = MHD_create_response_from_buffer(
+                       contents_and_type.first.size(), &contents_and_type.first[0], MHD_RESPMEM_MUST_COPY);
+               MHD_add_response_header(response, "Content-type", contents_and_type.second.c_str());
+               if (endpoints[url].cors_policy == ALLOW_ALL_ORIGINS) {
+                       MHD_add_response_header(response, "Access-Control-Allow-Origin", "*");
+               }
+               int ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
+               MHD_destroy_response(response);  // Only decreases the refcount; actual free is after the request is done.
+               return ret;
+       }
+
+       // Small hack; reject unknown /channels/foo.
+       if (string(url).find("/channels/") == 0) {
+               string contents = "Not found.";
+               MHD_Response *response = MHD_create_response_from_buffer(
+                       contents.size(), &contents[0], MHD_RESPMEM_MUST_COPY);
+               MHD_add_response_header(response, "Content-type", "text/plain");
+               int ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);
+               MHD_destroy_response(response);  // Only decreases the refcount; actual free is after the request is done.
+               return ret;
+       }
+
+       HTTPD::Stream *stream = new HTTPD::Stream(this, framing);
+       stream->add_data(header.data(), header.size(), Stream::DATA_TYPE_HEADER, AV_NOPTS_VALUE, AVRational{ 1, 0 });
+       {
+               unique_lock<mutex> lock(streams_mutex);
+               streams.insert(stream);
+       }
+       ++metric_num_connected_clients;
+       *con_cls = stream;
+
+       // Does not strictly have to be equal to MUX_BUFFER_SIZE.
+       MHD_Response *response = MHD_create_response_from_callback(
+               (size_t)-1, MUX_BUFFER_SIZE, &HTTPD::Stream::reader_callback_thunk, stream, &HTTPD::free_stream);
+       // TODO: Content-type?
+       if (framing == HTTPD::Stream::FRAMING_METACUBE) {
+               MHD_add_response_header(response, "Content-encoding", "metacube");
+       }
+
+       int ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
+       MHD_destroy_response(response);  // Only decreases the refcount; actual free is after the request is done.
+
+       return ret;
+}
+
+void HTTPD::free_stream(void *cls)
+{
+       HTTPD::Stream *stream = (HTTPD::Stream *)cls;
+       HTTPD *httpd = stream->get_parent();
+       {
+               unique_lock<mutex> lock(httpd->streams_mutex);
+               delete stream;
+               httpd->streams.erase(stream);
+       }
+       --httpd->metric_num_connected_clients;
+}
+
+ssize_t HTTPD::Stream::reader_callback_thunk(void *cls, uint64_t pos, char *buf, size_t max)
+{
+       HTTPD::Stream *stream = (HTTPD::Stream *)cls;
+       return stream->reader_callback(pos, buf, max);
+}
+
+ssize_t HTTPD::Stream::reader_callback(uint64_t pos, char *buf, size_t max)
+{
+       unique_lock<mutex> lock(buffer_mutex);
+       has_buffered_data.wait(lock, [this]{ return should_quit || !buffered_data.empty(); });
+       if (should_quit) {
+               return 0;
+       }
+
+       ssize_t ret = 0;
+       while (max > 0 && !buffered_data.empty()) {
+               const string &s = buffered_data.front();
+               assert(s.size() > used_of_buffered_data);
+               size_t len = s.size() - used_of_buffered_data;
+               if (max >= len) {
+                       // Consume the entire (rest of the) string.
+                       memcpy(buf, s.data() + used_of_buffered_data, len);
+                       buf += len;
+                       ret += len;
+                       max -= len;
+                       buffered_data.pop_front();
+                       used_of_buffered_data = 0;
+               } else {
+                       // We don't need the entire string; just use the first part of it.
+                       memcpy(buf, s.data() + used_of_buffered_data, max);
+                       buf += max;
+                       used_of_buffered_data += max;
+                       ret += max;
+                       max = 0;
+               }
+       }
+
+       return ret;
+}
+
+void HTTPD::Stream::add_data(const char *buf, size_t buf_size, HTTPD::Stream::DataType data_type, int64_t time, AVRational timebase)
+{
+       if (buf_size == 0) {
+               return;
+       }
+       if (data_type == DATA_TYPE_KEYFRAME) {
+               seen_keyframe = true;
+       } else if (data_type == DATA_TYPE_OTHER && !seen_keyframe) {
+               // Start sending only once we see a keyframe.
+               return;
+       }
+
+       unique_lock<mutex> lock(buffer_mutex);
+
+       if (framing == FRAMING_METACUBE) {
+               int flags = 0;
+               if (data_type == DATA_TYPE_HEADER) {
+                       flags |= METACUBE_FLAGS_HEADER;
+               } else if (data_type == DATA_TYPE_OTHER) {
+                       flags |= METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START;
+               }
+
+               // If we're about to send a keyframe, send a pts metadata block
+               // to mark its time.
+               if ((flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) == 0 && time != AV_NOPTS_VALUE) {
+                       metacube2_pts_packet packet;
+                       packet.type = htobe64(METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS);
+                       packet.pts = htobe64(time);
+                       packet.timebase_num = htobe64(timebase.num);
+                       packet.timebase_den = htobe64(timebase.den);
+
+                       metacube2_block_header hdr;
+                       memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
+                       hdr.size = htonl(sizeof(packet));
+                       hdr.flags = htons(METACUBE_FLAGS_METADATA);
+                       hdr.csum = htons(metacube2_compute_crc(&hdr));
+                       buffered_data.emplace_back((char *)&hdr, sizeof(hdr));
+                       buffered_data.emplace_back((char *)&packet, sizeof(packet));
+               }
+
+               metacube2_block_header hdr;
+               memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
+               hdr.size = htonl(buf_size);
+               hdr.flags = htons(flags);
+               hdr.csum = htons(metacube2_compute_crc(&hdr));
+               buffered_data.emplace_back((char *)&hdr, sizeof(hdr));
+       }
+       buffered_data.emplace_back(buf, buf_size);
+
+       // Send a Metacube2 timestamp every keyframe.
+       if (framing == FRAMING_METACUBE && data_type == DATA_TYPE_KEYFRAME) {
+               timespec now;
+               clock_gettime(CLOCK_REALTIME, &now);
+
+               metacube2_timestamp_packet packet;
+               packet.type = htobe64(METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP);
+               packet.tv_sec = htobe64(now.tv_sec);
+               packet.tv_nsec = htobe64(now.tv_nsec);
+
+               metacube2_block_header hdr;
+               memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
+               hdr.size = htonl(sizeof(packet));
+               hdr.flags = htons(METACUBE_FLAGS_METADATA);
+               hdr.csum = htons(metacube2_compute_crc(&hdr));
+               buffered_data.emplace_back((char *)&hdr, sizeof(hdr));
+               buffered_data.emplace_back((char *)&packet, sizeof(packet));
+       }
+
+       has_buffered_data.notify_all(); 
+}
+
+void HTTPD::Stream::stop()
+{
+       unique_lock<mutex> lock(buffer_mutex);
+       should_quit = true;
+       has_buffered_data.notify_all();
+}
diff --git a/httpd.h b/httpd.h
new file mode 100644 (file)
index 0000000..57c649b
--- /dev/null
+++ b/httpd.h
@@ -0,0 +1,115 @@
+#ifndef _HTTPD_H
+#define _HTTPD_H
+
+// A class dealing with stream output to HTTP.
+
+#include <stddef.h>
+#include <stdint.h>
+#include <sys/types.h>
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <functional>
+#include <mutex>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+extern "C" {
+#include <libavutil/rational.h>
+}
+
+struct MHD_Connection;
+struct MHD_Daemon;
+
+class HTTPD {
+public:
+       // Returns a pair of content and content-type.
+       using EndpointCallback = std::function<std::pair<std::string, std::string>()>;
+
+       HTTPD();
+       ~HTTPD();
+
+       // Should be called before start().
+       void set_header(const std::string &data) {
+               header = data;
+       }
+
+       // Should be called before start() (due to threading issues).
+       enum CORSPolicy {
+               NO_CORS_POLICY,
+               ALLOW_ALL_ORIGINS
+       };
+       void add_endpoint(const std::string &url, const EndpointCallback &callback, CORSPolicy cors_policy) {
+               endpoints[url] = Endpoint{ callback, cors_policy };
+       }
+
+       void start(int port);
+       void stop();
+       void add_data(const char *buf, size_t size, bool keyframe, int64_t time, AVRational timebase);
+       int64_t get_num_connected_clients() const {
+               return metric_num_connected_clients.load();
+       }
+
+private:
+       static int answer_to_connection_thunk(void *cls, MHD_Connection *connection,
+                                             const char *url, const char *method,
+                                             const char *version, const char *upload_data,
+                                             size_t *upload_data_size, void **con_cls);
+
+       int answer_to_connection(MHD_Connection *connection,
+                                const char *url, const char *method,
+                                const char *version, const char *upload_data,
+                                size_t *upload_data_size, void **con_cls);
+
+       static void free_stream(void *cls);
+
+
+       class Stream {
+       public:
+               enum Framing {
+                       FRAMING_RAW,
+                       FRAMING_METACUBE
+               };
+               Stream(HTTPD *parent, Framing framing) : parent(parent), framing(framing) {}
+
+               static ssize_t reader_callback_thunk(void *cls, uint64_t pos, char *buf, size_t max);
+               ssize_t reader_callback(uint64_t pos, char *buf, size_t max);
+
+               enum DataType {
+                       DATA_TYPE_HEADER,
+                       DATA_TYPE_KEYFRAME,
+                       DATA_TYPE_OTHER
+               };
+               void add_data(const char *buf, size_t size, DataType data_type, int64_t time, AVRational timebase);
+               void stop();
+               HTTPD *get_parent() const { return parent; }
+
+       private:
+               HTTPD *parent;
+               Framing framing;
+
+               std::mutex buffer_mutex;
+               bool should_quit = false;  // Under <buffer_mutex>.
+               std::condition_variable has_buffered_data;
+               std::deque<std::string> buffered_data;  // Protected by <buffer_mutex>.
+               size_t used_of_buffered_data = 0;  // How many bytes of the first element of <buffered_data> have already been used. Protected by <buffer_mutex>.
+               bool seen_keyframe = false;
+       };
+
+       MHD_Daemon *mhd = nullptr;
+       std::mutex streams_mutex;
+       std::set<Stream *> streams;  // Not owned.
+       struct Endpoint {
+               EndpointCallback callback;
+               CORSPolicy cors_policy;
+       };
+       std::unordered_map<std::string, Endpoint> endpoints;
+       std::string header;
+
+       // Metrics.
+       std::atomic<int64_t> metric_num_connected_clients{0};
+};
+
+#endif  // !defined(_HTTPD_H)
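
For orientation, a minimal usage sketch of the HTTPD class declared above (hedged: main() and the placeholder variables are illustrative, not part of this commit; the header string would normally come from the muxer):

#include "defs.h"
#include "httpd.h"
#include "timebase.h"

int main()
{
	HTTPD httpd;

	// Endpoints and the stream header must be registered before start().
	httpd.add_endpoint("/status", [] {
		return std::make_pair(std::string("OK\n"), std::string("text/plain"));
	}, HTTPD::ALLOW_ALL_ORIGINS);

	// The container header goes out first on every new connection
	// (framed as METACUBE_FLAGS_HEADER when Metacube framing is requested).
	std::string mux_header;  // Placeholder; normally produced by the muxer.
	httpd.set_header(mux_header);

	httpd.start(DEFAULT_HTTPD_PORT);

	// Each muxed blob is pushed to all connected clients. <pts> is in units
	// of the given timebase and becomes a NEXT_BLOCK_PTS metadata block
	// ahead of keyframes under Metacube framing.
	std::string blob;      // Placeholder for muxed data.
	bool keyframe = true;  // Placeholder.
	int64_t pts = 0;       // Placeholder.
	httpd.add_data(blob.data(), blob.size(), keyframe, pts, AVRational{ 1, TIMEBASE });

	httpd.stop();
	return 0;
}

Clients would then request e.g. http://localhost:9095/stream.metacube for Metacube2-framed output (suitable for Cubemap), or any other unregistered path for raw output.
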
diff --git a/main.cpp b/main.cpp
index 2479715565af8d227c224dc4c756458cdfa0bfdf..e8c0dc4ac1fe7e6769dea0b98377b29729c137b1 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -20,6 +20,7 @@ extern "C" {
 #include "defs.h"
 #include "mainwindow.h"
 #include "ffmpeg_raii.h"
+#include "httpd.h"
 #include "player.h"
 #include "post_to_main_thread.h"
 #include "ui_mainwindow.h"
@@ -40,12 +41,15 @@ string filename_for_frame(unsigned stream_idx, int64_t pts)
 mutex frame_mu;
 vector<int64_t> frames[MAX_STREAMS];
 QGLWidget *global_share_widget;
+HTTPD *global_httpd;
 
 int record_thread_func();
 
 int main(int argc, char **argv)
 {
        avformat_network_init();
+       global_httpd = new HTTPD;
+       global_httpd->start(DEFAULT_HTTPD_PORT);
 
        QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
 
diff --git a/metacube2.cpp b/metacube2.cpp
new file mode 100644 (file)
index 0000000..6b68132
--- /dev/null
+++ b/metacube2.cpp
@@ -0,0 +1,60 @@
+/*
+ * Implementation of Metacube2 utility functions.
+ *
+ * Note: This file is meant to compile as both C and C++, for easier inclusion
+ * in other projects.
+ */
+
+#include "metacube2.h"
+
+#include <byteswap.h>
+#include <netinet/in.h>
+
+/*
+ * https://www.ece.cmu.edu/~koopman/pubs/KoopmanCRCWebinar9May2012.pdf
+ * recommends this for messages as short as ours (see table at page 34).
+ */
+#define METACUBE2_CRC_POLYNOMIAL 0x8FDB
+
+/* Semi-random starting value to make sure all-zero won't pass. */
+#define METACUBE2_CRC_START 0x1234
+
+/* This code is based on code generated by pycrc. */
+uint16_t metacube2_compute_crc(const struct metacube2_block_header *hdr)
+{
+       static const int data_len = sizeof(hdr->size) + sizeof(hdr->flags);
+       const uint8_t *data = (uint8_t *)&hdr->size;
+       uint16_t crc = METACUBE2_CRC_START;
+       int i, j;
+
+       for (i = 0; i < data_len; ++i) {
+               uint8_t c = data[i];
+               for (j = 0; j < 8; j++) {
+                       int bit = crc & 0x8000;
+                       crc = (crc << 1) | ((c >> (7 - j)) & 0x01);
+                       if (bit) {
+                               crc ^= METACUBE2_CRC_POLYNOMIAL;
+                       }
+               }
+       }
+
+       /* Finalize. */
+       for (i = 0; i < 16; i++) {
+               int bit = crc & 0x8000;
+               crc = crc << 1;
+               if (bit) {
+                       crc ^= METACUBE2_CRC_POLYNOMIAL;
+               }
+       }
+
+       /*
+        * Invert the checksum for metadata packets, so that clients that
+        * don't understand metadata will ignore it as broken. There will
+        * probably be logging, but apart from that, it's harmless.
+        */
+       if (ntohs(hdr->flags) & METACUBE_FLAGS_METADATA) {
+               crc ^= 0xffff;
+       }
+
+       return crc;
+}
diff --git a/metacube2.h b/metacube2.h
new file mode 100644 (file)
index 0000000..4f232c8
--- /dev/null
+++ b/metacube2.h
@@ -0,0 +1,71 @@
+#ifndef _METACUBE2_H
+#define _METACUBE2_H
+
+/*
+ * Definitions for the Metacube2 protocol, used to communicate with Cubemap.
+ *
+ * Note: This file is meant to compile as both C and C++, for easier inclusion
+ * in other projects.
+ */
+
+#include <stdint.h>
+
+#define METACUBE2_SYNC "cube!map"  /* 8 bytes long. */
+#define METACUBE_FLAGS_HEADER 0x1
+#define METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START 0x2
+
+/*
+ * Metadata packets; should not be counted as data, but rather
+ * parsed (or ignored if you don't understand them).
+ *
+ * Metadata packets start with a uint64_t (network byte order)
+ * that describe the type; the rest is defined by the type.
+ */
+#define METACUBE_FLAGS_METADATA 0x4
+
+struct metacube2_block_header {
+       char sync[8];    /* METACUBE2_SYNC */
+       uint32_t size;   /* Network byte order. Does not include header. */
+       uint16_t flags;  /* Network byte order. METACUBE_FLAGS_*. */
+       uint16_t csum;   /* Network byte order. CRC16 of size and flags.
+                            If METACUBE_FLAGS_METADATA is set, inverted
+                            so that older clients will ignore it as broken. */
+};
+
+uint16_t metacube2_compute_crc(const struct metacube2_block_header *hdr);
+
+/*
+ * Set by the encoder, and can be measured for latency purposes (e.g., if the
+ * network can't keep up, the latency will tend to increase.
+ */
+#define METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP 0x1
+
+struct metacube2_timestamp_packet {
+       uint64_t type;  /* METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP, in network byte order. */
+
+       /*
+        * Time since the UTC epoch. Basically a struct timespec.
+        * Both are in network byte order.
+        */
+       uint64_t tv_sec;
+       uint64_t tv_nsec;
+};
+
+/*
+ * Sent before a block to mark its presentation timestamp (i.e., counts
+ * only for the next Metacube block). Used so that the reflector can know
+ * the length (in seconds) of fragments.
+ */
+#define METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS 0x2
+
+struct metacube2_pts_packet {
+       uint64_t type;  /* METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS, in network byte order. */
+
+       /* The timestamp of the first packet in the next block, in network byte order. */
+       int64_t pts;
+
+       /* Timebase "pts" is expressed in, as a fraction. Network byte order. */
+       uint64_t timebase_num, timebase_den;
+};
+
+#endif  /* !defined(_METACUBE2_H) */
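
To make the framing concrete, here is a hedged sketch of the receiving side (roughly what a consumer such as Cubemap would do; the helper names and socket handling are illustrative assumptions, and resynchronization after a failed checksum is omitted):

#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include <string>

#include "metacube2.h"

// Read exactly <len> bytes from <fd>; returns false on EOF or error.
static bool read_fully(int fd, void *buf, size_t len)
{
	char *p = (char *)buf;
	while (len > 0) {
		ssize_t ret = read(fd, p, len);
		if (ret <= 0) return false;
		p += ret;
		len -= ret;
	}
	return true;
}

// Read one Metacube2 block: header (sync and CRC verified), then payload.
// metacube2_compute_crc() already inverts the CRC for metadata blocks,
// so a direct comparison works for both data and metadata.
bool read_metacube2_block(int fd, std::string *payload, uint16_t *flags)
{
	metacube2_block_header hdr;
	if (!read_fully(fd, &hdr, sizeof(hdr))) return false;
	if (memcmp(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync)) != 0) return false;
	if (metacube2_compute_crc(&hdr) != ntohs(hdr.csum)) return false;

	*flags = ntohs(hdr.flags);
	payload->resize(ntohl(hdr.size));
	return read_fully(fd, &(*payload)[0], payload->size());
}

Blocks with METACUBE_FLAGS_METADATA set would then be parsed as metacube2_timestamp_packet or metacube2_pts_packet according to their leading type field; everything else is stream data.
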
diff --git a/mux.cpp b/mux.cpp
new file mode 100644 (file)
index 0000000..37bf321
--- /dev/null
+++ b/mux.cpp
@@ -0,0 +1,261 @@
+#include "mux.h"
+
+#include <assert.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <algorithm>
+#include <mutex>
+#include <string>
+#include <utility>
+#include <vector>
+
+extern "C" {
+#include <libavformat/avio.h>
+#include <libavutil/avutil.h>
+#include <libavutil/dict.h>
+#include <libavutil/mathematics.h>
+#include <libavutil/mem.h>
+#include <libavutil/pixfmt.h>
+#include <libavutil/rational.h>
+}
+
+#include "defs.h"
+#include "timebase.h"
+
+using namespace std;
+
+struct PacketBefore {
+       PacketBefore(const AVFormatContext *ctx) : ctx(ctx) {}
+
+       bool operator() (const Mux::QueuedPacket &a_qp, const Mux::QueuedPacket &b_qp) const {
+               const AVPacket *a = a_qp.pkt;
+               const AVPacket *b = b_qp.pkt;
+               int64_t a_dts = (a->dts == AV_NOPTS_VALUE ? a->pts : a->dts);
+               int64_t b_dts = (b->dts == AV_NOPTS_VALUE ? b->pts : b->dts);
+               AVRational a_timebase = ctx->streams[a->stream_index]->time_base;
+               AVRational b_timebase = ctx->streams[b->stream_index]->time_base;
+               if (av_compare_ts(a_dts, a_timebase, b_dts, b_timebase) != 0) {
+                       return av_compare_ts(a_dts, a_timebase, b_dts, b_timebase) < 0;
+               } else {
+                       return av_compare_ts(a->pts, a_timebase, b->pts, b_timebase) < 0;
+               }
+       }
+
+       const AVFormatContext * const ctx;
+};
+
+Mux::Mux(AVFormatContext *avctx, int width, int height, Codec video_codec, const string &video_extradata, const AVCodecParameters *audio_codecpar, int time_base, std::function<void(int64_t)> write_callback, WriteStrategy write_strategy, const vector<MuxMetrics *> &metrics)
+       : write_strategy(write_strategy), avctx(avctx), write_callback(write_callback), metrics(metrics)
+{
+       avstream_video = avformat_new_stream(avctx, nullptr);
+       if (avstream_video == nullptr) {
+               fprintf(stderr, "avformat_new_stream() failed\n");
+               exit(1);
+       }
+       avstream_video->time_base = AVRational{1, time_base};
+       avstream_video->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
+       if (video_codec == CODEC_H264) {
+               avstream_video->codecpar->codec_id = AV_CODEC_ID_H264;
+       } else {
+               assert(video_codec == CODEC_NV12);
+               avstream_video->codecpar->codec_id = AV_CODEC_ID_RAWVIDEO;
+               avstream_video->codecpar->codec_tag = avcodec_pix_fmt_to_codec_tag(AV_PIX_FMT_NV12);
+       }
+       avstream_video->codecpar->width = width;
+       avstream_video->codecpar->height = height;
+
+       // Colorspace details. Closely correspond to settings in EffectChain_finalize,
+       // as noted in each comment.
+       // Note that the H.264 stream also contains this information and depending on the
+       // mux, this might simply get ignored. See sps_rbsp().
+       // Note that there's no way to change this per-frame as the H.264 stream
+       // would like to be able to.
+       avstream_video->codecpar->color_primaries = AVCOL_PRI_BT709;  // RGB colorspace (inout_format.color_space).
+       avstream_video->codecpar->color_trc = AVCOL_TRC_IEC61966_2_1;  // Gamma curve (inout_format.gamma_curve).
+       // YUV colorspace (output_ycbcr_format.luma_coefficients).
+       avstream_video->codecpar->color_space = AVCOL_SPC_BT709;
+       avstream_video->codecpar->color_range = AVCOL_RANGE_MPEG;  // Full vs. limited range (output_ycbcr_format.full_range).
+       avstream_video->codecpar->chroma_location = AVCHROMA_LOC_LEFT;  // Chroma sample location. See chroma_offset_0[] in Mixer::subsample_chroma().
+       avstream_video->codecpar->field_order = AV_FIELD_PROGRESSIVE;
+
+       if (!video_extradata.empty()) {
+               avstream_video->codecpar->extradata = (uint8_t *)av_malloc(video_extradata.size());
+               avstream_video->codecpar->extradata_size = video_extradata.size();
+               memcpy(avstream_video->codecpar->extradata, video_extradata.data(), video_extradata.size());
+       }
+
+       avstream_audio = avformat_new_stream(avctx, nullptr);
+       if (avstream_audio == nullptr) {
+               fprintf(stderr, "avformat_new_stream() failed\n");
+               exit(1);
+       }
+       avstream_audio->time_base = AVRational{1, time_base};
+       if (avcodec_parameters_copy(avstream_audio->codecpar, audio_codecpar) < 0) {
+               fprintf(stderr, "avcodec_parameters_copy() failed\n");
+               exit(1);
+       }
+
+       AVDictionary *options = NULL;
+       vector<pair<string, string>> opts = MUX_OPTS;
+       for (pair<string, string> opt : opts) {
+               av_dict_set(&options, opt.first.c_str(), opt.second.c_str(), 0);
+       }
+       if (avformat_write_header(avctx, &options) < 0) {
+               fprintf(stderr, "avformat_write_header() failed\n");
+               exit(1);
+       }
+       for (MuxMetrics *metric : metrics) {
+               metric->metric_written_bytes += avctx->pb->pos;
+       }
+
+       // Make sure the header is written before the constructor exits.
+       avio_flush(avctx->pb);
+
+       if (write_strategy == WRITE_BACKGROUND) {
+               writer_thread = thread(&Mux::thread_func, this);
+       }
+}
+
+Mux::~Mux()
+{
+       assert(plug_count == 0);
+       if (write_strategy == WRITE_BACKGROUND) {
+               writer_thread_should_quit = true;
+               packet_queue_ready.notify_all();
+               writer_thread.join();
+       }
+       int64_t old_pos = avctx->pb->pos;
+       av_write_trailer(avctx);
+       for (MuxMetrics *metric : metrics) {
+               metric->metric_written_bytes += avctx->pb->pos - old_pos;
+       }
+
+       if (!(avctx->oformat->flags & AVFMT_NOFILE) &&
+           !(avctx->flags & AVFMT_FLAG_CUSTOM_IO)) {
+               avio_closep(&avctx->pb);
+       }
+       avformat_free_context(avctx);
+}
+
+void Mux::add_packet(const AVPacket &pkt, int64_t pts, int64_t dts, AVRational timebase, int stream_index_override)
+{
+       AVPacket pkt_copy;
+       av_init_packet(&pkt_copy);
+       if (av_packet_ref(&pkt_copy, &pkt) < 0) {
+               fprintf(stderr, "av_packet_ref() failed\n");
+               exit(1);
+       }
+       if (stream_index_override != -1) {
+               pkt_copy.stream_index = stream_index_override;
+       }
+       if (pkt_copy.stream_index == 0) {
+               pkt_copy.pts = av_rescale_q(pts, timebase, avstream_video->time_base);
+               pkt_copy.dts = av_rescale_q(dts, timebase, avstream_video->time_base);
+               pkt_copy.duration = av_rescale_q(pkt.duration, timebase, avstream_video->time_base);
+       } else if (pkt_copy.stream_index == 1) {
+               pkt_copy.pts = av_rescale_q(pts, timebase, avstream_audio->time_base);
+               pkt_copy.dts = av_rescale_q(dts, timebase, avstream_audio->time_base);
+               pkt_copy.duration = av_rescale_q(pkt.duration, timebase, avstream_audio->time_base);
+       } else {
+               assert(false);
+       }
+
+       {
+               lock_guard<mutex> lock(mu);
+               if (write_strategy == WriteStrategy::WRITE_BACKGROUND) {
+                       packet_queue.push_back(QueuedPacket{ av_packet_clone(&pkt_copy), pts });
+                       if (plug_count == 0) packet_queue_ready.notify_all();
+               } else if (plug_count > 0) {
+                       packet_queue.push_back(QueuedPacket{ av_packet_clone(&pkt_copy), pts });
+               } else {
+                       write_packet_or_die(pkt_copy, pts);
+               }
+       }
+
+       av_packet_unref(&pkt_copy);
+}
+
+void Mux::write_packet_or_die(const AVPacket &pkt, int64_t unscaled_pts)
+{
+       for (MuxMetrics *metric : metrics) {
+               if (pkt.stream_index == 0) {
+                       metric->metric_video_bytes += pkt.size;
+               } else if (pkt.stream_index == 1) {
+                       metric->metric_audio_bytes += pkt.size;
+               } else {
+                       assert(false);
+               }
+       }
+       int64_t old_pos = avctx->pb->pos;
+       if (av_interleaved_write_frame(avctx, const_cast<AVPacket *>(&pkt)) < 0) {
+               fprintf(stderr, "av_interleaved_write_frame() failed\n");
+               exit(1);
+       }
+       avio_flush(avctx->pb);
+       for (MuxMetrics *metric : metrics) {
+               metric->metric_written_bytes += avctx->pb->pos - old_pos;
+       }
+
+       if (pkt.stream_index == 0 && write_callback != nullptr) {
+               write_callback(unscaled_pts);
+       }
+}
+
+void Mux::plug()
+{
+       lock_guard<mutex> lock(mu);
+       ++plug_count;
+}
+
+void Mux::unplug()
+{
+       lock_guard<mutex> lock(mu);
+       if (--plug_count > 0) {
+               return;
+       }
+       assert(plug_count >= 0);
+
+       sort(packet_queue.begin(), packet_queue.end(), PacketBefore(avctx));
+
+       if (write_strategy == WRITE_BACKGROUND) {
+               packet_queue_ready.notify_all();
+       } else {
+               for (QueuedPacket &qp : packet_queue) {
+                       write_packet_or_die(*qp.pkt, qp.unscaled_pts);
+                       av_packet_free(&qp.pkt);
+               }
+               packet_queue.clear();
+       }
+}
+
+void Mux::thread_func()
+{
+       unique_lock<mutex> lock(mu);
+       for ( ;; ) {
+               packet_queue_ready.wait(lock, [this]() {
+                       return writer_thread_should_quit || (!packet_queue.empty() && plug_count == 0);
+               });
+               if (writer_thread_should_quit && packet_queue.empty()) {
+                       // All done.
+                       break;
+               }
+
+               assert(!packet_queue.empty() && plug_count == 0);
+               vector<QueuedPacket> packets;
+               swap(packets, packet_queue);
+
+               lock.unlock();
+               for (QueuedPacket &qp : packets) {
+                       write_packet_or_die(*qp.pkt, qp.unscaled_pts);
+                       av_packet_free(&qp.pkt);
+               }
+               lock.lock();
+       }
+}
+
+void MuxMetrics::init(const vector<pair<string, string>> &labels)
+{
+       // TODO: See if we want to reintroduce these.
+}
diff --git a/mux.h b/mux.h
new file mode 100644 (file)
index 0000000..9614bff
--- /dev/null
+++ b/mux.h
@@ -0,0 +1,111 @@
+#ifndef _MUX_H
+#define _MUX_H 1
+
+// Wrapper around an AVFormat mux.
+
+extern "C" {
+#include <libavcodec/avcodec.h>
+#include <libavformat/avformat.h>
+}
+
+#include <sys/types.h>
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <mutex>
+#include <string>
+#include <utility>
+#include <thread>
+#include <vector>
+
+#include "timebase.h"
+
+struct MuxMetrics {
+       // “written” will usually be equal to video + audio + mux overhead,
+       // except that there could be buffered packets that count in audio or video
+       // but not yet in written.
+       std::atomic<int64_t> metric_video_bytes{0}, metric_audio_bytes{0}, metric_written_bytes{0};
+
+       // Registers in global_metrics.
+       void init(const std::vector<std::pair<std::string, std::string>> &labels);
+
+       void reset()
+       {
+               metric_video_bytes = 0;
+               metric_audio_bytes = 0;
+               metric_written_bytes = 0;
+       }
+};
+
+class Mux {
+public:
+       enum Codec {
+               CODEC_H264,
+               CODEC_NV12,  // Uncompressed 4:2:0.
+       };
+       enum WriteStrategy {
+               // add_packet() will write the packet immediately, unless plugged.
+               WRITE_FOREGROUND,
+
+               // All writes will happen on a separate thread, so add_packet()
+               // won't block. Use this if writing to a file and you might be
+               // holding a mutex (because blocking I/O with a mutex held is
+               // not good). Note that this will clone every packet, so it has
+               // higher overhead.
+               WRITE_BACKGROUND,
+       };
+
+       // Takes ownership of avctx. <write_callback> will be called every time
+       // a write has been made to the video stream (id 0), with the pts of
+       // the just-written frame. (write_callback can be nullptr.)
+       // Does not take ownership of <metrics>; elements in there, if any,
+       // will be added to.
+       Mux(AVFormatContext *avctx, int width, int height, Codec video_codec, const std::string &video_extradata, const AVCodecParameters *audio_codecpar, int time_base, std::function<void(int64_t)> write_callback, WriteStrategy write_strategy, const std::vector<MuxMetrics *> &metrics);
+       ~Mux();
+       void add_packet(const AVPacket &pkt, int64_t pts, int64_t dts, AVRational timebase = { 1, TIMEBASE }, int stream_index_override = -1);
+
+       // As long as the mux is plugged, it will not actually write anything to disk,
+       // just queue the packets. Once it is unplugged, the packets are reordered by pts
+       // and written. This is primarily useful if you might have two different encoders
+       // writing to the mux at the same time (because one is shutting down), so that
+       // pts might otherwise come out-of-order.
+       //
+       // You can plug and unplug multiple times; only when the plug count reaches zero,
+       // something will actually happen.
+       void plug();
+       void unplug();
+
+private:
+       // If write_strategy == WRITE_FOREGROUND, must be called with <mu> held.
+       void write_packet_or_die(const AVPacket &pkt, int64_t unscaled_pts);
+       void thread_func();
+
+       WriteStrategy write_strategy;
+
+       std::mutex mu;
+
+       // These are only in use if write_strategy == WRITE_BACKGROUND.
+       std::atomic<bool> writer_thread_should_quit{false};
+       std::thread writer_thread;
+
+       AVFormatContext *avctx;  // Protected by <mu>, iff write_strategy == WRITE_BACKGROUND.
+       int plug_count = 0;  // Protected by <mu>.
+
+       // Protected by <mu>. If write_strategy == WRITE_FOREGROUND,
+       // this is only in use when plugging.
+       struct QueuedPacket {
+               AVPacket *pkt;
+               int64_t unscaled_pts;
+       };
+       std::vector<QueuedPacket> packet_queue;
+       std::condition_variable packet_queue_ready;
+
+       AVStream *avstream_video, *avstream_audio;
+
+       std::function<void(int64_t)> write_callback;
+       std::vector<MuxMetrics *> metrics;
+
+       friend struct PacketBefore;
+};
+
+#endif  // !defined(_MUX_H)
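
The Mux takes an already configured AVFormatContext; below is a hedged sketch of pointing it at the HTTPD above through a custom AVIOContext. The function names and the overall wiring are illustrative assumptions, not code from this commit — Nageru's real setup also captures the container header for HTTPD::set_header() and tracks per-packet keyframe state, both simplified away here.

extern "C" {
#include <libavformat/avformat.h>
#include <libavformat/avio.h>
#include <libavutil/avutil.h>
#include <libavutil/mem.h>
}

#include "defs.h"
#include "httpd.h"
#include "mux.h"
#include "timebase.h"

// AVIO write callback: everything the muxer emits is forwarded to the HTTP server.
int write_packet_thunk(void *opaque, uint8_t *buf, int buf_size)
{
	HTTPD *httpd = (HTTPD *)opaque;
	// keyframe=true for every block keeps the sketch simple; the real code
	// would track whether the packet currently being written is a keyframe.
	httpd->add_data((const char *)buf, buf_size, /*keyframe=*/true,
	                AV_NOPTS_VALUE, AVRational{ 1, TIMEBASE });
	return buf_size;
}

Mux *create_http_mux(HTTPD *httpd, int width, int height,
                     const AVCodecParameters *audio_codecpar)
{
	AVFormatContext *avctx = nullptr;
	avformat_alloc_output_context2(&avctx, nullptr, DEFAULT_STREAM_MUX_NAME, nullptr);

	uint8_t *buf = (uint8_t *)av_malloc(MUX_BUFFER_SIZE);
	avctx->pb = avio_alloc_context(buf, MUX_BUFFER_SIZE, /*write_flag=*/1, httpd,
	                               nullptr, &write_packet_thunk, nullptr);
	avctx->flags = AVFMT_FLAG_CUSTOM_IO;

	// No H.264 extradata, no per-write callback and no metrics in this sketch.
	return new Mux(avctx, width, height, Mux::CODEC_H264, /*video_extradata=*/"",
	               audio_codecpar, TIMEBASE, /*write_callback=*/nullptr,
	               Mux::WRITE_FOREGROUND, /*metrics=*/{});
}
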
diff --git a/timebase.h b/timebase.h
new file mode 100644 (file)
index 0000000..dbc4402
--- /dev/null
+++ b/timebase.h
@@ -0,0 +1,25 @@
+#ifndef _TIMEBASE_H
+#define _TIMEBASE_H 1
+
+// Common timebase that allows us to represent one frame exactly in all the
+// relevant frame rates:
+//
+//   Timebase:                1/120000
+//   Frame at 50fps:       2400/120000
+//   Frame at 60fps:       2000/120000
+//   Frame at 59.94fps:    2002/120000
+//   Frame at 23.976fps:   5005/120000
+//
+// If we also wanted to represent one sample at 48000 Hz, we'd need
+// to go to 300000. Also supporting one sample at 44100 Hz would mean
+// going to 44100000; probably a bit excessive.
+#define TIMEBASE 120000
+
+// Some muxes, like MP4 (or at least avformat's implementation of it),
+// are not too fond of values above 2^31. At timebase 120000, that's only
+// about five hours or so, so we define a coarser timebase that doesn't
+// get 59.94 precisely (so there will be a marginal amount of pts jitter),
+// but can do at least 50 and 60 precisely, and months of streaming.
+#define COARSE_TIMEBASE 300
+
+#endif  // !defined(_TIMEBASE_H)
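
As a concrete illustration of the comment above (the two helpers are hypothetical, not part of the commit): a frame at 59.94 fps is exactly 2002 ticks at TIMEBASE 120000, and av_rescale_q() converts such values into whatever time base the output stream actually uses.

extern "C" {
#include <libavutil/mathematics.h>
#include <libavutil/rational.h>
}

#include <stdint.h>

#include "timebase.h"

// pts of the Nth frame at 60000/1001 fps (59.94), in TIMEBASE units.
// 120000 * 1001 / 60000 = 2002, so every frame duration is exact.
int64_t pts_of_5994_frame(int64_t frame_number)
{
	return frame_number * 2002;
}

// Rescale such a pts into an output stream's time base (e.g. {1, 90000} for MPEG-TS).
int64_t to_stream_pts(int64_t pts, AVRational stream_time_base)
{
	return av_rescale_q(pts, AVRational{ 1, TIMEBASE }, stream_time_base);
}
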