3 * Copyright (c) 2016 Jan Sebechlebsky
5 * This file is part of FFmpeg.
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public License
9 * as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <stdatomic.h>
24 #include "libavutil/avassert.h"
25 #include "libavutil/opt.h"
26 #include "libavutil/time.h"
27 #include "libavutil/thread.h"
28 #include "libavutil/threadmessage.h"
32 #define FIFO_DEFAULT_QUEUE_SIZE 60
33 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0
34 #define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
36 typedef struct FifoContext {
41 AVDictionary *format_options;
44 AVThreadMessageQueue *queue;
46 pthread_t writer_thread;
48 /* Return value of last write_trailer_call */
49 int write_trailer_ret;
51 /* Time to wait before next recovery attempt
52 * This can refer to the time in processed stream,
54 int64_t recovery_wait_time;
56 /* Maximal number of unsuccessful successive recovery attempts */
57 int max_recovery_attempts;
59 /* Whether to attempt recovery from failure */
62 /* If >0 stream time will be used when waiting
63 * for the recovery attempt instead of real time */
64 int recovery_wait_streamtime;
66 /* If >0 recovery will be attempted regardless of error code
67 * (except AVERROR_EXIT, so exit request is never ignored) */
68 int recover_any_error;
70 /* Whether to drop packets in case the queue is full. */
71 int drop_pkts_on_overflow;
73 /* Whether to wait for keyframe when recovering
74 * from failure or queue overflow */
75 int restart_with_keyframe;
77 pthread_mutex_t overflow_flag_lock;
78 int overflow_flag_lock_initialized;
79 /* Value > 0 signals queue overflow */
80 volatile uint8_t overflow_flag;
82 atomic_int_least64_t queue_duration;
83 int64_t last_sent_dts;
87 typedef struct FifoThreadContext {
90 /* Timestamp of last failure.
91 * This is either pts in case stream time is used,
92 * or microseconds as returned by av_getttime_relative() */
93 int64_t last_recovery_ts;
95 /* Number of current recovery process
96 * Value > 0 means we are in recovery process */
99 /* If > 0 all frames will be dropped until keyframe is received */
100 uint8_t drop_until_keyframe;
102 /* Value > 0 means that the previous write_header call was successful
103 * so finalization by calling write_trailer and ff_io_close must be done
104 * before exiting / reinitialization of underlying muxer */
105 uint8_t header_written;
107 int64_t last_received_dts;
110 typedef enum FifoMessageType {
117 typedef struct FifoMessage {
118 FifoMessageType type;
122 static int fifo_thread_write_header(FifoThreadContext *ctx)
124 AVFormatContext *avf = ctx->avf;
125 FifoContext *fifo = avf->priv_data;
126 AVFormatContext *avf2 = fifo->avf;
127 AVDictionary *format_options = NULL;
130 ret = av_dict_copy(&format_options, fifo->format_options, 0);
134 ret = ff_format_output_open(avf2, avf->url, &format_options);
136 av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url,
141 for (i = 0;i < avf2->nb_streams; i++)
142 avf2->streams[i]->cur_dts = 0;
144 ret = avformat_write_header(avf2, &format_options);
146 ctx->header_written = 1;
148 // Check for options unrecognized by underlying muxer
149 if (format_options) {
150 AVDictionaryEntry *entry = NULL;
151 while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
152 av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
153 ret = AVERROR(EINVAL);
157 av_dict_free(&format_options);
161 static int fifo_thread_flush_output(FifoThreadContext *ctx)
163 AVFormatContext *avf = ctx->avf;
164 FifoContext *fifo = avf->priv_data;
165 AVFormatContext *avf2 = fifo->avf;
167 return av_write_frame(avf2, NULL);
170 static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
172 AVStream *st = avf->streams[pkt->stream_index];
173 int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
174 int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
179 static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
181 AVFormatContext *avf = ctx->avf;
182 FifoContext *fifo = avf->priv_data;
183 AVFormatContext *avf2 = fifo->avf;
184 AVRational src_tb, dst_tb;
187 if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
188 atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed);
190 if (ctx->drop_until_keyframe) {
191 if (pkt->flags & AV_PKT_FLAG_KEY) {
192 ctx->drop_until_keyframe = 0;
193 av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
195 av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
196 av_packet_unref(pkt);
201 s_idx = pkt->stream_index;
202 src_tb = avf->streams[s_idx]->time_base;
203 dst_tb = avf2->streams[s_idx]->time_base;
204 av_packet_rescale_ts(pkt, src_tb, dst_tb);
206 ret = av_write_frame(avf2, pkt);
208 av_packet_unref(pkt);
212 static int fifo_thread_write_trailer(FifoThreadContext *ctx)
214 AVFormatContext *avf = ctx->avf;
215 FifoContext *fifo = avf->priv_data;
216 AVFormatContext *avf2 = fifo->avf;
219 if (!ctx->header_written)
222 ret = av_write_trailer(avf2);
223 ff_format_io_close(avf2, &avf2->pb);
228 static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
230 int ret = AVERROR(EINVAL);
232 if (msg->type == FIFO_NOOP)
235 if (!ctx->header_written) {
236 ret = fifo_thread_write_header(ctx);
242 case FIFO_WRITE_HEADER:
243 av_assert0(ret >= 0);
245 case FIFO_WRITE_PACKET:
246 return fifo_thread_write_packet(ctx, &msg->pkt);
247 case FIFO_FLUSH_OUTPUT:
248 return fifo_thread_flush_output(ctx);
252 return AVERROR(EINVAL);
255 static int is_recoverable(const FifoContext *fifo, int err_no) {
256 if (!fifo->attempt_recovery)
259 if (fifo->recover_any_error)
260 return err_no != AVERROR_EXIT;
263 case AVERROR(EINVAL):
264 case AVERROR(ENOSYS):
267 case AVERROR_PATCHWELCOME:
274 static void free_message(void *msg)
276 FifoMessage *fifo_msg = msg;
278 if (fifo_msg->type == FIFO_WRITE_PACKET)
279 av_packet_unref(&fifo_msg->pkt);
282 static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
285 AVFormatContext *avf = ctx->avf;
286 FifoContext *fifo = avf->priv_data;
289 av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
292 if (fifo->recovery_wait_streamtime) {
293 if (pkt->pts == AV_NOPTS_VALUE)
294 av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
295 " timestamp, recovery will be attempted immediately");
296 ctx->last_recovery_ts = pkt->pts;
298 ctx->last_recovery_ts = av_gettime_relative();
301 if (fifo->max_recovery_attempts &&
302 ctx->recovery_nr >= fifo->max_recovery_attempts) {
303 av_log(avf, AV_LOG_ERROR,
304 "Maximal number of %d recovery attempts reached.\n",
305 fifo->max_recovery_attempts);
308 ret = AVERROR(EAGAIN);
314 static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
316 AVFormatContext *avf = ctx->avf;
317 FifoContext *fifo = avf->priv_data;
318 AVPacket *pkt = &msg->pkt;
319 int64_t time_since_recovery;
322 if (!is_recoverable(fifo, err_no)) {
327 if (ctx->header_written) {
328 fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
329 ctx->header_written = 0;
332 if (!ctx->recovery_nr) {
333 ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
336 if (fifo->recovery_wait_streamtime) {
337 if (ctx->last_recovery_ts == AV_NOPTS_VALUE) {
338 AVRational tb = avf->streams[pkt->stream_index]->time_base;
339 time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
342 /* Enforce recovery immediately */
343 time_since_recovery = fifo->recovery_wait_time;
346 time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
349 if (time_since_recovery < fifo->recovery_wait_time)
350 return AVERROR(EAGAIN);
355 if (fifo->max_recovery_attempts) {
356 av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
357 ctx->recovery_nr, fifo->max_recovery_attempts);
359 av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
363 if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
364 ctx->drop_until_keyframe = 1;
366 ret = fifo_thread_dispatch_message(ctx, msg);
368 if (is_recoverable(fifo, ret)) {
369 return fifo_thread_process_recovery_failure(ctx, pkt, ret);
374 av_log(avf, AV_LOG_INFO, "Recovery successful\n");
375 ctx->recovery_nr = 0;
385 static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
387 AVFormatContext *avf = ctx->avf;
388 FifoContext *fifo = avf->priv_data;
392 if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
393 int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
394 int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
396 av_usleep(FFMIN(10000, time_to_wait));
399 ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
400 } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
402 if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
403 if (msg->type == FIFO_WRITE_PACKET)
404 av_packet_unref(&msg->pkt);
411 static void *fifo_consumer_thread(void *data)
413 AVFormatContext *avf = data;
414 FifoContext *fifo = avf->priv_data;
415 AVThreadMessageQueue *queue = fifo->queue;
416 FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
419 FifoThreadContext fifo_thread_ctx;
420 memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
421 fifo_thread_ctx.avf = avf;
422 fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
425 uint8_t just_flushed = 0;
427 if (!fifo_thread_ctx.recovery_nr)
428 ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
430 if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
431 int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
433 av_thread_message_queue_set_err_send(queue, rec_ret);
438 /* If the queue is full at the moment when fifo_write_packet
439 * attempts to insert new message (packet) to the queue,
440 * it sets the fifo->overflow_flag to 1 and drops packet.
441 * Here in consumer thread, the flag is checked and if it is
442 * set, the queue is flushed and flag cleared. */
443 pthread_mutex_lock(&fifo->overflow_flag_lock);
444 if (fifo->overflow_flag) {
445 av_thread_message_flush(queue);
446 if (fifo->restart_with_keyframe)
447 fifo_thread_ctx.drop_until_keyframe = 1;
448 fifo->overflow_flag = 0;
451 pthread_mutex_unlock(&fifo->overflow_flag_lock);
454 av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
457 while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
460 ret = av_thread_message_queue_recv(queue, &msg, 0);
462 av_thread_message_queue_set_err_send(queue, ret);
467 fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
472 static int fifo_mux_init(AVFormatContext *avf, ff_const59 AVOutputFormat *oformat,
473 const char *filename)
475 FifoContext *fifo = avf->priv_data;
476 AVFormatContext *avf2;
479 ret = avformat_alloc_output_context2(&avf2, oformat, NULL, filename);
485 avf2->interrupt_callback = avf->interrupt_callback;
486 avf2->max_delay = avf->max_delay;
487 ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
490 avf2->opaque = avf->opaque;
491 avf2->io_close = avf->io_close;
492 avf2->io_open = avf->io_open;
493 avf2->flags = avf->flags;
495 for (i = 0; i < avf->nb_streams; ++i) {
496 AVStream *st = avformat_new_stream(avf2, NULL);
498 return AVERROR(ENOMEM);
500 ret = ff_stream_encode_params_copy(st, avf->streams[i]);
508 static int fifo_init(AVFormatContext *avf)
510 FifoContext *fifo = avf->priv_data;
511 ff_const59 AVOutputFormat *oformat;
514 if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
515 av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
516 " only when drop_pkts_on_overflow is also turned on\n");
517 return AVERROR(EINVAL);
519 atomic_init(&fifo->queue_duration, 0);
520 fifo->last_sent_dts = AV_NOPTS_VALUE;
522 oformat = av_guess_format(fifo->format, avf->url, NULL);
524 ret = AVERROR_MUXER_NOT_FOUND;
528 ret = fifo_mux_init(avf, oformat, avf->url);
532 ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
533 sizeof(FifoMessage));
537 av_thread_message_queue_set_free_func(fifo->queue, free_message);
539 ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
542 fifo->overflow_flag_lock_initialized = 1;
547 static int fifo_write_header(AVFormatContext *avf)
549 FifoContext * fifo = avf->priv_data;
552 ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);
554 av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
555 av_err2str(AVERROR(ret)));
562 static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
564 FifoContext *fifo = avf->priv_data;
565 FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
569 ret = av_packet_ref(&msg.pkt,pkt);
574 ret = av_thread_message_queue_send(fifo->queue, &msg,
575 fifo->drop_pkts_on_overflow ?
576 AV_THREAD_MESSAGE_NONBLOCK : 0);
577 if (ret == AVERROR(EAGAIN)) {
578 uint8_t overflow_set = 0;
580 /* Queue is full, set fifo->overflow_flag to 1
581 * to let consumer thread know the queue should
583 pthread_mutex_lock(&fifo->overflow_flag_lock);
584 if (!fifo->overflow_flag)
585 fifo->overflow_flag = overflow_set = 1;
586 pthread_mutex_unlock(&fifo->overflow_flag_lock);
589 av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
592 } else if (ret < 0) {
596 if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
597 atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed);
602 av_packet_unref(&msg.pkt);
606 static int fifo_write_trailer(AVFormatContext *avf)
608 FifoContext *fifo= avf->priv_data;
611 av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
612 if (fifo->timeshift) {
613 int64_t now = av_gettime_relative();
615 FifoMessage msg = {FIFO_NOOP};
617 int64_t delay = av_gettime_relative() - now;
618 if (delay < 0) { // Discontinuity?
620 now = av_gettime_relative();
624 atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
626 if (elapsed > fifo->timeshift)
629 ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK);
630 } while (ret >= 0 || ret == AVERROR(EAGAIN));
631 atomic_store(&fifo->queue_duration, INT64_MAX);
634 ret = pthread_join(fifo->writer_thread, NULL);
636 av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
637 av_err2str(AVERROR(ret)));
641 ret = fifo->write_trailer_ret;
645 static void fifo_deinit(AVFormatContext *avf)
647 FifoContext *fifo = avf->priv_data;
649 avformat_free_context(fifo->avf);
650 av_thread_message_queue_free(&fifo->queue);
651 if (fifo->overflow_flag_lock_initialized)
652 pthread_mutex_destroy(&fifo->overflow_flag_lock);
655 #define OFFSET(x) offsetof(FifoContext, x)
656 static const AVOption options[] = {
657 {"fifo_format", "Target muxer", OFFSET(format),
658 AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
660 {"queue_size", "Size of fifo queue", OFFSET(queue_size),
661 AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
663 {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
664 AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
666 {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
667 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
669 {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
670 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
672 {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
673 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
675 {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
676 AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
678 {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
679 AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
681 {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
682 OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
684 {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
685 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
687 {"timeshift", "Delay fifo output", OFFSET(timeshift),
688 AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
693 static const AVClass fifo_muxer_class = {
694 .class_name = "Fifo muxer",
695 .item_name = av_default_item_name,
697 .version = LIBAVUTIL_VERSION_INT,
700 AVOutputFormat ff_fifo_muxer = {
702 .long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
703 .priv_data_size = sizeof(FifoContext),
705 .write_header = fifo_write_header,
706 .write_packet = fifo_write_packet,
707 .write_trailer = fifo_write_trailer,
708 .deinit = fifo_deinit,
709 .priv_class = &fifo_muxer_class,
710 .flags = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,