X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=libavformat%2Ffifo.c;h=78afaff197d88a373cde7100439c4e9272644279;hb=2086d635c36c2865d1a0145c56e448b30af59ba0;hp=145e2e266047e47802a81e573d8fcd35a5a369ed;hpb=fb4a12cda4033f2f3d3d1039739f6e0e6f9afb82;p=ffmpeg diff --git a/libavformat/fifo.c b/libavformat/fifo.c index 145e2e26604..78afaff197d 100644 --- a/libavformat/fifo.c +++ b/libavformat/fifo.c @@ -19,6 +19,8 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include + #include "libavutil/avassert.h" #include "libavutil/opt.h" #include "libavutil/time.h" @@ -36,7 +38,6 @@ typedef struct FifoContext { AVFormatContext *avf; char *format; - char *format_options_str; AVDictionary *format_options; int queue_size; @@ -78,6 +79,9 @@ typedef struct FifoContext { /* Value > 0 signals queue overflow */ volatile uint8_t overflow_flag; + atomic_int_least64_t queue_duration; + int64_t last_sent_dts; + int64_t timeshift; } FifoContext; typedef struct FifoThreadContext { @@ -99,9 +103,12 @@ typedef struct FifoThreadContext { * so finalization by calling write_trailer and ff_io_close must be done * before exiting / reinitialization of underlying muxer */ uint8_t header_written; + + int64_t last_received_dts; } FifoThreadContext; typedef enum FifoMessageType { + FIFO_NOOP, FIFO_WRITE_HEADER, FIFO_WRITE_PACKET, FIFO_FLUSH_OUTPUT @@ -160,6 +167,15 @@ static int fifo_thread_flush_output(FifoThreadContext *ctx) return av_write_frame(avf2, NULL); } +static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts) +{ + AVStream *st = avf->streams[pkt->stream_index]; + int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q); + int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts); + *last_dts = dts; + return duration; +} + static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt) { AVFormatContext *avf = ctx->avf; @@ -168,6 +184,9 @@ static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt) AVRational src_tb, dst_tb; int ret, s_idx; + if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) + atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed); + if (ctx->drop_until_keyframe) { if (pkt->flags & AV_PKT_FLAG_KEY) { ctx->drop_until_keyframe = 0; @@ -210,6 +229,9 @@ static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg { int ret = AVERROR(EINVAL); + if (msg->type == FIFO_NOOP) + return 0; + if (!ctx->header_written) { ret = fifo_thread_write_header(ctx); if (ret < 0) @@ -391,12 +413,13 @@ static void *fifo_consumer_thread(void *data) AVFormatContext *avf = data; FifoContext *fifo = avf->priv_data; AVThreadMessageQueue *queue = fifo->queue; - FifoMessage msg = {FIFO_WRITE_HEADER, {0}}; + FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}}; int ret; FifoThreadContext fifo_thread_ctx; memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext)); fifo_thread_ctx.avf = avf; + fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE; while (1) { uint8_t just_flushed = 0; @@ -430,6 +453,10 @@ static void *fifo_consumer_thread(void *data) if (just_flushed) av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n"); + if (fifo->timeshift) + while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift) + av_usleep(10000); + ret = av_thread_message_queue_recv(queue, &msg, 0); if (ret < 0) { av_thread_message_queue_set_err_send(queue, ret); @@ -442,7 +469,7 @@ static void *fifo_consumer_thread(void *data) return NULL; } -static int fifo_mux_init(AVFormatContext *avf, AVOutputFormat *oformat, +static int fifo_mux_init(AVFormatContext *avf, const AVOutputFormat *oformat, const char *filename) { FifoContext *fifo = avf->priv_data; @@ -481,7 +508,7 @@ static int fifo_mux_init(AVFormatContext *avf, AVOutputFormat *oformat, static int fifo_init(AVFormatContext *avf) { FifoContext *fifo = avf->priv_data; - AVOutputFormat *oformat; + const AVOutputFormat *oformat; int ret = 0; if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) { @@ -489,16 +516,8 @@ static int fifo_init(AVFormatContext *avf) " only when drop_pkts_on_overflow is also turned on\n"); return AVERROR(EINVAL); } - - if (fifo->format_options_str) { - ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str, - "=", ":", 0); - if (ret < 0) { - av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n", - fifo->format_options_str); - return ret; - } - } + atomic_init(&fifo->queue_duration, 0); + fifo->last_sent_dts = AV_NOPTS_VALUE; oformat = av_guess_format(fifo->format, avf->url, NULL); if (!oformat) { @@ -547,7 +566,6 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt) int ret; if (pkt) { - av_init_packet(&msg.pkt); ret = av_packet_ref(&msg.pkt,pkt); if (ret < 0) return ret; @@ -575,6 +593,9 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt) goto fail; } + if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) + atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed); + return ret; fail: if (pkt) @@ -588,6 +609,27 @@ static int fifo_write_trailer(AVFormatContext *avf) int ret; av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF); + if (fifo->timeshift) { + int64_t now = av_gettime_relative(); + int64_t elapsed = 0; + FifoMessage msg = {FIFO_NOOP}; + do { + int64_t delay = av_gettime_relative() - now; + if (delay < 0) { // Discontinuity? + delay = 10000; + now = av_gettime_relative(); + } else { + now += delay; + } + atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed); + elapsed += delay; + if (elapsed > fifo->timeshift) + break; + av_usleep(10000); + ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK); + } while (ret >= 0 || ret == AVERROR(EAGAIN)); + atomic_store(&fifo->queue_duration, INT64_MAX); + } ret = pthread_join(fifo->writer_thread, NULL); if (ret < 0) { @@ -604,7 +646,6 @@ static void fifo_deinit(AVFormatContext *avf) { FifoContext *fifo = avf->priv_data; - av_dict_free(&fifo->format_options); avformat_free_context(fifo->avf); av_thread_message_queue_free(&fifo->queue); if (fifo->overflow_flag_lock_initialized) @@ -619,8 +660,8 @@ static const AVOption options[] = { {"queue_size", "Size of fifo queue", OFFSET(queue_size), AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM}, - {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options_str), - AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM}, + {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options), + AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM}, {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, @@ -643,6 +684,9 @@ static const AVOption options[] = { {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, + {"timeshift", "Delay fifo output", OFFSET(timeshift), + AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM}, + {NULL}, };