* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
+#include <stdatomic.h>
+
#include "libavutil/avassert.h"
#include "libavutil/opt.h"
#include "libavutil/time.h"
AVFormatContext *avf;
char *format;
- char *format_options_str;
AVDictionary *format_options;
int queue_size;
/* Value > 0 signals queue overflow */
volatile uint8_t overflow_flag;
+ atomic_int_least64_t queue_duration;
+ int64_t last_sent_dts;
+ int64_t timeshift;
} FifoContext;
typedef struct FifoThreadContext {
* so finalization by calling write_trailer and ff_io_close must be done
* before exiting / reinitialization of underlying muxer */
uint8_t header_written;
+
+ int64_t last_received_dts;
} FifoThreadContext;
typedef enum FifoMessageType {
+ FIFO_NOOP,
FIFO_WRITE_HEADER,
FIFO_WRITE_PACKET,
FIFO_FLUSH_OUTPUT
return av_write_frame(avf2, NULL);
}
+static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
+{
+ AVStream *st = avf->streams[pkt->stream_index];
+ int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
+ int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
+ *last_dts = dts;
+ return duration;
+}
+
static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
{
AVFormatContext *avf = ctx->avf;
AVRational src_tb, dst_tb;
int ret, s_idx;
+ if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
+ atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed);
+
if (ctx->drop_until_keyframe) {
if (pkt->flags & AV_PKT_FLAG_KEY) {
ctx->drop_until_keyframe = 0;
{
int ret = AVERROR(EINVAL);
+ if (msg->type == FIFO_NOOP)
+ return 0;
+
if (!ctx->header_written) {
ret = fifo_thread_write_header(ctx);
if (ret < 0)
AVFormatContext *avf = data;
FifoContext *fifo = avf->priv_data;
AVThreadMessageQueue *queue = fifo->queue;
- FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
+ FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
int ret;
FifoThreadContext fifo_thread_ctx;
memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
fifo_thread_ctx.avf = avf;
+ fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
while (1) {
uint8_t just_flushed = 0;
if (just_flushed)
av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
+ if (fifo->timeshift)
+ while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
+ av_usleep(10000);
+
ret = av_thread_message_queue_recv(queue, &msg, 0);
if (ret < 0) {
av_thread_message_queue_set_err_send(queue, ret);
return NULL;
}
-static int fifo_mux_init(AVFormatContext *avf, AVOutputFormat *oformat,
+static int fifo_mux_init(AVFormatContext *avf, const AVOutputFormat *oformat,
const char *filename)
{
FifoContext *fifo = avf->priv_data;
static int fifo_init(AVFormatContext *avf)
{
FifoContext *fifo = avf->priv_data;
- AVOutputFormat *oformat;
+ const AVOutputFormat *oformat;
int ret = 0;
if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
" only when drop_pkts_on_overflow is also turned on\n");
return AVERROR(EINVAL);
}
-
- if (fifo->format_options_str) {
- ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str,
- "=", ":", 0);
- if (ret < 0) {
- av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n",
- fifo->format_options_str);
- return ret;
- }
- }
+ atomic_init(&fifo->queue_duration, 0);
+ fifo->last_sent_dts = AV_NOPTS_VALUE;
oformat = av_guess_format(fifo->format, avf->url, NULL);
if (!oformat) {
int ret;
if (pkt) {
- av_init_packet(&msg.pkt);
ret = av_packet_ref(&msg.pkt,pkt);
if (ret < 0)
return ret;
goto fail;
}
+ if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
+ atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed);
+
return ret;
fail:
if (pkt)
int ret;
av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
+ if (fifo->timeshift) {
+ int64_t now = av_gettime_relative();
+ int64_t elapsed = 0;
+ FifoMessage msg = {FIFO_NOOP};
+ do {
+ int64_t delay = av_gettime_relative() - now;
+ if (delay < 0) { // Discontinuity?
+ delay = 10000;
+ now = av_gettime_relative();
+ } else {
+ now += delay;
+ }
+ atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
+ elapsed += delay;
+ if (elapsed > fifo->timeshift)
+ break;
+ av_usleep(10000);
+ ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK);
+ } while (ret >= 0 || ret == AVERROR(EAGAIN));
+ atomic_store(&fifo->queue_duration, INT64_MAX);
+ }
ret = pthread_join(fifo->writer_thread, NULL);
if (ret < 0) {
{
FifoContext *fifo = avf->priv_data;
- av_dict_free(&fifo->format_options);
avformat_free_context(fifo->avf);
av_thread_message_queue_free(&fifo->queue);
if (fifo->overflow_flag_lock_initialized)
{"queue_size", "Size of fifo queue", OFFSET(queue_size),
AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
- {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options_str),
- AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
+ {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
+ AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
{"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
{"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+ {"timeshift", "Delay fifo output", OFFSET(timeshift),
+ AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
+
{NULL},
};
.version = LIBAVUTIL_VERSION_INT,
};
-AVOutputFormat ff_fifo_muxer = {
+const AVOutputFormat ff_fifo_muxer = {
.name = "fifo",
.long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
.priv_data_size = sizeof(FifoContext),