#include "frame_thread_encoder.h"
-#include "libavutil/fifo.h"
#include "libavutil/avassert.h"
#include "libavutil/imgutils.h"
#include "libavutil/opt.h"
#include "thread.h"
#define MAX_THREADS 64
-#define BUFFER_SIZE (2*MAX_THREADS)
+/* There can be as many as MAX_THREADS + 1 outstanding tasks.
+ * An additional + 1 is needed so that one can distinguish
+ * the case of zero and MAX_THREADS + 1 outstanding tasks modulo
+ * the number of buffers. */
+#define BUFFER_SIZE (MAX_THREADS + 2)
typedef struct{
- void *indata;
- void *outdata;
- int64_t return_code;
- unsigned index;
+ AVFrame *indata;
+ AVPacket *outdata;
+ int return_code;
+ int finished;
} Task;
typedef struct{
AVCodecContext *parent_avctx;
pthread_mutex_t buffer_mutex;
- AVFifoBuffer *task_fifo;
- pthread_mutex_t task_fifo_mutex;
+ pthread_mutex_t task_fifo_mutex; /* Used to guard (next_)task_index */
pthread_cond_t task_fifo_cond;
- Task finished_tasks[BUFFER_SIZE];
- pthread_mutex_t finished_task_mutex;
+ unsigned max_tasks;
+ Task tasks[BUFFER_SIZE];
+ pthread_mutex_t finished_task_mutex; /* Guards tasks[i].finished */
pthread_cond_t finished_task_cond;
+ unsigned next_task_index;
unsigned task_index;
unsigned finished_task_index;
static void * attribute_align_arg worker(void *v){
AVCodecContext *avctx = v;
ThreadContext *c = avctx->internal->frame_thread_encoder;
- AVPacket *pkt = NULL;
while (!atomic_load(&c->exit)) {
- int got_packet, ret;
+ int got_packet = 0, ret;
+ AVPacket *pkt;
AVFrame *frame;
- Task task;
-
- if(!pkt) pkt = av_packet_alloc();
- if(!pkt) continue;
- av_init_packet(pkt);
+ Task *task;
+ unsigned task_index;
pthread_mutex_lock(&c->task_fifo_mutex);
- while (av_fifo_size(c->task_fifo) <= 0 || atomic_load(&c->exit)) {
+ while (c->next_task_index == c->task_index || atomic_load(&c->exit)) {
if (atomic_load(&c->exit)) {
pthread_mutex_unlock(&c->task_fifo_mutex);
goto end;
}
pthread_cond_wait(&c->task_fifo_cond, &c->task_fifo_mutex);
}
- av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
+ task_index = c->next_task_index;
+ c->next_task_index = (c->next_task_index + 1) % c->max_tasks;
pthread_mutex_unlock(&c->task_fifo_mutex);
- frame = task.indata;
-
- ret = avcodec_encode_video2(avctx, pkt, frame, &got_packet);
- pthread_mutex_lock(&c->buffer_mutex);
- av_frame_unref(frame);
- pthread_mutex_unlock(&c->buffer_mutex);
- av_frame_free(&frame);
+ /* The main thread ensures that any two outstanding tasks have
+ * different indices, ergo each worker thread owns its element
+ * of c->tasks with the exception of finished, which is shared
+ * with the main thread and guarded by finished_task_mutex. */
+ task = &c->tasks[task_index];
+ frame = task->indata;
+ pkt = task->outdata;
+
+ ret = avctx->codec->encode2(avctx, pkt, frame, &got_packet);
if(got_packet) {
int ret2 = av_packet_make_refcounted(pkt);
if (ret >= 0 && ret2 < 0)
ret = ret2;
+ pkt->pts = pkt->dts = frame->pts;
} else {
pkt->data = NULL;
pkt->size = 0;
}
+ pthread_mutex_lock(&c->buffer_mutex);
+ av_frame_unref(frame);
+ pthread_mutex_unlock(&c->buffer_mutex);
pthread_mutex_lock(&c->finished_task_mutex);
- c->finished_tasks[task.index].outdata = pkt; pkt = NULL;
- c->finished_tasks[task.index].return_code = ret;
+ task->return_code = ret;
+ task->finished = 1;
pthread_cond_signal(&c->finished_task_cond);
pthread_mutex_unlock(&c->finished_task_mutex);
}
end:
- av_free(pkt);
pthread_mutex_lock(&c->buffer_mutex);
avcodec_close(avctx);
pthread_mutex_unlock(&c->buffer_mutex);
if( !(avctx->thread_type & FF_THREAD_FRAME)
- || !(avctx->codec->capabilities & AV_CODEC_CAP_INTRA_ONLY))
+ || !(avctx->codec->capabilities & AV_CODEC_CAP_FRAME_THREADS))
return 0;
if( !avctx->thread_count
c->parent_avctx = avctx;
- c->task_fifo = av_fifo_alloc_array(BUFFER_SIZE, sizeof(Task));
- if(!c->task_fifo)
- goto fail;
-
pthread_mutex_init(&c->task_fifo_mutex, NULL);
pthread_mutex_init(&c->finished_task_mutex, NULL);
pthread_mutex_init(&c->buffer_mutex, NULL);
pthread_cond_init(&c->finished_task_cond, NULL);
atomic_init(&c->exit, 0);
+ c->max_tasks = avctx->thread_count + 2;
+ for (unsigned i = 0; i < c->max_tasks; i++) {
+ if (!(c->tasks[i].indata = av_frame_alloc()) ||
+ !(c->tasks[i].outdata = av_packet_alloc()))
+ goto fail;
+ }
+
for(i=0; i<avctx->thread_count ; i++){
AVDictionary *tmp = NULL;
int ret;
int ret = av_opt_copy(thread_avctx->priv_data, avctx->priv_data);
if (ret < 0)
goto fail;
- } else
+ } else if (avctx->codec->priv_data_size) {
memcpy(thread_avctx->priv_data, avctx->priv_data, avctx->codec->priv_data_size);
+ }
thread_avctx->thread_count = 1;
thread_avctx->active_thread_type &= ~FF_THREAD_FRAME;
pthread_join(c->worker[i], NULL);
}
- while (av_fifo_size(c->task_fifo) > 0) {
- Task task;
- AVFrame *frame;
- av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
- frame = task.indata;
- av_frame_free(&frame);
- task.indata = NULL;
- }
-
- for (i=0; i<BUFFER_SIZE; i++) {
- if (c->finished_tasks[i].outdata != NULL) {
- AVPacket *pkt = c->finished_tasks[i].outdata;
- av_packet_free(&pkt);
- c->finished_tasks[i].outdata = NULL;
- }
+ for (unsigned i = 0; i < c->max_tasks; i++) {
+ av_frame_free(&c->tasks[i].indata);
+ av_packet_free(&c->tasks[i].outdata);
}
pthread_mutex_destroy(&c->task_fifo_mutex);
pthread_mutex_destroy(&c->buffer_mutex);
pthread_cond_destroy(&c->task_fifo_cond);
pthread_cond_destroy(&c->finished_task_cond);
- av_fifo_freep(&c->task_fifo);
av_freep(&avctx->internal->frame_thread_encoder);
}
-int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVFrame *frame, int *got_packet_ptr){
+int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt,
+ AVFrame *frame, int *got_packet_ptr)
+{
ThreadContext *c = avctx->internal->frame_thread_encoder;
- Task task;
- int ret;
+ Task *outtask;
av_assert1(!*got_packet_ptr);
if(frame){
- AVFrame *new = av_frame_alloc();
- if(!new)
- return AVERROR(ENOMEM);
- ret = av_frame_ref(new, frame);
- if(ret < 0) {
- av_frame_free(&new);
- return ret;
- }
+ av_frame_move_ref(c->tasks[c->task_index].indata, frame);
- task.index = c->task_index;
- task.indata = (void*)new;
pthread_mutex_lock(&c->task_fifo_mutex);
- av_fifo_generic_write(c->task_fifo, &task, sizeof(task), NULL);
+ c->task_index = (c->task_index + 1) % c->max_tasks;
pthread_cond_signal(&c->task_fifo_cond);
pthread_mutex_unlock(&c->task_fifo_mutex);
-
- c->task_index = (c->task_index+1) % BUFFER_SIZE;
}
+ outtask = &c->tasks[c->finished_task_index];
pthread_mutex_lock(&c->finished_task_mutex);
+ /* The access to task_index in the following code is ok,
+ * because it is only ever changed by the main thread. */
if (c->task_index == c->finished_task_index ||
- (frame && !c->finished_tasks[c->finished_task_index].outdata &&
- (c->task_index - c->finished_task_index) % BUFFER_SIZE <= avctx->thread_count)) {
+ (frame && !outtask->finished &&
+ (c->task_index - c->finished_task_index + c->max_tasks) % c->max_tasks <= avctx->thread_count)) {
pthread_mutex_unlock(&c->finished_task_mutex);
return 0;
}
-
- while (!c->finished_tasks[c->finished_task_index].outdata) {
+ while (!outtask->finished) {
pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex);
}
- task = c->finished_tasks[c->finished_task_index];
- *pkt = *(AVPacket*)(task.outdata);
+ pthread_mutex_unlock(&c->finished_task_mutex);
+ /* We now own outtask completely: No worker thread touches it any more,
+ * because there is no outstanding task with this index. */
+ outtask->finished = 0;
+ av_packet_move_ref(pkt, outtask->outdata);
if(pkt->data)
*got_packet_ptr = 1;
- av_freep(&c->finished_tasks[c->finished_task_index].outdata);
- c->finished_task_index = (c->finished_task_index+1) % BUFFER_SIZE;
- pthread_mutex_unlock(&c->finished_task_mutex);
+ c->finished_task_index = (c->finished_task_index + 1) % c->max_tasks;
- return task.return_code;
+ return outtask->return_code;
}