]> git.sesse.net Git - ffmpeg/blobdiff - libavutil/threadmessage.c
Merge commit 'cb49bb10ca7fcff2e382d9d989232b1a7f28e7da'
[ffmpeg] / libavutil / threadmessage.c
index b7fcbe28c0125dfe73a8f94753ab7491fa4b91c5..921d83128b5de929081ce0958e83b8e9dec0ae0e 100644 (file)
@@ -36,10 +36,12 @@ struct AVThreadMessageQueue {
 #if HAVE_THREADS
     AVFifoBuffer *fifo;
     pthread_mutex_t lock;
-    pthread_cond_t cond;
+    pthread_cond_t cond_recv;
+    pthread_cond_t cond_send;
     int err_send;
     int err_recv;
     unsigned elsize;
+    void (*free_func)(void *msg);
 #else
     int dummy;
 #endif
@@ -61,13 +63,20 @@ int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
         av_free(rmq);
         return AVERROR(ret);
     }
-    if ((ret = pthread_cond_init(&rmq->cond, NULL))) {
+    if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) {
+        pthread_mutex_destroy(&rmq->lock);
+        av_free(rmq);
+        return AVERROR(ret);
+    }
+    if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) {
+        pthread_cond_destroy(&rmq->cond_recv);
         pthread_mutex_destroy(&rmq->lock);
         av_free(rmq);
         return AVERROR(ret);
     }
     if (!(rmq->fifo = av_fifo_alloc(elsize * nelem))) {
-        pthread_cond_destroy(&rmq->cond);
+        pthread_cond_destroy(&rmq->cond_send);
+        pthread_cond_destroy(&rmq->cond_recv);
         pthread_mutex_destroy(&rmq->lock);
         av_free(rmq);
         return AVERROR(ret);
@@ -81,12 +90,20 @@ int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
 #endif /* HAVE_THREADS */
 }
 
+void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq,
+                                           void (*free_func)(void *msg))
+{
+    mq->free_func = free_func;
+}
+
 void av_thread_message_queue_free(AVThreadMessageQueue **mq)
 {
 #if HAVE_THREADS
     if (*mq) {
+        av_thread_message_flush(*mq);
         av_fifo_freep(&(*mq)->fifo);
-        pthread_cond_destroy(&(*mq)->cond);
+        pthread_cond_destroy(&(*mq)->cond_send);
+        pthread_cond_destroy(&(*mq)->cond_recv);
         pthread_mutex_destroy(&(*mq)->lock);
         av_freep(mq);
     }
@@ -102,12 +119,13 @@ static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
     while (!mq->err_send && av_fifo_space(mq->fifo) < mq->elsize) {
         if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
             return AVERROR(EAGAIN);
-        pthread_cond_wait(&mq->cond, &mq->lock);
+        pthread_cond_wait(&mq->cond_send, &mq->lock);
     }
     if (mq->err_send)
         return mq->err_send;
     av_fifo_generic_write(mq->fifo, msg, mq->elsize, NULL);
-    pthread_cond_signal(&mq->cond);
+    /* one message is sent, signal one receiver */
+    pthread_cond_signal(&mq->cond_recv);
     return 0;
 }
 
@@ -118,12 +136,13 @@ static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
     while (!mq->err_recv && av_fifo_size(mq->fifo) < mq->elsize) {
         if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
             return AVERROR(EAGAIN);
-        pthread_cond_wait(&mq->cond, &mq->lock);
+        pthread_cond_wait(&mq->cond_recv, &mq->lock);
     }
     if (av_fifo_size(mq->fifo) < mq->elsize)
         return mq->err_recv;
     av_fifo_generic_read(mq->fifo, msg, mq->elsize, NULL);
-    pthread_cond_signal(&mq->cond);
+    /* one message space appeared, signal one sender */
+    pthread_cond_signal(&mq->cond_send);
     return 0;
 }
 
@@ -167,7 +186,7 @@ void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq,
 #if HAVE_THREADS
     pthread_mutex_lock(&mq->lock);
     mq->err_send = err;
-    pthread_cond_broadcast(&mq->cond);
+    pthread_cond_broadcast(&mq->cond_send);
     pthread_mutex_unlock(&mq->lock);
 #endif /* HAVE_THREADS */
 }
@@ -178,7 +197,34 @@ void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq,
 #if HAVE_THREADS
     pthread_mutex_lock(&mq->lock);
     mq->err_recv = err;
-    pthread_cond_broadcast(&mq->cond);
+    pthread_cond_broadcast(&mq->cond_recv);
+    pthread_mutex_unlock(&mq->lock);
+#endif /* HAVE_THREADS */
+}
+
+#if HAVE_THREADS
+static void free_func_wrap(void *arg, void *msg, int size)
+{
+    AVThreadMessageQueue *mq = arg;
+    mq->free_func(msg);
+}
+#endif
+
+void av_thread_message_flush(AVThreadMessageQueue *mq)
+{
+#if HAVE_THREADS
+    int used, off;
+    void *free_func = mq->free_func;
+
+    pthread_mutex_lock(&mq->lock);
+    used = av_fifo_size(mq->fifo);
+    if (free_func)
+        for (off = 0; off < used; off += mq->elsize)
+            av_fifo_generic_peek_at(mq->fifo, mq, off, mq->elsize, free_func_wrap);
+    av_fifo_drain(mq->fifo, used);
+    /* only the senders need to be notified since the queue is empty and there
+     * is nothing to read */
+    pthread_cond_broadcast(&mq->cond_send);
     pthread_mutex_unlock(&mq->lock);
 #endif /* HAVE_THREADS */
 }