X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=libavcodec%2Fframe_thread_encoder.c;h=778317d60bbb479b238a24b6b06b98759e0eabf8;hb=e625ae609206e0550ff733965c6f5447579320aa;hp=949bc69f81d216530abb2de82cf76ec42e15f5c1;hpb=e82a619c2a154ae6e3e3a81af55977bd5a46660e;p=ffmpeg diff --git a/libavcodec/frame_thread_encoder.c b/libavcodec/frame_thread_encoder.c index 949bc69f81d..778317d60bb 100644 --- a/libavcodec/frame_thread_encoder.c +++ b/libavcodec/frame_thread_encoder.c @@ -22,7 +22,6 @@ #include "frame_thread_encoder.h" -#include "libavutil/fifo.h" #include "libavutil/avassert.h" #include "libavutil/imgutils.h" #include "libavutil/opt.h" @@ -32,27 +31,32 @@ #include "thread.h" #define MAX_THREADS 64 -#define BUFFER_SIZE (2*MAX_THREADS) +/* There can be as many as MAX_THREADS + 1 outstanding tasks. + * An additional + 1 is needed so that one can distinguish + * the case of zero and MAX_THREADS + 1 outstanding tasks modulo + * the number of buffers. */ +#define BUFFER_SIZE (MAX_THREADS + 2) typedef struct{ - void *indata; - void *outdata; - int64_t return_code; - unsigned index; + AVFrame *indata; + AVPacket *outdata; + int return_code; + int finished; } Task; typedef struct{ AVCodecContext *parent_avctx; pthread_mutex_t buffer_mutex; - AVFifoBuffer *task_fifo; - pthread_mutex_t task_fifo_mutex; + pthread_mutex_t task_fifo_mutex; /* Used to guard (next_)task_index */ pthread_cond_t task_fifo_cond; - Task finished_tasks[BUFFER_SIZE]; - pthread_mutex_t finished_task_mutex; + unsigned max_tasks; + Task tasks[BUFFER_SIZE]; + pthread_mutex_t finished_task_mutex; /* Guards tasks[i].finished */ pthread_cond_t finished_task_cond; + unsigned next_task_index; unsigned task_index; unsigned finished_task_index; @@ -63,50 +67,53 @@ typedef struct{ static void * attribute_align_arg worker(void *v){ AVCodecContext *avctx = v; ThreadContext *c = avctx->internal->frame_thread_encoder; - AVPacket *pkt = NULL; while (!atomic_load(&c->exit)) { - int got_packet, ret; + int got_packet = 0, ret; + AVPacket *pkt; AVFrame *frame; - Task task; - - if(!pkt) pkt = av_packet_alloc(); - if(!pkt) continue; - av_init_packet(pkt); + Task *task; + unsigned task_index; pthread_mutex_lock(&c->task_fifo_mutex); - while (av_fifo_size(c->task_fifo) <= 0 || atomic_load(&c->exit)) { + while (c->next_task_index == c->task_index || atomic_load(&c->exit)) { if (atomic_load(&c->exit)) { pthread_mutex_unlock(&c->task_fifo_mutex); goto end; } pthread_cond_wait(&c->task_fifo_cond, &c->task_fifo_mutex); } - av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL); + task_index = c->next_task_index; + c->next_task_index = (c->next_task_index + 1) % c->max_tasks; pthread_mutex_unlock(&c->task_fifo_mutex); - frame = task.indata; - - ret = avcodec_encode_video2(avctx, pkt, frame, &got_packet); - pthread_mutex_lock(&c->buffer_mutex); - av_frame_unref(frame); - pthread_mutex_unlock(&c->buffer_mutex); - av_frame_free(&frame); + /* The main thread ensures that any two outstanding tasks have + * different indices, ergo each worker thread owns its element + * of c->tasks with the exception of finished, which is shared + * with the main thread and guarded by finished_task_mutex. */ + task = &c->tasks[task_index]; + frame = task->indata; + pkt = task->outdata; + + ret = avctx->codec->encode2(avctx, pkt, frame, &got_packet); if(got_packet) { int ret2 = av_packet_make_refcounted(pkt); if (ret >= 0 && ret2 < 0) ret = ret2; + pkt->pts = pkt->dts = frame->pts; } else { pkt->data = NULL; pkt->size = 0; } + pthread_mutex_lock(&c->buffer_mutex); + av_frame_unref(frame); + pthread_mutex_unlock(&c->buffer_mutex); pthread_mutex_lock(&c->finished_task_mutex); - c->finished_tasks[task.index].outdata = pkt; pkt = NULL; - c->finished_tasks[task.index].return_code = ret; + task->return_code = ret; + task->finished = 1; pthread_cond_signal(&c->finished_task_cond); pthread_mutex_unlock(&c->finished_task_mutex); } end: - av_free(pkt); pthread_mutex_lock(&c->buffer_mutex); avcodec_close(avctx); pthread_mutex_unlock(&c->buffer_mutex); @@ -120,7 +127,7 @@ int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){ if( !(avctx->thread_type & FF_THREAD_FRAME) - || !(avctx->codec->capabilities & AV_CODEC_CAP_INTRA_ONLY)) + || !(avctx->codec->capabilities & AV_CODEC_CAP_FRAME_THREADS)) return 0; if( !avctx->thread_count @@ -180,10 +187,6 @@ int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){ c->parent_avctx = avctx; - c->task_fifo = av_fifo_alloc_array(BUFFER_SIZE, sizeof(Task)); - if(!c->task_fifo) - goto fail; - pthread_mutex_init(&c->task_fifo_mutex, NULL); pthread_mutex_init(&c->finished_task_mutex, NULL); pthread_mutex_init(&c->buffer_mutex, NULL); @@ -191,6 +194,13 @@ int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){ pthread_cond_init(&c->finished_task_cond, NULL); atomic_init(&c->exit, 0); + c->max_tasks = avctx->thread_count + 2; + for (unsigned i = 0; i < c->max_tasks; i++) { + if (!(c->tasks[i].indata = av_frame_alloc()) || + !(c->tasks[i].outdata = av_packet_alloc())) + goto fail; + } + for(i=0; ithread_count ; i++){ AVDictionary *tmp = NULL; int ret; @@ -252,21 +262,9 @@ void ff_frame_thread_encoder_free(AVCodecContext *avctx){ pthread_join(c->worker[i], NULL); } - while (av_fifo_size(c->task_fifo) > 0) { - Task task; - AVFrame *frame; - av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL); - frame = task.indata; - av_frame_free(&frame); - task.indata = NULL; - } - - for (i=0; ifinished_tasks[i].outdata != NULL) { - AVPacket *pkt = c->finished_tasks[i].outdata; - av_packet_free(&pkt); - c->finished_tasks[i].outdata = NULL; - } + for (unsigned i = 0; i < c->max_tasks; i++) { + av_frame_free(&c->tasks[i].indata); + av_packet_free(&c->tasks[i].outdata); } pthread_mutex_destroy(&c->task_fifo_mutex); @@ -274,55 +272,47 @@ void ff_frame_thread_encoder_free(AVCodecContext *avctx){ pthread_mutex_destroy(&c->buffer_mutex); pthread_cond_destroy(&c->task_fifo_cond); pthread_cond_destroy(&c->finished_task_cond); - av_fifo_freep(&c->task_fifo); av_freep(&avctx->internal->frame_thread_encoder); } -int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVFrame *frame, int *got_packet_ptr){ +int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, + AVFrame *frame, int *got_packet_ptr) +{ ThreadContext *c = avctx->internal->frame_thread_encoder; - Task task; - int ret; + Task *outtask; av_assert1(!*got_packet_ptr); if(frame){ - AVFrame *new = av_frame_alloc(); - if(!new) - return AVERROR(ENOMEM); - ret = av_frame_ref(new, frame); - if(ret < 0) { - av_frame_free(&new); - return ret; - } + av_frame_move_ref(c->tasks[c->task_index].indata, frame); - task.index = c->task_index; - task.indata = (void*)new; pthread_mutex_lock(&c->task_fifo_mutex); - av_fifo_generic_write(c->task_fifo, &task, sizeof(task), NULL); + c->task_index = (c->task_index + 1) % c->max_tasks; pthread_cond_signal(&c->task_fifo_cond); pthread_mutex_unlock(&c->task_fifo_mutex); - - c->task_index = (c->task_index+1) % BUFFER_SIZE; } + outtask = &c->tasks[c->finished_task_index]; pthread_mutex_lock(&c->finished_task_mutex); + /* The access to task_index in the following code is ok, + * because it is only ever changed by the main thread. */ if (c->task_index == c->finished_task_index || - (frame && !c->finished_tasks[c->finished_task_index].outdata && - (c->task_index - c->finished_task_index) % BUFFER_SIZE <= avctx->thread_count)) { + (frame && !outtask->finished && + (c->task_index - c->finished_task_index + c->max_tasks) % c->max_tasks <= avctx->thread_count)) { pthread_mutex_unlock(&c->finished_task_mutex); return 0; } - - while (!c->finished_tasks[c->finished_task_index].outdata) { + while (!outtask->finished) { pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex); } - task = c->finished_tasks[c->finished_task_index]; - *pkt = *(AVPacket*)(task.outdata); + pthread_mutex_unlock(&c->finished_task_mutex); + /* We now own outtask completely: No worker thread touches it any more, + * because there is no outstanding task with this index. */ + outtask->finished = 0; + av_packet_move_ref(pkt, outtask->outdata); if(pkt->data) *got_packet_ptr = 1; - av_freep(&c->finished_tasks[c->finished_task_index].outdata); - c->finished_task_index = (c->finished_task_index+1) % BUFFER_SIZE; - pthread_mutex_unlock(&c->finished_task_mutex); + c->finished_task_index = (c->finished_task_index + 1) % c->max_tasks; - return task.return_code; + return outtask->return_code; }