]> git.sesse.net Git - ffmpeg/blobdiff - libavformat/async.c
lavf/http: Implement server side network code.
[ffmpeg] / libavformat / async.c
index 0748309639f4c6c0fe3e62157de7c46ee8c235b1..60ea14c9e0e85e21e59ff83f2ce50972c4098ccd 100644 (file)
@@ -71,17 +71,16 @@ typedef struct Context {
     AVIOInterruptCB interrupt_callback;
 } Context;
 
-static int async_interrupt_callback(void *arg)
+static int async_check_interrupt(void *arg)
 {
     URLContext *h   = arg;
     Context    *c   = h->priv_data;
-    int         ret = 0;
 
-    if (c->interrupt_callback.callback) {
-        ret = c->interrupt_callback.callback(c->interrupt_callback.opaque);
-        if (!ret)
-            return ret;
-    }
+    if (c->abort_request)
+        return 1;
+
+    if (ff_check_interrupt(&c->interrupt_callback))
+        c->abort_request = 1;
 
     return c->abort_request;
 }
@@ -96,15 +95,16 @@ static void *async_buffer_task(void *arg)
     while (1) {
         int fifo_space, to_copy;
 
-        if (async_interrupt_callback(h)) {
+        pthread_mutex_lock(&c->mutex);
+        if (async_check_interrupt(h)) {
             c->io_eof_reached = 1;
             c->io_error       = AVERROR_EXIT;
+            pthread_cond_signal(&c->cond_wakeup_main);
+            pthread_mutex_unlock(&c->mutex);
             break;
         }
 
         if (c->seek_request) {
-            pthread_mutex_lock(&c->mutex);
-
             ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
             if (ret < 0) {
                 c->io_eof_reached = 1;
@@ -127,15 +127,17 @@ static void *async_buffer_task(void *arg)
 
         fifo_space = av_fifo_space(fifo);
         if (c->io_eof_reached || fifo_space <= 0) {
-            pthread_mutex_lock(&c->mutex);
             pthread_cond_signal(&c->cond_wakeup_main);
             pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
             pthread_mutex_unlock(&c->mutex);
             continue;
         }
+        pthread_mutex_unlock(&c->mutex);
 
         to_copy = FFMIN(4096, fifo_space);
         ret = av_fifo_generic_write(fifo, c->inner, to_copy, (void *)ffurl_read);
+
+        pthread_mutex_lock(&c->mutex);
         if (ret <= 0) {
             c->io_eof_reached = 1;
             if (ret < 0) {
@@ -143,7 +145,6 @@ static void *async_buffer_task(void *arg)
             }
         }
 
-        pthread_mutex_lock(&c->mutex);
         pthread_cond_signal(&c->cond_wakeup_main);
         pthread_mutex_unlock(&c->mutex);
     }
@@ -155,7 +156,7 @@ static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **
 {
     Context         *c = h->priv_data;
     int              ret;
-    AVIOInterruptCB  interrupt_callback = {.callback = async_interrupt_callback, .opaque = h};
+    AVIOInterruptCB  interrupt_callback = {.callback = async_check_interrupt, .opaque = h};
 
     av_strstart(arg, "async:", &arg);
 
@@ -251,7 +252,7 @@ static int async_read_internal(URLContext *h, void *dest, int size, int read_com
 
     while (to_read > 0) {
         int fifo_size, to_copy;
-        if (async_interrupt_callback(h)) {
+        if (async_check_interrupt(h)) {
             ret = AVERROR_EXIT;
             break;
         }
@@ -343,7 +344,7 @@ static int64_t async_seek(URLContext *h, int64_t pos, int whence)
     c->seek_ret       = 0;
 
     while (1) {
-        if (async_interrupt_callback(h)) {
+        if (async_check_interrupt(h)) {
             ret = AVERROR_EXIT;
             break;
         }
@@ -385,3 +386,174 @@ URLProtocol ff_async_protocol = {
     .priv_data_size      = sizeof(Context),
     .priv_data_class     = &async_context_class,
 };
+
+#ifdef TEST
+
+#define TEST_SEEK_POS    (1536)
+#define TEST_STREAM_SIZE (2048)
+
+typedef struct TestContext {
+    AVClass        *class;
+    size_t          logical_pos;
+    size_t          logical_size;
+} TestContext;
+
+static int async_test_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
+{
+    TestContext *c = h->priv_data;
+    c->logical_pos  = 0;
+    c->logical_size = TEST_STREAM_SIZE;
+    return 0;
+}
+
+static int async_test_close(URLContext *h)
+{
+    return 0;
+}
+
+static int async_test_read(URLContext *h, unsigned char *buf, int size)
+{
+    TestContext *c = h->priv_data;
+    int          i;
+    int          read_len = 0;
+
+    if (c->logical_pos >= c->logical_size)
+        return AVERROR_EOF;
+
+    for (i = 0; i < size; ++i) {
+        buf[i] = c->logical_pos & 0xFF;
+
+        c->logical_pos++;
+        read_len++;
+
+        if (c->logical_pos >= c->logical_size)
+            break;
+    }
+
+    return read_len;
+}
+
+static int64_t async_test_seek(URLContext *h, int64_t pos, int whence)
+{
+    TestContext *c = h->priv_data;
+    int64_t      new_logical_pos;
+
+    if (whence == AVSEEK_SIZE) {
+        return c->logical_size;
+    } else if (whence == SEEK_CUR) {
+        new_logical_pos = pos + c->logical_pos;
+    } else if (whence == SEEK_SET){
+        new_logical_pos = pos;
+    } else {
+        return AVERROR(EINVAL);
+    }
+    if (new_logical_pos < 0)
+        return AVERROR(EINVAL);
+
+    c->logical_pos = new_logical_pos;
+    return new_logical_pos;
+}
+
+static const AVClass async_test_context_class = {
+    .class_name = "Async-Test",
+    .item_name  = av_default_item_name,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+URLProtocol ff_async_test_protocol = {
+    .name                = "async-test",
+    .url_open2           = async_test_open,
+    .url_read            = async_test_read,
+    .url_seek            = async_test_seek,
+    .url_close           = async_test_close,
+    .priv_data_size      = sizeof(TestContext),
+    .priv_data_class     = &async_test_context_class,
+};
+
+int main(void)
+{
+    URLContext   *h = NULL;
+    int           i;
+    int           ret;
+    int64_t       size;
+    int64_t       pos;
+    int64_t       read_len;
+    unsigned char buf[4096];
+
+    ffurl_register_protocol(&ff_async_protocol);
+    ffurl_register_protocol(&ff_async_test_protocol);
+
+    ret = ffurl_open(&h, "async:async-test:", AVIO_FLAG_READ, NULL, NULL);
+    printf("open: %d\n", ret);
+
+    size = ffurl_size(h);
+    printf("size: %"PRId64"\n", size);
+
+    pos = ffurl_seek(h, 0, SEEK_CUR);
+    read_len = 0;
+    while (1) {
+        ret = ffurl_read(h, buf, sizeof(buf));
+        if (ret == AVERROR_EOF) {
+            printf("read-error: AVERROR_EOF at %"PRId64"\n", ffurl_seek(h, 0, SEEK_CUR));
+            break;
+        }
+        else if (ret == 0)
+            break;
+        else if (ret < 0) {
+            printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
+            goto fail;
+        } else {
+            for (i = 0; i < ret; ++i) {
+                if (buf[i] != (pos & 0xFF)) {
+                    printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
+                           (int)buf[i], (int)(pos & 0xFF), pos);
+                    break;
+                }
+                pos++;
+            }
+        }
+
+        read_len += ret;
+    }
+    printf("read: %"PRId64"\n", read_len);
+
+    ret = ffurl_read(h, buf, 1);
+    printf("read: %d\n", ret);
+
+    pos = ffurl_seek(h, TEST_SEEK_POS, SEEK_SET);
+    printf("seek: %"PRId64"\n", pos);
+
+    read_len = 0;
+    while (1) {
+        ret = ffurl_read(h, buf, sizeof(buf));
+        if (ret == AVERROR_EOF)
+            break;
+        else if (ret == 0)
+            break;
+        else if (ret < 0) {
+            printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
+            goto fail;
+        } else {
+            for (i = 0; i < ret; ++i) {
+                if (buf[i] != (pos & 0xFF)) {
+                    printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
+                           (int)buf[i], (int)(pos & 0xFF), pos);
+                    break;
+                }
+                pos++;
+            }
+        }
+
+        read_len += ret;
+    }
+    printf("read: %"PRId64"\n", read_len);
+
+    ret = ffurl_read(h, buf, 1);
+    printf("read: %d\n", ret);
+
+fail:
+    ffurl_close(h);
+    return 0;
+}
+
+#endif