AVIOInterruptCB interrupt_callback;
} Context;
-static int async_interrupt_callback(void *arg)
+static int async_check_interrupt(void *arg)
{
URLContext *h = arg;
Context *c = h->priv_data;
- int ret = 0;
- if (c->interrupt_callback.callback) {
- ret = c->interrupt_callback.callback(c->interrupt_callback.opaque);
- if (!ret)
- return ret;
- }
+ if (c->abort_request)
+ return 1;
+
+ if (ff_check_interrupt(&c->interrupt_callback))
+ c->abort_request = 1;
return c->abort_request;
}
while (1) {
int fifo_space, to_copy;
- if (async_interrupt_callback(h)) {
+ pthread_mutex_lock(&c->mutex);
+ if (async_check_interrupt(h)) {
c->io_eof_reached = 1;
c->io_error = AVERROR_EXIT;
+ pthread_cond_signal(&c->cond_wakeup_main);
+ pthread_mutex_unlock(&c->mutex);
break;
}
if (c->seek_request) {
- pthread_mutex_lock(&c->mutex);
-
ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
if (ret < 0) {
c->io_eof_reached = 1;
fifo_space = av_fifo_space(fifo);
if (c->io_eof_reached || fifo_space <= 0) {
- pthread_mutex_lock(&c->mutex);
pthread_cond_signal(&c->cond_wakeup_main);
pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
pthread_mutex_unlock(&c->mutex);
continue;
}
+ pthread_mutex_unlock(&c->mutex);
to_copy = FFMIN(4096, fifo_space);
ret = av_fifo_generic_write(fifo, c->inner, to_copy, (void *)ffurl_read);
+
+ pthread_mutex_lock(&c->mutex);
if (ret <= 0) {
c->io_eof_reached = 1;
if (ret < 0) {
}
}
- pthread_mutex_lock(&c->mutex);
pthread_cond_signal(&c->cond_wakeup_main);
pthread_mutex_unlock(&c->mutex);
}
{
Context *c = h->priv_data;
int ret;
- AVIOInterruptCB interrupt_callback = {.callback = async_interrupt_callback, .opaque = h};
+ AVIOInterruptCB interrupt_callback = {.callback = async_check_interrupt, .opaque = h};
av_strstart(arg, "async:", &arg);
while (to_read > 0) {
int fifo_size, to_copy;
- if (async_interrupt_callback(h)) {
+ if (async_check_interrupt(h)) {
ret = AVERROR_EXIT;
break;
}
c->seek_ret = 0;
while (1) {
- if (async_interrupt_callback(h)) {
+ if (async_check_interrupt(h)) {
ret = AVERROR_EXIT;
break;
}
.priv_data_size = sizeof(Context),
.priv_data_class = &async_context_class,
};
+
+#ifdef TEST
+
+#define TEST_SEEK_POS (1536)
+#define TEST_STREAM_SIZE (2048)
+
+typedef struct TestContext {
+ AVClass *class;
+ size_t logical_pos;
+ size_t logical_size;
+} TestContext;
+
+static int async_test_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
+{
+ TestContext *c = h->priv_data;
+ c->logical_pos = 0;
+ c->logical_size = TEST_STREAM_SIZE;
+ return 0;
+}
+
+static int async_test_close(URLContext *h)
+{
+ return 0;
+}
+
+static int async_test_read(URLContext *h, unsigned char *buf, int size)
+{
+ TestContext *c = h->priv_data;
+ int i;
+ int read_len = 0;
+
+ if (c->logical_pos >= c->logical_size)
+ return AVERROR_EOF;
+
+ for (i = 0; i < size; ++i) {
+ buf[i] = c->logical_pos & 0xFF;
+
+ c->logical_pos++;
+ read_len++;
+
+ if (c->logical_pos >= c->logical_size)
+ break;
+ }
+
+ return read_len;
+}
+
+static int64_t async_test_seek(URLContext *h, int64_t pos, int whence)
+{
+ TestContext *c = h->priv_data;
+ int64_t new_logical_pos;
+
+ if (whence == AVSEEK_SIZE) {
+ return c->logical_size;
+ } else if (whence == SEEK_CUR) {
+ new_logical_pos = pos + c->logical_pos;
+ } else if (whence == SEEK_SET){
+ new_logical_pos = pos;
+ } else {
+ return AVERROR(EINVAL);
+ }
+ if (new_logical_pos < 0)
+ return AVERROR(EINVAL);
+
+ c->logical_pos = new_logical_pos;
+ return new_logical_pos;
+}
+
+static const AVClass async_test_context_class = {
+ .class_name = "Async-Test",
+ .item_name = av_default_item_name,
+ .version = LIBAVUTIL_VERSION_INT,
+};
+
+URLProtocol ff_async_test_protocol = {
+ .name = "async-test",
+ .url_open2 = async_test_open,
+ .url_read = async_test_read,
+ .url_seek = async_test_seek,
+ .url_close = async_test_close,
+ .priv_data_size = sizeof(TestContext),
+ .priv_data_class = &async_test_context_class,
+};
+
+int main(void)
+{
+ URLContext *h = NULL;
+ int i;
+ int ret;
+ int64_t size;
+ int64_t pos;
+ int64_t read_len;
+ unsigned char buf[4096];
+
+ ffurl_register_protocol(&ff_async_protocol);
+ ffurl_register_protocol(&ff_async_test_protocol);
+
+ ret = ffurl_open(&h, "async:async-test:", AVIO_FLAG_READ, NULL, NULL);
+ printf("open: %d\n", ret);
+
+ size = ffurl_size(h);
+ printf("size: %"PRId64"\n", size);
+
+ pos = ffurl_seek(h, 0, SEEK_CUR);
+ read_len = 0;
+ while (1) {
+ ret = ffurl_read(h, buf, sizeof(buf));
+ if (ret == AVERROR_EOF) {
+ printf("read-error: AVERROR_EOF at %"PRId64"\n", ffurl_seek(h, 0, SEEK_CUR));
+ break;
+ }
+ else if (ret == 0)
+ break;
+ else if (ret < 0) {
+ printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
+ goto fail;
+ } else {
+ for (i = 0; i < ret; ++i) {
+ if (buf[i] != (pos & 0xFF)) {
+ printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
+ (int)buf[i], (int)(pos & 0xFF), pos);
+ break;
+ }
+ pos++;
+ }
+ }
+
+ read_len += ret;
+ }
+ printf("read: %"PRId64"\n", read_len);
+
+ ret = ffurl_read(h, buf, 1);
+ printf("read: %d\n", ret);
+
+ pos = ffurl_seek(h, TEST_SEEK_POS, SEEK_SET);
+ printf("seek: %"PRId64"\n", pos);
+
+ read_len = 0;
+ while (1) {
+ ret = ffurl_read(h, buf, sizeof(buf));
+ if (ret == AVERROR_EOF)
+ break;
+ else if (ret == 0)
+ break;
+ else if (ret < 0) {
+ printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
+ goto fail;
+ } else {
+ for (i = 0; i < ret; ++i) {
+ if (buf[i] != (pos & 0xFF)) {
+ printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
+ (int)buf[i], (int)(pos & 0xFF), pos);
+ break;
+ }
+ pos++;
+ }
+ }
+
+ read_len += ret;
+ }
+ printf("read: %"PRId64"\n", read_len);
+
+ ret = ffurl_read(h, buf, 1);
+ printf("read: %d\n", ret);
+
+fail:
+ ffurl_close(h);
+ return 0;
+}
+
+#endif