From bcd177e1daf5a63d7bf877bc5d30d8803dfd472c Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sat, 15 Feb 2020 17:28:17 +0100 Subject: [PATCH] Make it possible to siphon out a single MJPEG stream. MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The URL for this is /feeds/N.mp4 (although the .mp4 suffix will be ignored in practice), where N is the card index (starting from zero). You are allowed to use a feed that is not part of the “regular” multicam MJPEG input. This can be used for remote debugging, recording of a given card with wget, streaming a specific card as an aux, and probably other things. --- futatabi/video_stream.cpp | 5 +- nageru/defs.h | 2 +- nageru/kaeru.cpp | 5 +- nageru/mixer.cpp | 3 +- nageru/mjpeg_encoder.cpp | 150 +++++++++++++++++++++++++++++--------- nageru/mjpeg_encoder.h | 24 ++++-- nageru/video_encoder.cpp | 4 +- shared/httpd.cpp | 49 ++++++++++--- shared/httpd.h | 44 ++++++++--- shared/shared_defs.h | 2 + 10 files changed, 217 insertions(+), 71 deletions(-) diff --git a/futatabi/video_stream.cpp b/futatabi/video_stream.cpp index d7d9fd0..987bef5 100644 --- a/futatabi/video_stream.cpp +++ b/futatabi/video_stream.cpp @@ -797,11 +797,12 @@ int VideoStream::write_packet2(uint8_t *buf, int buf_size, AVIODataMarkerType ty type = AVIO_DATA_MARKER_SYNC_POINT; } + HTTPD::StreamID stream_id{ HTTPD::MAIN_STREAM, 0 }; if (type == AVIO_DATA_MARKER_HEADER) { stream_mux_header.append((char *)buf, buf_size); - global_httpd->set_header(HTTPD::MAIN_STREAM, stream_mux_header); + global_httpd->set_header(stream_id, stream_mux_header); } else { - global_httpd->add_data(HTTPD::MAIN_STREAM, (char *)buf, buf_size, type == AVIO_DATA_MARKER_SYNC_POINT, time, AVRational{ AV_TIME_BASE, 1 }); + global_httpd->add_data(stream_id, (char *)buf, buf_size, type == AVIO_DATA_MARKER_SYNC_POINT, time, AVRational{ AV_TIME_BASE, 1 }); } return buf_size; } diff --git a/nageru/defs.h b/nageru/defs.h index 6d684e7..5113fda 100644 
--- a/nageru/defs.h +++ b/nageru/defs.h @@ -5,7 +5,7 @@ #define MAX_FPS 60 #define FAKE_FPS 25 // Must be an integer. -#define MAX_VIDEO_CARDS 16 +// #define MAX_VIDEO_CARDS 16 // defined in shared_defs.h. #define MAX_ALSA_CARDS 16 #define MAX_BUSES 256 // Audio buses. diff --git a/nageru/kaeru.cpp b/nageru/kaeru.cpp index d9b1e0e..cf14bf3 100644 --- a/nageru/kaeru.cpp +++ b/nageru/kaeru.cpp @@ -47,11 +47,12 @@ int write_packet(void *opaque, uint8_t *buf, int buf_size, AVIODataMarkerType ty type = AVIO_DATA_MARKER_SYNC_POINT; } + HTTPD::StreamID stream_id{ HTTPD::MAIN_STREAM, 0 }; if (type == AVIO_DATA_MARKER_HEADER) { stream_mux_header.append((char *)buf, buf_size); - httpd->set_header(HTTPD::MAIN_STREAM, stream_mux_header); + httpd->set_header(stream_id, stream_mux_header); } else { - httpd->add_data(HTTPD::MAIN_STREAM, (char *)buf, buf_size, type == AVIO_DATA_MARKER_SYNC_POINT, time, AVRational{ AV_TIME_BASE, 1 }); + httpd->add_data(stream_id, (char *)buf, buf_size, type == AVIO_DATA_MARKER_SYNC_POINT, time, AVRational{ AV_TIME_BASE, 1 }); } return buf_size; } diff --git a/nageru/mixer.cpp b/nageru/mixer.cpp index aac4211..d213334 100644 --- a/nageru/mixer.cpp +++ b/nageru/mixer.cpp @@ -801,7 +801,8 @@ void Mixer::bm_frame(unsigned card_index, uint16_t timecode, // // Only bother doing MJPEG encoding if there are any connected clients // that want the stream. 
- if (httpd.get_num_connected_multicam_clients() > 0) { + if (httpd.get_num_connected_multicam_clients() > 0 || + httpd.get_num_connected_siphon_clients(card_index) > 0) { vector converted_samples = convert_audio_to_fixed32(audio_frame.data + audio_offset, num_samples, audio_format, 2); lock_guard lock(card_mutex); if (card->new_raw_audio.empty()) { diff --git a/nageru/mjpeg_encoder.cpp b/nageru/mjpeg_encoder.cpp index a39f1c8..19ffd58 100644 --- a/nageru/mjpeg_encoder.cpp +++ b/nageru/mjpeg_encoder.cpp @@ -1,5 +1,6 @@ #include "mjpeg_encoder.h" +#include #include #include #if __SSE2__ @@ -102,17 +103,18 @@ static_assert(std::is_standard_layout::value, ""); int MJPEGEncoder::write_packet2_thunk(void *opaque, uint8_t *buf, int buf_size, AVIODataMarkerType type, int64_t time) { - MJPEGEncoder *engine = (MJPEGEncoder *)opaque; - return engine->write_packet2(buf, buf_size, type, time); + WritePacket2Context *ctx = (WritePacket2Context *)opaque; + return ctx->mjpeg_encoder->write_packet2(ctx->stream_id, buf, buf_size, type, time); } -int MJPEGEncoder::write_packet2(uint8_t *buf, int buf_size, AVIODataMarkerType type, int64_t time) +int MJPEGEncoder::write_packet2(HTTPD::StreamID stream_id, uint8_t *buf, int buf_size, AVIODataMarkerType type, int64_t time) { + string *mux_header = &streams[stream_id].mux_header; if (type == AVIO_DATA_MARKER_HEADER) { - mux_header.append((char *)buf, buf_size); - httpd->set_header(HTTPD::MULTICAM_STREAM, mux_header); + mux_header->append((char *)buf, buf_size); + httpd->set_header(stream_id, *mux_header); } else { - httpd->add_data(HTTPD::MULTICAM_STREAM, (char *)buf, buf_size, /*keyframe=*/true, AV_NOPTS_VALUE, AVRational{ AV_TIME_BASE, 1 }); + httpd->add_data(stream_id, (char *)buf, buf_size, /*keyframe=*/true, AV_NOPTS_VALUE, AVRational{ AV_TIME_BASE, 1 }); } return buf_size; } @@ -182,24 +184,12 @@ void finalize_mux(AVFormatContext *avctx) MJPEGEncoder::MJPEGEncoder(HTTPD *httpd, const string &va_display) : httpd(httpd) { - // Set up 
the mux. We don't use the Mux wrapper, because it's geared towards - // a situation with only one video stream (and possibly one audio stream) - // with known width/height, and we don't need the extra functionality it provides. - avctx.reset(avformat_alloc_context()); - avctx->oformat = av_guess_format("nut", nullptr, nullptr); - - uint8_t *buf = (uint8_t *)av_malloc(MUX_BUFFER_SIZE); - avctx->pb = avio_alloc_context(buf, MUX_BUFFER_SIZE, 1, this, nullptr, nullptr, nullptr); - avctx->pb->write_data_type = &MJPEGEncoder::write_packet2_thunk; - avctx->flags = AVFMT_FLAG_CUSTOM_IO; - - for (unsigned card_idx = 0; card_idx < global_flags.card_to_mjpeg_stream_export.size(); ++card_idx) { - add_video_stream(avctx.get()); + create_ffmpeg_context(HTTPD::StreamID{ HTTPD::MULTICAM_STREAM, 0 }); + for (unsigned stream_idx = 0; stream_idx < MAX_VIDEO_CARDS; ++stream_idx) { + create_ffmpeg_context(HTTPD::StreamID{ HTTPD::SIPHON_STREAM, stream_idx }); } - for (unsigned card_idx = 0; card_idx < global_flags.card_to_mjpeg_stream_export.size(); ++card_idx) { - add_audio_stream(avctx.get()); - } - finalize_mux(avctx.get()); + + add_stream(HTTPD::StreamID{ HTTPD::MULTICAM_STREAM, 0 }); // Initialize VA-API. string error; @@ -225,7 +215,9 @@ MJPEGEncoder::MJPEGEncoder(HTTPD *httpd, const string &va_display) MJPEGEncoder::~MJPEGEncoder() { - av_free(avctx->pb->buffer); + for (auto &id_and_stream : streams) { + av_free(id_and_stream.second.avctx->pb->buffer); + } global_metrics.remove("mjpeg_frames", {{ "status", "dropped" }, { "reason", "zero_size" }}); global_metrics.remove("mjpeg_frames", {{ "status", "dropped" }, { "reason", "interlaced" }}); @@ -347,7 +339,8 @@ bool MJPEGEncoder::should_encode_mjpeg_for_card(unsigned card_index) { // Only bother doing MJPEG encoding if there are any connected clients // that want the stream. 
- if (httpd->get_num_connected_multicam_clients() == 0) { + if (httpd->get_num_connected_multicam_clients() == 0 && + httpd->get_num_connected_siphon_clients(card_index) == 0) { return false; } @@ -380,14 +373,26 @@ void MJPEGEncoder::encoder_thread_func() // Will call back in the receiver thread. encode_jpeg_va(move(qf)); } else { + update_siphon_streams(); + + HTTPD::StreamID multicam_id{ HTTPD::MULTICAM_STREAM, 0 }; + HTTPD::StreamID siphon_id{ HTTPD::SIPHON_STREAM, qf.card_index }; + assert(streams.count(multicam_id)); + // Write audio before video, since Futatabi expects it. if (qf.audio.size() > 0) { - write_audio_packet(qf.pts, stream_index, qf.audio); + write_audio_packet(streams[multicam_id].avctx.get(), qf.pts, stream_index + global_flags.card_to_mjpeg_stream_export.size(), qf.audio); + if (streams.count(siphon_id)) { + write_audio_packet(streams[siphon_id].avctx.get(), qf.pts, /*stream_index=*/1, qf.audio); + } } // Encode synchronously, in the same thread. vector jpeg = encode_jpeg_libjpeg(qf); - write_mjpeg_packet(qf.pts, stream_index, jpeg.data(), jpeg.size()); + write_mjpeg_packet(streams[multicam_id].avctx.get(), qf.pts, stream_index, jpeg.data(), jpeg.size()); + if (streams.count(siphon_id)) { + write_mjpeg_packet(streams[siphon_id].avctx.get(), qf.pts, /*stream_index=*/0, jpeg.data(), jpeg.size()); + } } } @@ -397,40 +402,40 @@ void MJPEGEncoder::encoder_thread_func() free(tmp_cr); } -void MJPEGEncoder::write_mjpeg_packet(int64_t pts, unsigned card_index, const uint8_t *jpeg, size_t jpeg_size) +void MJPEGEncoder::write_mjpeg_packet(AVFormatContext *avctx, int64_t pts, unsigned stream_index, const uint8_t *jpeg, size_t jpeg_size) { AVPacket pkt; memset(&pkt, 0, sizeof(pkt)); pkt.buf = nullptr; pkt.data = const_cast(jpeg); pkt.size = jpeg_size; - pkt.stream_index = card_index; + pkt.stream_index = stream_index; pkt.flags = AV_PKT_FLAG_KEY; AVRational time_base = avctx->streams[pkt.stream_index]->time_base; pkt.pts = pkt.dts = av_rescale_q(pts, 
AVRational{ 1, TIMEBASE }, time_base); pkt.duration = 0; - if (av_write_frame(avctx.get(), &pkt) < 0) { + if (av_write_frame(avctx, &pkt) < 0) { fprintf(stderr, "av_write_frame() failed\n"); abort(); } } -void MJPEGEncoder::write_audio_packet(int64_t pts, unsigned card_index, const vector &audio) +void MJPEGEncoder::write_audio_packet(AVFormatContext *avctx, int64_t pts, unsigned stream_index, const vector &audio) { AVPacket pkt; memset(&pkt, 0, sizeof(pkt)); pkt.buf = nullptr; pkt.data = reinterpret_cast(const_cast(&audio[0])); pkt.size = audio.size() * sizeof(audio[0]); - pkt.stream_index = card_index + global_flags.card_to_mjpeg_stream_export.size(); + pkt.stream_index = stream_index; pkt.flags = AV_PKT_FLAG_KEY; AVRational time_base = avctx->streams[pkt.stream_index]->time_base; pkt.pts = pkt.dts = av_rescale_q(pts, AVRational{ 1, TIMEBASE }, time_base); size_t num_stereo_samples = audio.size() / 2; pkt.duration = av_rescale_q(num_stereo_samples, AVRational{ 1, OUTPUT_FREQUENCY }, time_base); - if (av_write_frame(avctx.get(), &pkt) < 0) { + if (av_write_frame(avctx, &pkt) < 0) { fprintf(stderr, "av_write_frame() failed\n"); abort(); } @@ -890,12 +895,22 @@ void MJPEGEncoder::va_receiver_thread_func() frames_encoding.pop(); } + update_siphon_streams(); + assert(global_flags.card_to_mjpeg_stream_export.count(qf.card_index)); // Or should_encode_mjpeg_for_card() would have returned false. int stream_index = global_flags.card_to_mjpeg_stream_export[qf.card_index]; + HTTPD::StreamID multicam_id{ HTTPD::MULTICAM_STREAM, 0 }; + HTTPD::StreamID siphon_id{ HTTPD::SIPHON_STREAM, qf.card_index }; + assert(streams.count(multicam_id)); + assert(streams[multicam_id].avctx != nullptr); + // Write audio before video, since Futatabi expects it. 
if (qf.audio.size() > 0) { - write_audio_packet(qf.pts, stream_index, qf.audio); + write_audio_packet(streams[multicam_id].avctx.get(), qf.pts, stream_index + global_flags.card_to_mjpeg_stream_export.size(), qf.audio); + if (streams.count(siphon_id)) { + write_audio_packet(streams[siphon_id].avctx.get(), qf.pts, /*stream_index=*/1, qf.audio); + } } VAStatus va_status = vaSyncSurface(va_dpy->va_dpy, qf.resources.surface); @@ -906,7 +921,10 @@ void MJPEGEncoder::va_receiver_thread_func() CHECK_VASTATUS(va_status, "vaMapBuffer"); const uint8_t *coded_buf = reinterpret_cast(segment->buf); - write_mjpeg_packet(qf.pts, stream_index, coded_buf, segment->size); + write_mjpeg_packet(streams[multicam_id].avctx.get(), qf.pts, stream_index, coded_buf, segment->size); + if (streams.count(siphon_id)) { + write_mjpeg_packet(streams[siphon_id].avctx.get(), qf.pts, /*stream_index=*/0, coded_buf, segment->size); + } va_status = vaUnmapBuffer(va_dpy->va_dpy, qf.resources.data_buffer); CHECK_VASTATUS(va_status, "vaUnmapBuffer"); @@ -943,3 +961,65 @@ vector MJPEGEncoder::encode_jpeg_libjpeg(const QueuedFrame &qf) return dest.dest; } + +void MJPEGEncoder::add_stream(HTTPD::StreamID stream_id) +{ + AVFormatContextWithCloser avctx; + + // Set up the mux. We don't use the Mux wrapper, because it's geared towards + // a situation with only one video stream (and possibly one audio stream) + // with known width/height, and we don't need the extra functionality it provides. 
+ avctx.reset(avformat_alloc_context()); + avctx->oformat = av_guess_format("nut", nullptr, nullptr); + + uint8_t *buf = (uint8_t *)av_malloc(MUX_BUFFER_SIZE); + avctx->pb = avio_alloc_context(buf, MUX_BUFFER_SIZE, 1, &ffmpeg_contexts[stream_id], nullptr, nullptr, nullptr); + avctx->pb->write_data_type = &MJPEGEncoder::write_packet2_thunk; + avctx->flags = AVFMT_FLAG_CUSTOM_IO; + + if (stream_id.type == HTTPD::MULTICAM_STREAM) { + for (unsigned card_idx = 0; card_idx < global_flags.card_to_mjpeg_stream_export.size(); ++card_idx) { + add_video_stream(avctx.get()); + } + for (unsigned card_idx = 0; card_idx < global_flags.card_to_mjpeg_stream_export.size(); ++card_idx) { + add_audio_stream(avctx.get()); + } + } else { + assert(stream_id.type == HTTPD::SIPHON_STREAM); + add_video_stream(avctx.get()); + add_audio_stream(avctx.get()); + } + finalize_mux(avctx.get()); + + Stream s; + s.avctx = move(avctx); + streams[stream_id] = move(s); +} + +void MJPEGEncoder::update_siphon_streams() +{ + // Bring the list of streams into sync with what the clients need. 
+ for (auto it = streams.begin(); it != streams.end(); ) { + if (it->first.type != HTTPD::SIPHON_STREAM) { + ++it; + continue; + } + if (httpd->get_num_connected_siphon_clients(it->first.index) == 0) { + av_free(it->second.avctx->pb->buffer); + streams.erase(it++); + } else { + ++it; + } + } + for (unsigned stream_idx = 0; stream_idx < MAX_VIDEO_CARDS; ++stream_idx) { + HTTPD::StreamID stream_id{ HTTPD::SIPHON_STREAM, stream_idx }; + if (streams.count(stream_id) == 0 && httpd->get_num_connected_siphon_clients(stream_idx) > 0) { + add_stream(stream_id); + } + } +} + +void MJPEGEncoder::create_ffmpeg_context(HTTPD::StreamID stream_id) +{ + ffmpeg_contexts.emplace(stream_id, WritePacket2Context{ this, stream_id }); +} diff --git a/nageru/mjpeg_encoder.h b/nageru/mjpeg_encoder.h index 362e934..6ba7f03 100644 --- a/nageru/mjpeg_encoder.h +++ b/nageru/mjpeg_encoder.h @@ -1,7 +1,9 @@ #ifndef _MJPEG_ENCODER_H #define _MJPEG_ENCODER_H 1 +#include "defs.h" #include "shared/ffmpeg_raii.h" +#include "shared/httpd.h" #include "ref_counted_frame.h" extern "C" { @@ -23,7 +25,6 @@ extern "C" { #include #include -class HTTPD; struct jpeg_compress_struct; struct VADisplayWithCleanup; struct VectorDestinationManager; @@ -116,13 +117,21 @@ private: void va_receiver_thread_func(); void encode_jpeg_va(QueuedFrame &&qf); std::vector encode_jpeg_libjpeg(const QueuedFrame &qf); - void write_mjpeg_packet(int64_t pts, unsigned card_index, const uint8_t *jpeg, size_t jpeg_size); - void write_audio_packet(int64_t pts, unsigned card_index, const std::vector &audio); + void write_mjpeg_packet(AVFormatContext *avctx, int64_t pts, unsigned stream_index, const uint8_t *jpeg, size_t jpeg_size); + void write_audio_packet(AVFormatContext *avctx, int64_t pts, unsigned stream_index, const std::vector &audio); void init_jpeg_422(unsigned width, unsigned height, const movit::RGBTriplet &white_balance, VectorDestinationManager *dest, jpeg_compress_struct *cinfo); std::vector get_jpeg_header(unsigned width, 
unsigned height, const movit::RGBTriplet &white_balance, jpeg_compress_struct *cinfo); + void add_stream(HTTPD::StreamID stream_id); // Can only be called from the constructor, or the thread owning . + void update_siphon_streams(); // Same. + void create_ffmpeg_context(HTTPD::StreamID stream_id); + struct WritePacket2Context { + MJPEGEncoder *mjpeg_encoder; + HTTPD::StreamID stream_id; + }; + std::map ffmpeg_contexts; // Statically set up, so we never need to worry about dangling pointers. static int write_packet2_thunk(void *opaque, uint8_t *buf, int buf_size, AVIODataMarkerType type, int64_t time); - int write_packet2(uint8_t *buf, int buf_size, AVIODataMarkerType type, int64_t time); + int write_packet2(HTTPD::StreamID stream_id, uint8_t *buf, int buf_size, AVIODataMarkerType type, int64_t time); std::thread encoder_thread, va_receiver_thread; @@ -133,9 +142,12 @@ private: std::queue frames_encoding; // Under mu. Used for VA-API only. std::condition_variable any_frames_encoding; - AVFormatContextWithCloser avctx; + struct Stream { + AVFormatContextWithCloser avctx; + std::string mux_header; + }; + std::map streams; // Owned by the VA-API receiver thread if VA-API is active, or the encoder thread if not. 
HTTPD *httpd; - std::string mux_header; std::atomic should_quit{false}; bool running = false; diff --git a/nageru/video_encoder.cpp b/nageru/video_encoder.cpp index 7e4f09f..2ac606e 100644 --- a/nageru/video_encoder.cpp +++ b/nageru/video_encoder.cpp @@ -217,9 +217,9 @@ int VideoEncoder::write_packet2(uint8_t *buf, int buf_size, AVIODataMarkerType t if (type == AVIO_DATA_MARKER_HEADER) { stream_mux_header.append((char *)buf, buf_size); - httpd->set_header(HTTPD::MAIN_STREAM, stream_mux_header); + httpd->set_header(HTTPD::StreamID{ HTTPD::MAIN_STREAM, 0 }, stream_mux_header); } else { - httpd->add_data(HTTPD::MAIN_STREAM, (char *)buf, buf_size, type == AVIO_DATA_MARKER_SYNC_POINT, time, AVRational{ AV_TIME_BASE, 1 }); + httpd->add_data(HTTPD::StreamID{ HTTPD::MAIN_STREAM, 0 }, (char *)buf, buf_size, type == AVIO_DATA_MARKER_SYNC_POINT, time, AVRational{ AV_TIME_BASE, 1 }); } return buf_size; } diff --git a/shared/httpd.cpp b/shared/httpd.cpp index f447f54..a881919 100644 --- a/shared/httpd.cpp +++ b/shared/httpd.cpp @@ -27,6 +27,11 @@ HTTPD::HTTPD() { global_metrics.add("num_connected_clients", &metric_num_connected_clients, Metrics::TYPE_GAUGE); global_metrics.add("num_connected_multicam_clients", &metric_num_connected_multicam_clients, Metrics::TYPE_GAUGE); + for (unsigned stream_idx = 0; stream_idx < MAX_VIDEO_CARDS; ++stream_idx) { + global_metrics.add("num_connected_siphon_clients", + {{ "card", to_string(stream_idx) }}, + &metric_num_connected_siphon_clients[stream_idx], Metrics::TYPE_GAUGE); + } } HTTPD::~HTTPD() @@ -59,12 +64,24 @@ void HTTPD::stop() } } -void HTTPD::add_data(StreamType stream_type, const char *buf, size_t size, bool keyframe, int64_t time, AVRational timebase) +void HTTPD::set_header(StreamID stream_id, const string &data) +{ + lock_guard lock(streams_mutex); + header[stream_id] = data; + add_data_locked(stream_id, data.data(), data.size(), Stream::DATA_TYPE_HEADER, AV_NOPTS_VALUE, AVRational{ 1, 0 }); +} + +void HTTPD::add_data(StreamID 
stream_id, const char *buf, size_t size, bool keyframe, int64_t time, AVRational timebase) { lock_guard lock(streams_mutex); + add_data_locked(stream_id, buf, size, keyframe ? Stream::DATA_TYPE_KEYFRAME : Stream::DATA_TYPE_OTHER, time, timebase); +} + +void HTTPD::add_data_locked(StreamID stream_id, const char *buf, size_t size, Stream::DataType data_type, int64_t time, AVRational timebase) +{ for (Stream *stream : streams) { - if (stream->get_stream_type() == stream_type) { - stream->add_data(buf, size, keyframe ? Stream::DATA_TYPE_KEYFRAME : Stream::DATA_TYPE_OTHER, time, timebase); + if (stream->get_stream_id() == stream_id) { + stream->add_data(buf, size, data_type, time, timebase); } } } @@ -90,11 +107,16 @@ int HTTPD::answer_to_connection(MHD_Connection *connection, } else { framing = HTTPD::Stream::FRAMING_RAW; } - HTTPD::StreamType stream_type; + HTTPD::StreamID stream_id; if (strcmp(url, "/multicam.mp4") == 0) { - stream_type = HTTPD::StreamType::MULTICAM_STREAM; + stream_id.type = HTTPD::StreamType::MULTICAM_STREAM; + stream_id.index = 0; + } else if (strncmp(url, "/feeds/", 7) == 0) { + stream_id.type = HTTPD::StreamType::SIPHON_STREAM; + stream_id.index = atoi(url + 7); } else { - stream_type = HTTPD::StreamType::MAIN_STREAM; + stream_id.type = HTTPD::StreamType::MAIN_STREAM; + stream_id.index = 0; } if (strcmp(url, "/metrics") == 0) { @@ -130,16 +152,20 @@ int HTTPD::answer_to_connection(MHD_Connection *connection, return ret; } - HTTPD::Stream *stream = new HTTPD::Stream(this, framing, stream_type); - stream->add_data(header[stream_type].data(), header[stream_type].size(), Stream::DATA_TYPE_HEADER, AV_NOPTS_VALUE, AVRational{ 1, 0 }); + HTTPD::Stream *stream = new HTTPD::Stream(this, framing, stream_id); + const string &hdr = header[stream_id]; + stream->add_data(hdr.data(), hdr.size(), Stream::DATA_TYPE_HEADER, AV_NOPTS_VALUE, AVRational{ 1, 0 }); { lock_guard lock(streams_mutex); streams.insert(stream); } ++metric_num_connected_clients; - if 
(stream_type == HTTPD::StreamType::MULTICAM_STREAM) { + if (stream_id.type == HTTPD::StreamType::MULTICAM_STREAM) { ++metric_num_connected_multicam_clients; } + if (stream_id.type == HTTPD::StreamType::SIPHON_STREAM) { + ++metric_num_connected_siphon_clients[stream_id.index]; + } *con_cls = stream; // Does not strictly have to be equal to MUX_BUFFER_SIZE. @@ -160,9 +186,12 @@ void HTTPD::free_stream(void *cls) { HTTPD::Stream *stream = (HTTPD::Stream *)cls; HTTPD *httpd = stream->get_parent(); - if (stream->get_stream_type() == HTTPD::StreamType::MULTICAM_STREAM) { + if (stream->get_stream_id().type == HTTPD::StreamType::MULTICAM_STREAM) { --httpd->metric_num_connected_multicam_clients; } + if (stream->get_stream_id().type == HTTPD::StreamType::SIPHON_STREAM) { + --httpd->metric_num_connected_siphon_clients[stream->get_stream_id().index]; + } { lock_guard lock(httpd->streams_mutex); delete stream; diff --git a/shared/httpd.h b/shared/httpd.h index 6c9a254..dae5cad 100644 --- a/shared/httpd.h +++ b/shared/httpd.h @@ -3,6 +3,7 @@ // A class dealing with stream output to HTTP. +#include #include #include #include @@ -13,6 +14,7 @@ #include #include #include +#include #include #include @@ -20,6 +22,8 @@ extern "C" { #include } +#include "shared/shared_defs.h" + struct MHD_Connection; struct MHD_Daemon; @@ -34,13 +38,21 @@ public: enum StreamType { MAIN_STREAM, MULTICAM_STREAM, - NUM_STREAM_TYPES + SIPHON_STREAM // The only one that can have stream_index != 0. + }; + struct StreamID { + StreamType type; + unsigned index; + + bool operator< (const StreamID &other) const { + if (type != other.type) + return type < other.type; + return index < other.index; + } + bool operator== (const StreamID &other) const { + return (type == other.type && index == other.index); + } }; - - // Should be called before start(). - void set_header(StreamType stream_type, const std::string &data) { - header[stream_type] = data; - } // Should be called before start() (due to threading issues). 
enum CORSPolicy { @@ -54,7 +66,8 @@ public: void start(int port); void stop(); - void add_data(StreamType stream_type, const char *buf, size_t size, bool keyframe, int64_t time, AVRational timebase); + void set_header(StreamID stream_id, const std::string &data); + void add_data(StreamID stream_id, const char *buf, size_t size, bool keyframe, int64_t time, AVRational timebase); int64_t get_num_connected_clients() const { return metric_num_connected_clients.load(); @@ -62,6 +75,10 @@ public: int64_t get_num_connected_multicam_clients() const { return metric_num_connected_multicam_clients.load(); } + int64_t get_num_connected_siphon_clients(unsigned stream_idx) const { + assert(stream_idx < MAX_VIDEO_CARDS); + return metric_num_connected_siphon_clients[stream_idx].load(); + } private: static int answer_to_connection_thunk(void *cls, MHD_Connection *connection, @@ -82,8 +99,8 @@ private: FRAMING_RAW, FRAMING_METACUBE }; - Stream(HTTPD *parent, Framing framing, StreamType stream_type) - : parent(parent), framing(framing), stream_type(stream_type) {} + Stream(HTTPD *parent, Framing framing, StreamID stream_id) + : parent(parent), framing(framing), stream_id(stream_id) {} static ssize_t reader_callback_thunk(void *cls, uint64_t pos, char *buf, size_t max); ssize_t reader_callback(uint64_t pos, char *buf, size_t max); @@ -96,7 +113,7 @@ private: void add_data(const char *buf, size_t size, DataType data_type, int64_t time, AVRational timebase); void stop(); HTTPD *get_parent() const { return parent; } - StreamType get_stream_type() const { return stream_type; } + StreamID get_stream_id() const { return stream_id; } private: HTTPD *parent; @@ -109,9 +126,11 @@ private: size_t used_of_buffered_data = 0; // How many bytes of the first element of that is already used. Protected by . size_t buffered_data_bytes = 0; // The sum of all size() in buffered_data. Protected by . 
size_t seen_keyframe = false; - StreamType stream_type; + StreamID stream_id; }; + void add_data_locked(StreamID stream_id, const char *buf, size_t size, Stream::DataType data_type, int64_t time, AVRational timebase); + MHD_Daemon *mhd = nullptr; std::mutex streams_mutex; std::set streams; // Not owned. @@ -120,11 +139,12 @@ private: CORSPolicy cors_policy; }; std::unordered_map endpoints; - std::string header[NUM_STREAM_TYPES]; + std::map header; // Metrics. std::atomic metric_num_connected_clients{0}; std::atomic metric_num_connected_multicam_clients{0}; + std::atomic metric_num_connected_siphon_clients[MAX_VIDEO_CARDS] {{0}}; }; #endif // !defined(_HTTPD_H) diff --git a/shared/shared_defs.h b/shared/shared_defs.h index 62b719d..1f1cf69 100644 --- a/shared/shared_defs.h +++ b/shared/shared_defs.h @@ -20,4 +20,6 @@ // the output to be very uneven. #define MUX_BUFFER_SIZE 10485760 +#define MAX_VIDEO_CARDS 16 // Only really used by Nageru. + #endif // !defined(_SHARED_DEFS_H) -- 2.39.2