]> git.sesse.net Git - ffmpeg/blob - libavcodec/pthread_slice.c
avcodec: Mark argument in av_{parser|hwaccel|bitstream_filter}_next as const
[ffmpeg] / libavcodec / pthread_slice.c
1 /*
2  * This file is part of Libav.
3  *
4  * Libav is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * Libav is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with Libav; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18
19 /**
20  * @file
21  * Slice multithreading support functions
22  * @see doc/multithreading.txt
23  */
24
25 #include "config.h"
26
27 #if HAVE_PTHREADS
28 #include <pthread.h>
29 #elif HAVE_W32THREADS
30 #include "compat/w32pthreads.h"
31 #endif
32
33 #include "avcodec.h"
34 #include "internal.h"
35 #include "pthread_internal.h"
36 #include "thread.h"
37
38 #include "libavutil/common.h"
39 #include "libavutil/cpu.h"
40 #include "libavutil/mem.h"
41
42 typedef int (action_func)(AVCodecContext *c, void *arg);
43 typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
44
45 typedef struct SliceThreadContext {
46     pthread_t *workers;
47     action_func *func;
48     action_func2 *func2;
49     void *args;
50     int *rets;
51     int rets_count;
52     int job_count;
53     int job_size;
54
55     pthread_cond_t last_job_cond;
56     pthread_cond_t current_job_cond;
57     pthread_mutex_t current_job_lock;
58     unsigned current_execute;
59     int current_job;
60     int done;
61 } SliceThreadContext;
62
63 static void* attribute_align_arg worker(void *v)
64 {
65     AVCodecContext *avctx = v;
66     SliceThreadContext *c = avctx->internal->thread_ctx;
67     unsigned last_execute = 0;
68     int our_job = c->job_count;
69     int thread_count = avctx->thread_count;
70     int self_id;
71
72     pthread_mutex_lock(&c->current_job_lock);
73     self_id = c->current_job++;
74     for (;;){
75         while (our_job >= c->job_count) {
76             if (c->current_job == thread_count + c->job_count)
77                 pthread_cond_signal(&c->last_job_cond);
78
79             while (last_execute == c->current_execute && !c->done)
80                 pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
81             last_execute = c->current_execute;
82             our_job = self_id;
83
84             if (c->done) {
85                 pthread_mutex_unlock(&c->current_job_lock);
86                 return NULL;
87             }
88         }
89         pthread_mutex_unlock(&c->current_job_lock);
90
91         c->rets[our_job%c->rets_count] = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
92                                                    c->func2(avctx, c->args, our_job, self_id);
93
94         pthread_mutex_lock(&c->current_job_lock);
95         our_job = c->current_job++;
96     }
97 }
98
99 void ff_slice_thread_free(AVCodecContext *avctx)
100 {
101     SliceThreadContext *c = avctx->internal->thread_ctx;
102     int i;
103
104     pthread_mutex_lock(&c->current_job_lock);
105     c->done = 1;
106     pthread_cond_broadcast(&c->current_job_cond);
107     pthread_mutex_unlock(&c->current_job_lock);
108
109     for (i=0; i<avctx->thread_count; i++)
110          pthread_join(c->workers[i], NULL);
111
112     pthread_mutex_destroy(&c->current_job_lock);
113     pthread_cond_destroy(&c->current_job_cond);
114     pthread_cond_destroy(&c->last_job_cond);
115     av_free(c->workers);
116     av_freep(&avctx->internal->thread_ctx);
117 }
118
119 static av_always_inline void thread_park_workers(SliceThreadContext *c, int thread_count)
120 {
121     while (c->current_job != thread_count + c->job_count)
122         pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
123     pthread_mutex_unlock(&c->current_job_lock);
124 }
125
126 static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
127 {
128     SliceThreadContext *c = avctx->internal->thread_ctx;
129     int dummy_ret;
130
131     if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1)
132         return avcodec_default_execute(avctx, func, arg, ret, job_count, job_size);
133
134     if (job_count <= 0)
135         return 0;
136
137     pthread_mutex_lock(&c->current_job_lock);
138
139     c->current_job = avctx->thread_count;
140     c->job_count = job_count;
141     c->job_size = job_size;
142     c->args = arg;
143     c->func = func;
144     if (ret) {
145         c->rets = ret;
146         c->rets_count = job_count;
147     } else {
148         c->rets = &dummy_ret;
149         c->rets_count = 1;
150     }
151     c->current_execute++;
152     pthread_cond_broadcast(&c->current_job_cond);
153
154     thread_park_workers(c, avctx->thread_count);
155
156     return 0;
157 }
158
159 static int thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
160 {
161     SliceThreadContext *c = avctx->internal->thread_ctx;
162     c->func2 = func2;
163     return thread_execute(avctx, NULL, arg, ret, job_count, 0);
164 }
165
166 int ff_slice_thread_init(AVCodecContext *avctx)
167 {
168     int i;
169     SliceThreadContext *c;
170     int thread_count = avctx->thread_count;
171
172 #if HAVE_W32THREADS
173     w32thread_init();
174 #endif
175
176     if (!thread_count) {
177         int nb_cpus = av_cpu_count();
178         av_log(avctx, AV_LOG_DEBUG, "detected %d logical cores\n", nb_cpus);
179         // use number of cores + 1 as thread count if there is more than one
180         if (nb_cpus > 1)
181             thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
182         else
183             thread_count = avctx->thread_count = 1;
184     }
185
186     if (thread_count <= 1) {
187         avctx->active_thread_type = 0;
188         return 0;
189     }
190
191     c = av_mallocz(sizeof(SliceThreadContext));
192     if (!c)
193         return -1;
194
195     c->workers = av_mallocz(sizeof(pthread_t)*thread_count);
196     if (!c->workers) {
197         av_free(c);
198         return -1;
199     }
200
201     avctx->internal->thread_ctx = c;
202     c->current_job = 0;
203     c->job_count = 0;
204     c->job_size = 0;
205     c->done = 0;
206     pthread_cond_init(&c->current_job_cond, NULL);
207     pthread_cond_init(&c->last_job_cond, NULL);
208     pthread_mutex_init(&c->current_job_lock, NULL);
209     pthread_mutex_lock(&c->current_job_lock);
210     for (i=0; i<thread_count; i++) {
211         if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
212            avctx->thread_count = i;
213            pthread_mutex_unlock(&c->current_job_lock);
214            ff_thread_free(avctx);
215            return -1;
216         }
217     }
218
219     thread_park_workers(c, thread_count);
220
221     avctx->execute = thread_execute;
222     avctx->execute2 = thread_execute2;
223     return 0;
224 }