]> git.sesse.net Git - ffmpeg/blob - libavformat/async.c
Merge commit 'd6604b29ef544793479d7fb4e05ef6622bb3e534'
[ffmpeg] / libavformat / async.c
1 /*
2  * Input async protocol.
3  * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  *
21  * Based on libavformat/cache.c by Michael Niedermayer
22  */
23
24  /**
25  * @TODO
26  *      support timeout
27  *      support backward short seek
28  *      support work with concatdec, hls
29  */
30
31 #include "libavutil/avassert.h"
32 #include "libavutil/avstring.h"
33 #include "libavutil/error.h"
34 #include "libavutil/fifo.h"
35 #include "libavutil/log.h"
36 #include "libavutil/opt.h"
37 #include "url.h"
38 #include <stdint.h>
39 #include <pthread.h>
40
41 #if HAVE_UNISTD_H
42 #include <unistd.h>
43 #endif
44
45 #define BUFFER_CAPACITY         (4 * 1024 * 1024)
46 #define SHORT_SEEK_THRESHOLD    (256 * 1024)
47
48 typedef struct Context {
49     AVClass        *class;
50     URLContext     *inner;
51
52     int             seek_request;
53     size_t          seek_pos;
54     int             seek_whence;
55     int             seek_completed;
56     int64_t         seek_ret;
57
58     int             io_error;
59     int             io_eof_reached;
60
61     size_t          logical_pos;
62     size_t          logical_size;
63     AVFifoBuffer   *fifo;
64
65     pthread_cond_t  cond_wakeup_main;
66     pthread_cond_t  cond_wakeup_background;
67     pthread_mutex_t mutex;
68     pthread_t       async_buffer_thread;
69
70     int             abort_request;
71     AVIOInterruptCB interrupt_callback;
72 } Context;
73
74 static int async_interrupt_callback(void *arg)
75 {
76     URLContext *h   = arg;
77     Context    *c   = h->priv_data;
78     int         ret = 0;
79
80     if (c->interrupt_callback.callback) {
81         ret = c->interrupt_callback.callback(c->interrupt_callback.opaque);
82         if (!ret)
83             return ret;
84     }
85
86     return c->abort_request;
87 }
88
89 static void *async_buffer_task(void *arg)
90 {
91     URLContext   *h    = arg;
92     Context      *c    = h->priv_data;
93     AVFifoBuffer *fifo = c->fifo;
94     int           ret  = 0;
95
96     while (1) {
97         int fifo_space, to_copy;
98
99         if (async_interrupt_callback(h)) {
100             c->io_eof_reached = 1;
101             c->io_error       = AVERROR_EXIT;
102             break;
103         }
104
105         if (c->seek_request) {
106             pthread_mutex_lock(&c->mutex);
107
108             ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
109             if (ret < 0) {
110                 c->io_eof_reached = 1;
111                 c->io_error       = ret;
112             } else {
113                 c->io_eof_reached = 0;
114                 c->io_error       = 0;
115             }
116
117             c->seek_completed = 1;
118             c->seek_ret       = ret;
119             c->seek_request   = 0;
120
121             av_fifo_reset(fifo);
122
123             pthread_cond_signal(&c->cond_wakeup_main);
124             pthread_mutex_unlock(&c->mutex);
125             continue;
126         }
127
128         fifo_space = av_fifo_space(fifo);
129         if (c->io_eof_reached || fifo_space <= 0) {
130             pthread_mutex_lock(&c->mutex);
131             pthread_cond_signal(&c->cond_wakeup_main);
132             pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
133             pthread_mutex_unlock(&c->mutex);
134             continue;
135         }
136
137         to_copy = FFMIN(4096, fifo_space);
138         ret = av_fifo_generic_write(fifo, c->inner, to_copy, (void *)ffurl_read);
139         if (ret <= 0) {
140             c->io_eof_reached = 1;
141             if (ret < 0) {
142                 c->io_error = ret;
143             }
144         }
145
146         pthread_mutex_lock(&c->mutex);
147         pthread_cond_signal(&c->cond_wakeup_main);
148         pthread_mutex_unlock(&c->mutex);
149     }
150
151     return NULL;
152 }
153
154 static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
155 {
156     Context         *c = h->priv_data;
157     int              ret;
158     AVIOInterruptCB  interrupt_callback = {.callback = async_interrupt_callback, .opaque = h};
159
160     av_strstart(arg, "async:", &arg);
161
162     c->fifo = av_fifo_alloc(BUFFER_CAPACITY);
163     if (!c->fifo) {
164         ret = AVERROR(ENOMEM);
165         goto fifo_fail;
166     }
167
168     /* wrap interrupt callback */
169     c->interrupt_callback = h->interrupt_callback;
170     ret = ffurl_open(&c->inner, arg, flags, &interrupt_callback, options);
171     if (ret != 0) {
172         av_log(h, AV_LOG_ERROR, "ffurl_open failed : %s, %s\n", strerror(ret), arg);
173         goto url_fail;
174     }
175
176     c->logical_size = ffurl_size(c->inner);
177     h->is_streamed  = c->inner->is_streamed;
178
179     ret = pthread_mutex_init(&c->mutex, NULL);
180     if (ret != 0) {
181         av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
182         goto mutex_fail;
183     }
184
185     ret = pthread_cond_init(&c->cond_wakeup_main, NULL);
186     if (ret != 0) {
187         av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
188         goto cond_wakeup_main_fail;
189     }
190
191     ret = pthread_cond_init(&c->cond_wakeup_background, NULL);
192     if (ret != 0) {
193         av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
194         goto cond_wakeup_background_fail;
195     }
196
197     ret = pthread_create(&c->async_buffer_thread, NULL, async_buffer_task, h);
198     if (ret) {
199         av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
200         goto thread_fail;
201     }
202
203     return 0;
204
205 thread_fail:
206     pthread_cond_destroy(&c->cond_wakeup_background);
207 cond_wakeup_background_fail:
208     pthread_cond_destroy(&c->cond_wakeup_main);
209 cond_wakeup_main_fail:
210     pthread_mutex_destroy(&c->mutex);
211 mutex_fail:
212     ffurl_close(c->inner);
213 url_fail:
214     av_fifo_freep(&c->fifo);
215 fifo_fail:
216     return ret;
217 }
218
219 static int async_close(URLContext *h)
220 {
221     Context *c = h->priv_data;
222     int      ret;
223
224     pthread_mutex_lock(&c->mutex);
225     c->abort_request = 1;
226     pthread_cond_signal(&c->cond_wakeup_background);
227     pthread_mutex_unlock(&c->mutex);
228
229     ret = pthread_join(c->async_buffer_thread, NULL);
230     if (ret != 0)
231         av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
232
233     pthread_cond_destroy(&c->cond_wakeup_background);
234     pthread_cond_destroy(&c->cond_wakeup_main);
235     pthread_mutex_destroy(&c->mutex);
236     ffurl_close(c->inner);
237     av_fifo_freep(&c->fifo);
238
239     return 0;
240 }
241
242 static int async_read_internal(URLContext *h, void *dest, int size, int read_complete,
243                                void (*func)(void*, void*, int))
244 {
245     Context      *c       = h->priv_data;
246     AVFifoBuffer *fifo    = c->fifo;
247     int           to_read = size;
248     int           ret     = 0;
249
250     pthread_mutex_lock(&c->mutex);
251
252     while (to_read > 0) {
253         int fifo_size, to_copy;
254         if (async_interrupt_callback(h)) {
255             ret = AVERROR_EXIT;
256             break;
257         }
258         fifo_size = av_fifo_size(fifo);
259         to_copy   = FFMIN(to_read, fifo_size);
260         if (to_copy > 0) {
261             av_fifo_generic_read(fifo, dest, to_copy, func);
262             if (!func)
263                 dest = (uint8_t *)dest + to_copy;
264             c->logical_pos += to_copy;
265             to_read        -= to_copy;
266             ret             = size - to_read;
267
268             if (to_read <= 0 || !read_complete)
269                 break;
270         } else if (c->io_eof_reached) {
271             if (ret <= 0)
272                 ret = AVERROR_EOF;
273             break;
274         }
275         pthread_cond_signal(&c->cond_wakeup_background);
276         pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
277     }
278
279     pthread_cond_signal(&c->cond_wakeup_background);
280     pthread_mutex_unlock(&c->mutex);
281
282     return ret;
283 }
284
285 static int async_read(URLContext *h, unsigned char *buf, int size)
286 {
287     return async_read_internal(h, buf, size, 0, NULL);
288 }
289
290 static void fifo_do_not_copy_func(void* dest, void* src, int size) {
291     // do not copy
292 }
293
294 static int64_t async_seek(URLContext *h, int64_t pos, int whence)
295 {
296     Context      *c    = h->priv_data;
297     AVFifoBuffer *fifo = c->fifo;
298     int64_t       ret;
299     int64_t       new_logical_pos;
300     int fifo_size;
301
302     if (whence == AVSEEK_SIZE) {
303         av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
304         return c->logical_size;
305     } else if (whence == SEEK_CUR) {
306         av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
307         new_logical_pos = pos + c->logical_pos;
308     } else if (whence == SEEK_SET){
309         av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
310         new_logical_pos = pos;
311     } else {
312         return AVERROR(EINVAL);
313     }
314     if (new_logical_pos < 0)
315         return AVERROR(EINVAL);
316
317     fifo_size = av_fifo_size(fifo);
318     if (new_logical_pos == c->logical_pos) {
319         /* current position */
320         return c->logical_pos;
321     } else if ((new_logical_pos > c->logical_pos) &&
322                (new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) {
323         /* fast seek */
324         av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
325                 new_logical_pos, (int)c->logical_pos,
326                 (int)(new_logical_pos - c->logical_pos), fifo_size);
327         async_read_internal(h, NULL, new_logical_pos - c->logical_pos, 1, fifo_do_not_copy_func);
328         return c->logical_pos;
329     } else if (c->logical_size <= 0) {
330         /* can not seek */
331         return AVERROR(EINVAL);
332     } else if (new_logical_pos > c->logical_size) {
333         /* beyond end */
334         return AVERROR(EINVAL);
335     }
336
337     pthread_mutex_lock(&c->mutex);
338
339     c->seek_request   = 1;
340     c->seek_pos       = new_logical_pos;
341     c->seek_whence    = SEEK_SET;
342     c->seek_completed = 0;
343     c->seek_ret       = 0;
344
345     while (1) {
346         if (async_interrupt_callback(h)) {
347             ret = AVERROR_EXIT;
348             break;
349         }
350         if (c->seek_completed) {
351             if (c->seek_ret >= 0)
352                 c->logical_pos  = c->seek_ret;
353             ret = c->seek_ret;
354             break;
355         }
356         pthread_cond_signal(&c->cond_wakeup_background);
357         pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
358     }
359
360     pthread_mutex_unlock(&c->mutex);
361
362     return ret;
363 }
364
365 #define OFFSET(x) offsetof(Context, x)
366 #define D AV_OPT_FLAG_DECODING_PARAM
367
368 static const AVOption options[] = {
369     {NULL},
370 };
371
372 static const AVClass async_context_class = {
373     .class_name = "Async",
374     .item_name  = av_default_item_name,
375     .option     = options,
376     .version    = LIBAVUTIL_VERSION_INT,
377 };
378
379 URLProtocol ff_async_protocol = {
380     .name                = "async",
381     .url_open2           = async_open,
382     .url_read            = async_read,
383     .url_seek            = async_seek,
384     .url_close           = async_close,
385     .priv_data_size      = sizeof(Context),
386     .priv_data_class     = &async_context_class,
387 };