2 * This file is part of FFmpeg.
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 #include <stdatomic.h>
20 #include "slicethread.h"
25 #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
/* Per-worker synchronization state. */
27 typedef struct WorkerContext {
/* Locked around the signal/wait calls on this worker's condition variable. */
29 pthread_mutex_t mutex;
/* Shared state of one slice-threading context. */
35 struct AVSliceThread {
/* Per-worker contexts, indexed by worker number. */
36 WorkerContext *workers;
/* Threads taking part in the current execute call: FFMIN(nb_jobs, nb_threads). */
38 int nb_active_threads;
/* Incremented once per run_jobs() call; the fetched value is that caller's
 * first job number and the threadnr it passes to worker_func. */
41 atomic_uint first_job;
/* Counter from which every job after a thread's first is claimed; an execute
 * call starts it at nb_active_threads. */
42 atomic_uint current_job;
/* Pair on which avpriv_slicethread_execute() waits and which a worker signals. */
43 pthread_mutex_t done_mutex;
44 pthread_cond_t done_cond;
/* Callback invoked once per job. */
49 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
/* Optional callback invoked by avpriv_slicethread_execute() on the calling
 * thread when execute_main is nonzero. */
50 void (*main_func)(void *priv);
/*
 * Run jobs on the calling thread until the shared job counter has reached
 * nb_jobs.
 *
 * Returns nonzero when the fetch that ends the loop yields
 * nb_jobs + nb_active_threads - 1. Provided each active thread calls this
 * exactly once per execute call, that is the highest value any of them can
 * fetch, so it identifies the last thread to run out of work.
 */
53 static int run_jobs(AVSliceThread *ctx)
55 unsigned nb_jobs = ctx->nb_jobs;
56 unsigned nb_active_threads = ctx->nb_active_threads;
/* Take a distinct slot: it is both this caller's first job number and the
 * threadnr value handed to worker_func for every job it runs. */
57 unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
58 unsigned current_job = first_job;
61 ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
/* Claim the next job number; stop once the fetched value is >= nb_jobs. */
62 } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
64 return current_job == nb_jobs + nb_active_threads - 1;
/*
 * Start routine of a worker thread; v is the worker's WorkerContext as passed
 * to pthread_create() by avpriv_slicethread_create().
 */
67 static void *attribute_align_arg thread_worker(void *v)
70 AVSliceThread *ctx = w->ctx;
/* Startup handshake: the creator is blocked in pthread_cond_wait() on w->cond
 * and is released by this signal. */
72 pthread_mutex_lock(&w->mutex);
73 pthread_cond_signal(&w->cond);
/* Sleep until woken; w->cond is signalled by avpriv_slicethread_execute() and
 * avpriv_slicethread_free(). */
78 pthread_cond_wait(&w->cond, &w->mutex);
/* NOTE(review): the condition under which the mutex is released here is not
 * among the lines shown; presumably the shutdown path - confirm. */
81 pthread_mutex_unlock(&w->mutex);
/* Wake the thread waiting on done_cond in avpriv_slicethread_execute().
 * NOTE(review): presumably guarded by run_jobs() returning nonzero - confirm. */
86 pthread_mutex_lock(&ctx->done_mutex);
88 pthread_cond_signal(&ctx->done_cond);
89 pthread_mutex_unlock(&ctx->done_mutex);
/*
 * Allocate a slice-threading context and start its worker threads.
 *
 * pctx        receives the newly allocated context
 * priv        opaque pointer for the callbacks
 *             (NOTE(review): the store into ctx->priv is not among the lines
 *             shown - confirm)
 * worker_func per-job callback, stored in the context
 * main_func   optional callback for the thread that later calls
 *             avpriv_slicethread_execute(); may be NULL
 * nb_threads  requested thread count; asserted to be >= 0
 *
 * Returns AVERROR(ENOMEM) when an allocation fails.
 */
94 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
95 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
96 void (*main_func)(void *priv),
102 av_assert0(nb_threads >= 0);
/* Derive the thread count from the CPU count.
 * NOTE(review): the conditions selecting this path are not shown - confirm. */
104 int nb_cpus = av_cpu_count();
106 nb_threads = nb_cpus + 1;
111 nb_workers = nb_threads;
115 *pctx = ctx = av_mallocz(sizeof(*ctx));
117 return AVERROR(ENOMEM);
/* The worker array is only allocated when there is at least one worker. */
119 if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
121 return AVERROR(ENOMEM);
125 ctx->worker_func = worker_func;
126 ctx->main_func = main_func;
127 ctx->nb_threads = nb_threads;
128 ctx->nb_active_threads = 0;
132 atomic_init(&ctx->first_job, 0);
133 atomic_init(&ctx->current_job, 0);
134 pthread_mutex_init(&ctx->done_mutex, NULL);
135 pthread_cond_init(&ctx->done_cond, NULL);
138 for (i = 0; i < nb_workers; i++) {
139 WorkerContext *w = &ctx->workers[i];
142 pthread_mutex_init(&w->mutex, NULL);
143 pthread_cond_init(&w->cond, NULL);
/* Hold the worker's mutex across pthread_create(): the new thread must take
 * this mutex before it signals, so it cannot do so until this thread is
 * inside pthread_cond_wait() below. */
144 pthread_mutex_lock(&w->mutex);
147 if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
/* Only workers 0..i-1 were started. Shrink nb_threads accordingly before
 * freeing, since avpriv_slicethread_free() derives its worker count from it.
 * NOTE(review): the i / i + 1 split presumably mirrors a worker-count
 * adjustment for a NULL main_func that is not shown - confirm. */
148 ctx->nb_threads = main_func ? i : i + 1;
/* The failed worker is excluded from the count above, so its mutex and
 * condition variable are destroyed here. */
149 pthread_mutex_unlock(&w->mutex);
150 pthread_cond_destroy(&w->cond);
151 pthread_mutex_destroy(&w->mutex);
152 avpriv_slicethread_free(pctx);
/* Wait for the new thread's startup signal before creating the next one. */
157 pthread_cond_wait(&w->cond, &w->mutex);
158 pthread_mutex_unlock(&w->mutex);
/*
 * Run nb_jobs jobs through the context's worker_func.
 *
 * ctx          context obtained from avpriv_slicethread_create()
 * nb_jobs      number of jobs; asserted to be > 0
 * execute_main when nonzero and the context has a main_func, main_func is
 *              called on the calling thread
 */
164 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
166 int nb_workers, i, is_last = 0;
168 av_assert0(nb_jobs > 0);
169 ctx->nb_jobs = nb_jobs;
/* Never activate more threads than there are jobs. */
170 ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
/* Reset the job counters: each active thread draws its first job from
 * first_job (values 0..nb_active_threads-1), so current_job starts just
 * past those. */
171 atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
172 atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
173 nb_workers = ctx->nb_active_threads;
/* NOTE(review): the statement controlled by this test is not shown;
 * presumably it reduces nb_workers because the calling thread runs jobs
 * itself in this case - confirm. */
174 if (!ctx->main_func || !execute_main)
/* Wake the selected workers. */
177 for (i = 0; i < nb_workers; i++) {
178 WorkerContext *w = &ctx->workers[i];
179 pthread_mutex_lock(&w->mutex);
181 pthread_cond_signal(&w->cond);
182 pthread_mutex_unlock(&w->mutex);
185 if (ctx->main_func && execute_main)
186 ctx->main_func(ctx->priv);
/* NOTE(review): presumably the alternative branch of the test above - confirm. */
188 is_last = run_jobs(ctx);
/* Block until a worker signals done_cond.
 * NOTE(review): the predicate guarding this wait (and any use of is_last)
 * is not shown - confirm. */
191 pthread_mutex_lock(&ctx->done_mutex);
193 pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
195 pthread_mutex_unlock(&ctx->done_mutex);
/*
 * Stop the worker threads and release the context's resources.
 *
 * pctx  pointer to the context pointer to tear down
 */
199 void avpriv_slicethread_free(AVSliceThread **pctx)
/* The worker count is derived from nb_threads, which
 * avpriv_slicethread_create() lowers when thread creation fails part-way. */
208 nb_workers = ctx->nb_threads;
/* Wake every worker.
 * NOTE(review): whatever tells the workers to exit is set on lines not
 * shown - confirm. */
213 for (i = 0; i < nb_workers; i++) {
214 WorkerContext *w = &ctx->workers[i];
215 pthread_mutex_lock(&w->mutex);
217 pthread_cond_signal(&w->cond);
218 pthread_mutex_unlock(&w->mutex);
/* Join each worker before destroying its synchronization objects. */
221 for (i = 0; i < nb_workers; i++) {
222 WorkerContext *w = &ctx->workers[i];
223 pthread_join(w->thread, NULL);
224 pthread_cond_destroy(&w->cond);
225 pthread_mutex_destroy(&w->mutex);
228 pthread_cond_destroy(&ctx->done_cond);
229 pthread_mutex_destroy(&ctx->done_mutex);
230 av_freep(&ctx->workers);
234 #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
/*
 * Fallback used when none of HAVE_PTHREADS, HAVE_W32THREADS or
 * HAVE_OS2THREADS is set: returns AVERROR(EINVAL).
 */
236 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
237 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
238 void (*main_func)(void *priv),
242 return AVERROR(EINVAL);
/* Fallback definition for builds without a supported thread API.
 * NOTE(review): the body is not shown - confirm its behavior. */
245 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
/* Fallback definition for builds without a supported thread API. */
250 void avpriv_slicethread_free(AVSliceThread **pctx)
/* The fallback avpriv_slicethread_create() returns an error, so the only valid
 * arguments are a NULL pctx or a pctx pointing at NULL. */
252 av_assert0(!pctx || !*pctx);
255 #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */