]> git.sesse.net Git - ffmpeg/blobdiff - libavutil/threadmessage.c
Merge commit 'cb49bb10ca7fcff2e382d9d989232b1a7f28e7da'
[ffmpeg] / libavutil / threadmessage.c
index b7d7dadb5a6de065683cddaa82d61a5e99d8b124..921d83128b5de929081ce0958e83b8e9dec0ae0e 100644 (file)
@@ -36,7 +36,8 @@ struct AVThreadMessageQueue {
 #if HAVE_THREADS
     AVFifoBuffer *fifo;
     pthread_mutex_t lock;
-    pthread_cond_t cond;
+    pthread_cond_t cond_recv;
+    pthread_cond_t cond_send;
     int err_send;
     int err_recv;
     unsigned elsize;
@@ -62,13 +63,20 @@ int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
         av_free(rmq);
         return AVERROR(ret);
     }
-    if ((ret = pthread_cond_init(&rmq->cond, NULL))) {
+    if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) {
+        pthread_mutex_destroy(&rmq->lock);
+        av_free(rmq);
+        return AVERROR(ret);
+    }
+    if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) {
+        pthread_cond_destroy(&rmq->cond_recv);
         pthread_mutex_destroy(&rmq->lock);
         av_free(rmq);
         return AVERROR(ret);
     }
     if (!(rmq->fifo = av_fifo_alloc(elsize * nelem))) {
-        pthread_cond_destroy(&rmq->cond);
+        pthread_cond_destroy(&rmq->cond_send);
+        pthread_cond_destroy(&rmq->cond_recv);
         pthread_mutex_destroy(&rmq->lock);
         av_free(rmq);
         return AVERROR(ret);
@@ -94,7 +102,8 @@ void av_thread_message_queue_free(AVThreadMessageQueue **mq)
     if (*mq) {
         av_thread_message_flush(*mq);
         av_fifo_freep(&(*mq)->fifo);
-        pthread_cond_destroy(&(*mq)->cond);
+        pthread_cond_destroy(&(*mq)->cond_send);
+        pthread_cond_destroy(&(*mq)->cond_recv);
         pthread_mutex_destroy(&(*mq)->lock);
         av_freep(mq);
     }
@@ -110,12 +119,13 @@ static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
     while (!mq->err_send && av_fifo_space(mq->fifo) < mq->elsize) {
         if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
             return AVERROR(EAGAIN);
-        pthread_cond_wait(&mq->cond, &mq->lock);
+        pthread_cond_wait(&mq->cond_send, &mq->lock);
     }
     if (mq->err_send)
         return mq->err_send;
     av_fifo_generic_write(mq->fifo, msg, mq->elsize, NULL);
-    pthread_cond_signal(&mq->cond);
+    /* one message is sent, signal one receiver */
+    pthread_cond_signal(&mq->cond_recv);
     return 0;
 }
 
@@ -126,12 +136,13 @@ static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
     while (!mq->err_recv && av_fifo_size(mq->fifo) < mq->elsize) {
         if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
             return AVERROR(EAGAIN);
-        pthread_cond_wait(&mq->cond, &mq->lock);
+        pthread_cond_wait(&mq->cond_recv, &mq->lock);
     }
     if (av_fifo_size(mq->fifo) < mq->elsize)
         return mq->err_recv;
     av_fifo_generic_read(mq->fifo, msg, mq->elsize, NULL);
-    pthread_cond_signal(&mq->cond);
+    /* one message space appeared, signal one sender */
+    pthread_cond_signal(&mq->cond_send);
     return 0;
 }
 
@@ -175,7 +186,7 @@ void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq,
 #if HAVE_THREADS
     pthread_mutex_lock(&mq->lock);
     mq->err_send = err;
-    pthread_cond_broadcast(&mq->cond);
+    pthread_cond_broadcast(&mq->cond_send);
     pthread_mutex_unlock(&mq->lock);
 #endif /* HAVE_THREADS */
 }
@@ -186,16 +197,18 @@ void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq,
 #if HAVE_THREADS
     pthread_mutex_lock(&mq->lock);
     mq->err_recv = err;
-    pthread_cond_broadcast(&mq->cond);
+    pthread_cond_broadcast(&mq->cond_recv);
     pthread_mutex_unlock(&mq->lock);
 #endif /* HAVE_THREADS */
 }
 
+#if HAVE_THREADS
 static void free_func_wrap(void *arg, void *msg, int size)
 {
     AVThreadMessageQueue *mq = arg;
     mq->free_func(msg);
 }
+#endif
 
 void av_thread_message_flush(AVThreadMessageQueue *mq)
 {
@@ -209,7 +222,9 @@ void av_thread_message_flush(AVThreadMessageQueue *mq)
         for (off = 0; off < used; off += mq->elsize)
             av_fifo_generic_peek_at(mq->fifo, mq, off, mq->elsize, free_func_wrap);
     av_fifo_drain(mq->fifo, used);
-    pthread_cond_broadcast(&mq->cond);
+    /* only the senders need to be notified since the queue is empty and there
+     * is nothing to read */
+    pthread_cond_broadcast(&mq->cond_send);
     pthread_mutex_unlock(&mq->lock);
 #endif /* HAVE_THREADS */
 }