]> git.sesse.net Git - mlt/commitdiff
Add work queue to the parallel consumer.
authorDan Dennedy <dan@dennedy.org>
Fri, 11 Jun 2010 07:30:59 +0000 (00:30 -0700)
committerDan Dennedy <dan@dennedy.org>
Mon, 24 Jan 2011 02:09:43 +0000 (18:09 -0800)
This removes get_frame calls from the worker threads. The get_frame call
must take a service lock and that creates contention between the
threads.

src/framework/mlt_consumer.c

index b5f07bff7adab219cbfd93357df54a5a88eb1e69..fab880c15e698e1f35d4e95724d8d6b8eecc5aad 100644 (file)
@@ -821,9 +821,6 @@ static void *consumer_worker_thread( void *arg )
        // See if audio is turned off
        int audio_off = mlt_properties_get_int( properties, "audio_off" );
 
-       // Get the maximum size of the buffer
-       int buffer = mlt_properties_get_int( properties, "buffer" );
-
        // General frame variable
        mlt_frame frame = NULL;
        uint8_t *image = NULL;
@@ -832,8 +829,13 @@ static void *consumer_worker_thread( void *arg )
        if ( preview_off && preview_format != 0 )
                this->format = preview_format;
 
-       // Get the first frame
-       frame = mlt_consumer_get_frame( this );
+       // Get the first frame from the work queue
+       pthread_mutex_lock( &this->frame_queue_mutex );
+       while( this->ahead && mlt_deque_count( this->frame_queue ) <= 0 )
+               pthread_cond_wait( &this->frame_queue_cond, &this->frame_queue_mutex );
+       frame = mlt_deque_pop_front( this->frame_queue );
+       pthread_cond_signal( &this->frame_queue_cond );
+       pthread_mutex_unlock( &this->frame_queue_mutex );
 
        // Get the lock object
        lock_object = mlt_properties_get_data( MLT_FRAME_PROPERTIES( frame ), "consumer_lock_service", NULL );
@@ -872,17 +874,17 @@ static void *consumer_worker_thread( void *arg )
                {
                        pthread_mutex_lock( &this->done_queue_mutex );
                        mlt_deque_insert( this->done_queue, frame, ( mlt_deque_compare ) compare_frame_position );
+                       pthread_cond_signal( &this->done_queue_cond );
                        pthread_mutex_unlock( &this->done_queue_mutex );
-                       pthread_cond_broadcast( &this->done_queue_cond );
                }
 
-               pthread_mutex_lock( &this->done_queue_mutex );
-               while ( this->ahead && mlt_deque_count( this->done_queue ) >= buffer )
-                       pthread_cond_wait( &this->done_queue_cond, &this->done_queue_mutex );
-               pthread_mutex_unlock( &this->done_queue_mutex );
-
-               // Get the next frame
-               frame = mlt_consumer_get_frame( this );
+               // Get the next frame from the work queue
+               pthread_mutex_lock( &this->frame_queue_mutex );
+               while( this->ahead && mlt_deque_count( this->frame_queue ) <= 0 )
+                       pthread_cond_wait( &this->frame_queue_cond, &this->frame_queue_mutex );
+               frame = mlt_deque_pop_front( this->frame_queue );
+               pthread_cond_signal( &this->frame_queue_cond );
+               pthread_mutex_unlock( &this->frame_queue_mutex );
 
                // If there's no frame, we're probably stopped...
                if ( frame == NULL )
@@ -1198,31 +1200,70 @@ mlt_frame mlt_consumer_rt_frame( mlt_consumer this )
        {
                // Use multiple worker threads and a work queue
                int size = abs( this->real_time );
+               int buffer = mlt_properties_get_int( properties, "buffer" );
+               buffer = buffer < size ? size : buffer;
 
                // Start worker threads if not already
                if ( ! this->ahead )
                {
-                       int buffer = mlt_properties_get_int( properties, "buffer" );
                        int prefill = mlt_properties_get_int( properties, "prefill" );
+                       prefill = prefill > 0 && prefill < buffer ? prefill : buffer;
+
                        consumer_work_start( this );
-                       if ( buffer > 1 )
-                               size = prefill > 0 && prefill < buffer ? prefill : buffer;
+
+                       // Fill the work queue
+                       int i = buffer;
+                       while ( i-- )
+                       {
+                               frame = mlt_consumer_get_frame( this );
+                               if ( frame )
+                               {
+                                       pthread_mutex_lock( &this->frame_queue_mutex );
+                                       mlt_deque_push_back( this->frame_queue, frame );
+                                       pthread_cond_signal( &this->frame_queue_cond );
+                                       pthread_mutex_unlock( &this->frame_queue_mutex );
+                               }
+                       }
+
+                       // Wait for prefill
+                       pthread_mutex_lock( &this->done_queue_mutex );
+                       while( this->ahead && mlt_deque_count( this->done_queue ) < prefill )
+                               pthread_cond_wait( &this->done_queue_cond, &this->done_queue_mutex );
+                       pthread_mutex_unlock( &this->done_queue_mutex );
                }
 
-               // Get frame from the done queue
-               mlt_log_debug( MLT_CONSUMER_SERVICE(this), "size %d done count %d\n", size, mlt_deque_count( this->done_queue ) );
-               if ( this->real_time > 0 && size == this->real_time && mlt_deque_count( this->done_queue ) <= size )
+               // Try to get frame from the done queue
+               mlt_log_verbose( MLT_CONSUMER_SERVICE(this), "size %d done count %d work count %d\n",
+                       size, mlt_deque_count( this->done_queue ), mlt_deque_count( this->frame_queue ) );
+               if ( this->real_time > 0 && mlt_deque_count( this->done_queue ) <= 0 )
                {
+                       // Non-realtime and no ready frames
                        frame = mlt_consumer_get_frame( this );
                }
                else
                {
+                       // Wait for work queue space
+                       pthread_mutex_lock( &this->frame_queue_mutex );
+                       while( this->ahead && mlt_deque_count( this->frame_queue ) >= buffer )
+                               pthread_cond_wait( &this->frame_queue_cond, &this->frame_queue_mutex );
+                       pthread_mutex_unlock( &this->frame_queue_mutex );
+
+                       // Feed the work queue
+                       frame = mlt_consumer_get_frame( this );
+                       if ( frame )
+                       {
+                               pthread_mutex_lock( &this->frame_queue_mutex );
+                               mlt_deque_push_back( this->frame_queue, frame );
+                               pthread_cond_signal( &this->frame_queue_cond );
+                               pthread_mutex_unlock( &this->frame_queue_mutex );
+                       }
+
+                       // Get the frame from the done queue
                        pthread_mutex_lock( &this->done_queue_mutex );
-                       while( this->ahead && mlt_deque_count( this->done_queue ) <= size )
+                       while( this->ahead && mlt_deque_count( this->done_queue ) <= 0 )
                                pthread_cond_wait( &this->done_queue_cond, &this->done_queue_mutex );
                        frame = mlt_deque_pop_front( this->done_queue );
                        pthread_mutex_unlock( &this->done_queue_mutex );
-                       pthread_cond_signal( &this->done_queue_cond );
                }
        }
        else