]> git.sesse.net Git - mlt/commitdiff
Use a single queue for parallel workers.
authorDan Dennedy <dan@dennedy.org>
Tue, 5 Oct 2010 03:39:17 +0000 (20:39 -0700)
committerDan Dennedy <dan@dennedy.org>
Mon, 24 Jan 2011 02:09:44 +0000 (18:09 -0800)
This is a major change from the previous model of moving work items
(frames) from one queue to another. This new model improves the behavior
of realtime mode and performance overall. In the new model, a single
queue is used along with an is_processed flag on the frame. Also, there
is an index into the queue (process_head) that indicates from which
point should a worker consider fetching the next unprocessed frame.

There are situations in realtime mode where the processing of a frame
takes longer than the queue (or from head to its fetch index). Over
extended periods of this heavy processing, the video frame in the
consumer may never be updated (rendered=1)! To remedy this, the consumer
detects this and automatically moves the process_head towards the tail,
but even this may not be good enough. The only real remedy is to
increase buffers and suffer with poor latency. If lower latency is
preferred, then it may be better to not use realtime mode and permit
audio discontinuity.

src/framework/mlt_consumer.c
src/framework/mlt_consumer.h
src/framework/mlt_frame.h

index b545f336dcdb8ce3d369aab5bbd2aebcf96b64cd..a78b215c28eff1a7cdbf6e7e61ab298448bde8b9 100644 (file)
@@ -3,8 +3,9 @@
  * \brief abstraction for all consumer services
  * \see mlt_consumer_s
  *
- * Copyright (C) 2003-2009 Ushodaya Enterprises Limited
+ * Copyright (C) 2003-2010 Ushodaya Enterprises Limited
  * \author Charles Yates <charles.yates@pandora.be>
+ * \author Dan Dennedy <dan@dennedy.org>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -675,12 +676,12 @@ static void *consumer_read_ahead_thread( void *arg )
                height = mlt_properties_get_int( properties, "height" );
 
                // Put the current frame into the queue
-               pthread_mutex_lock( &this->frame_queue_mutex );
-               while( this->ahead && mlt_deque_count( this->frame_queue ) >= buffer )
-                       pthread_cond_wait( &this->frame_queue_cond, &this->frame_queue_mutex );
-               mlt_deque_push_back( this->frame_queue, frame );
-               pthread_cond_broadcast( &this->frame_queue_cond );
-               pthread_mutex_unlock( &this->frame_queue_mutex );
+               pthread_mutex_lock( &this->queue_mutex );
+               while( this->ahead && mlt_deque_count( this->queue ) >= buffer )
+                       pthread_cond_wait( &this->queue_cond, &this->queue_mutex );
+               mlt_deque_push_back( this->queue, frame );
+               pthread_cond_broadcast( &this->queue_cond );
+               pthread_mutex_unlock( &this->queue_mutex );
 
                time_wait += time_difference( &ante );
 
@@ -727,7 +728,7 @@ static void *consumer_read_ahead_thread( void *arg )
                        skip_next = 0;
 
                        // If we've reached an unacceptable level, reset everything
-                       if ( skipped > 5 )
+                       if ( skipped > fps * 2 )
                        {
                                skipped = 0;
                                time_frame = 0;
@@ -748,7 +749,7 @@ static void *consumer_read_ahead_thread( void *arg )
                time_process += time_difference( &ante );
 
                // Determine if the next frame should be skipped
-               if ( mlt_deque_count( this->frame_queue ) <= 5 )
+               if ( mlt_deque_count( this->queue ) <= 5 )
                {
                        int frame_duration = mlt_properties_get_int( properties, "frame_duration" );
                        if ( ( ( time_wait + time_frame + time_process ) / count ) > frame_duration )
@@ -762,9 +763,28 @@ static void *consumer_read_ahead_thread( void *arg )
        return NULL;
 }
 
-static int compare_frame_position( mlt_frame a, mlt_frame b )
+/** Locate the first unprocessed frame in the queue.
+ *
+ * When playing with realtime behavior, we do not use the true head, but
+ * rather an adjusted process_head. The process_head is adjusted based on
+ * the rate of frame-dropping or recovery from frame-dropping. The idea is
+ * that as the level of frame-dropping increases to move the process_head
+ * closer to the tail because the frames are not completing processing prior
+ * to their playout! Then, as frames are not dropped the process_head moves
+ * back closer to the head of the queue so that worker threads can work 
+ * ahead of the playout point (queue head).
+ *
+ * \private \memberof mlt_consumer_s
+ * \param this a consumer
+ * \return an index into the queue
+ */
+
+static inline int first_unprocessed_frame( mlt_consumer this )
 {
-       return mlt_frame_get_position( a ) - mlt_frame_get_position( b );
+       int index = this->real_time <= 0 ? 0 : this->process_head;
+       while ( index < mlt_deque_count( this->queue ) && MLT_FRAME( mlt_deque_peek( this->queue, index ) )->is_processing )
+               index++;
+       return index;
 }
 
 /** The worker thread procedure for parallel processing frames.
@@ -798,71 +818,58 @@ static void *consumer_worker_thread( void *arg )
        if ( preview_off && preview_format != 0 )
                format = preview_format;
 
-       // Get the first frame from the work queue
-       pthread_mutex_lock( &this->frame_queue_mutex );
-       while( this->ahead && mlt_deque_count( this->frame_queue ) <= 0 )
-               pthread_cond_wait( &this->frame_queue_cond, &this->frame_queue_mutex );
-       frame = mlt_deque_pop_front( this->frame_queue );
-       pthread_cond_signal( &this->frame_queue_cond );
-       pthread_mutex_unlock( &this->frame_queue_mutex );
-
-       // Get the image of the first frame
-       if ( !video_off )
-       {
-               mlt_events_fire( MLT_CONSUMER_PROPERTIES( this ), "consumer-frame-render", frame, NULL );
-               mlt_frame_get_image( frame, &image, &format, &width, &height, 0 );
-       }
-
-       // Mark as rendered
-       mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
-
        // Continue to read ahead
        while ( this->ahead )
        {
-               // Fetch width/height again
-               width = mlt_properties_get_int( properties, "width" );
-               height = mlt_properties_get_int( properties, "height" );
+               // Get the next unprocessed frame from the work queue
+               pthread_mutex_lock( &this->queue_mutex );
+               int index = first_unprocessed_frame( this );
+               while ( this->ahead && index >= mlt_deque_count( this->queue ) )
+               {
+                       mlt_log_debug( MLT_CONSUMER_SERVICE(this), "waiting in worker index = %d queue count = %d\n",
+                               index, mlt_deque_count( this->queue ) );
+                       pthread_cond_wait( &this->queue_cond, &this->queue_mutex );
+                       index = first_unprocessed_frame( this );
+               }
+               pthread_mutex_unlock( &this->queue_mutex );
 
-               // Put the processed frame into the done queue
+               // Mark the frame for processing
+               frame = mlt_deque_peek( this->queue, index );
                if ( frame )
                {
-                       pthread_mutex_lock( &this->done_queue_mutex );
-                       mlt_deque_insert( this->done_queue, frame, ( mlt_deque_compare ) compare_frame_position );
-                       pthread_cond_signal( &this->done_queue_cond );
-                       pthread_mutex_unlock( &this->done_queue_mutex );
+                       mlt_log_debug( MLT_CONSUMER_SERVICE(this), "worker processing index = %d frame %d queue count = %d\n",
+                               index, mlt_frame_get_position(frame), mlt_deque_count( this->queue ) );
+                       frame->is_processing = 1;
+                       mlt_properties_inc_ref( MLT_FRAME_PROPERTIES( frame ) );
                }
 
-               // Get the next frame from the work queue
-               pthread_mutex_lock( &this->frame_queue_mutex );
-               while( this->ahead && mlt_deque_count( this->frame_queue ) <= 0 )
-                       pthread_cond_wait( &this->frame_queue_cond, &this->frame_queue_mutex );
-               frame = mlt_deque_pop_front( this->frame_queue );
-               pthread_cond_signal( &this->frame_queue_cond );
-               pthread_mutex_unlock( &this->frame_queue_mutex );
-
                // If there's no frame, we're probably stopped...
                if ( frame == NULL )
                        continue;
 
+#ifdef DEINTERLACE_ON_NOT_NORMAL_SPEED
                // All non normal playback frames should be shown
                if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" ) != 1 )
-               {
-#ifdef DEINTERLACE_ON_NOT_NORMAL_SPEED
                        mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "consumer_deinterlace", 1 );
 #endif
-               }
 
                // Get the image
                if ( !video_off )
                {
+                       // Fetch width/height again
+                       width = mlt_properties_get_int( properties, "width" );
+                       height = mlt_properties_get_int( properties, "height" );
                        mlt_events_fire( MLT_CONSUMER_PROPERTIES( this ), "consumer-frame-render", frame, NULL );
                        mlt_frame_get_image( frame, &image, &format, &width, &height, 0 );
                }
+               mlt_frame_close( frame );
                mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
-       }
 
-       // Remove the last frame
-       mlt_frame_close( frame );
+               // Tell a waiting thread (non-realtime main consumer thread) that we are done.
+               pthread_mutex_lock( &this->done_mutex );
+               pthread_cond_broadcast( &this->done_cond );
+               pthread_mutex_unlock( &this->done_mutex );
+       }
 
        return NULL;
 }
@@ -879,13 +886,13 @@ static void consumer_read_ahead_start( mlt_consumer this )
        this->ahead = 1;
 
        // Create the frame queue
-       this->frame_queue = mlt_deque_init( );
+       this->queue = mlt_deque_init( );
 
-       // Create the frame_queue mutex
-       pthread_mutex_init( &this->frame_queue_mutex, NULL );
+       // Create the queue mutex
+       pthread_mutex_init( &this->queue_mutex, NULL );
 
        // Create the condition
-       pthread_cond_init( &this->frame_queue_cond, NULL );
+       pthread_cond_init( &this->queue_cond, NULL );
 
        // Create the read ahead
        if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( this ), "priority" ) )
@@ -921,19 +928,27 @@ static void consumer_work_start( mlt_consumer this )
 
        // We're running now
        this->ahead = 1;
+       
+       // These keep track of the accelleration of frame dropping or recovery.
+       this->consecutive_dropped = 0;
+       this->consecutive_rendered = 0;
+       
+       // This is the position in the queue from which to look for a frame to process.
+       // If we always start from the head, then we may likely not complete processing
+       // before the frame is played out.
+       this->process_head = 0;
 
        // Create the queues
-       this->frame_queue = mlt_deque_init();
-       this->done_queue = mlt_deque_init();
+       this->queue = mlt_deque_init();
        this->worker_threads = mlt_deque_init();
 
        // Create the mutexes
-       pthread_mutex_init( &this->frame_queue_mutex, NULL );
-       pthread_mutex_init( &this->done_queue_mutex, NULL );
+       pthread_mutex_init( &this->queue_mutex, NULL );
+       pthread_mutex_init( &this->done_mutex, NULL );
 
        // Create the conditions
-       pthread_cond_init( &this->frame_queue_cond, NULL );
-       pthread_cond_init( &this->done_queue_cond, NULL );
+       pthread_cond_init( &this->queue_cond, NULL );
+       pthread_cond_init( &this->done_cond, NULL );
 
        // Create the read ahead
        if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( this ), "priority" ) )
@@ -983,9 +998,9 @@ static void consumer_read_ahead_stop( mlt_consumer this )
                this->ahead = 0;
 
                // Broadcast to the condition in case it's waiting
-               pthread_mutex_lock( &this->frame_queue_mutex );
-               pthread_cond_broadcast( &this->frame_queue_cond );
-               pthread_mutex_unlock( &this->frame_queue_mutex );
+               pthread_mutex_lock( &this->queue_mutex );
+               pthread_cond_broadcast( &this->queue_cond );
+               pthread_mutex_unlock( &this->queue_mutex );
 
                // Broadcast to the put condition in case it's waiting
                pthread_mutex_lock( &this->put_mutex );
@@ -996,17 +1011,17 @@ static void consumer_read_ahead_stop( mlt_consumer this )
                pthread_join( this->ahead_thread, NULL );
 
                // Destroy the frame queue mutex
-               pthread_mutex_destroy( &this->frame_queue_mutex );
+               pthread_mutex_destroy( &this->queue_mutex );
 
                // Destroy the condition
-               pthread_cond_destroy( &this->frame_queue_cond );
+               pthread_cond_destroy( &this->queue_cond );
 
                // Wipe the queue
-               while ( mlt_deque_count( this->frame_queue ) )
-                       mlt_frame_close( mlt_deque_pop_back( this->frame_queue ) );
+               while ( mlt_deque_count( this->queue ) )
+                       mlt_frame_close( mlt_deque_pop_back( this->queue ) );
 
                // Close the queue
-               mlt_deque_close( this->frame_queue );
+               mlt_deque_close( this->queue );
        }
 }
 
@@ -1024,15 +1039,10 @@ static void consumer_work_stop( mlt_consumer this )
                // Inform thread to stop
                this->ahead = 0;
 
-               // Broadcast to the frame_queue condition in case it's waiting
-               pthread_mutex_lock( &this->frame_queue_mutex );
-               pthread_cond_broadcast( &this->frame_queue_cond );
-               pthread_mutex_unlock( &this->frame_queue_mutex );
-
-               // Broadcast to the done_queue condition in case it's waiting
-               pthread_mutex_lock( &this->done_queue_mutex );
-               pthread_cond_broadcast( &this->done_queue_cond );
-               pthread_mutex_unlock( &this->done_queue_mutex );
+               // Broadcast to the queue condition in case it's waiting
+               pthread_mutex_lock( &this->queue_mutex );
+               pthread_cond_broadcast( &this->queue_cond );
+               pthread_mutex_unlock( &this->queue_mutex );
 
                // Broadcast to the put condition in case it's waiting
                pthread_mutex_lock( &this->put_mutex );
@@ -1045,22 +1055,19 @@ static void consumer_work_stop( mlt_consumer this )
                        pthread_join( thread, NULL );
 
                // Destroy the mutexes
-               pthread_mutex_destroy( &this->frame_queue_mutex );
-               pthread_mutex_destroy( &this->done_queue_mutex );
+               pthread_mutex_destroy( &this->queue_mutex );
+               pthread_mutex_destroy( &this->done_mutex );
 
                // Destroy the conditions
-               pthread_cond_destroy( &this->frame_queue_cond );
-               pthread_cond_destroy( &this->done_queue_cond );
+               pthread_cond_destroy( &this->queue_cond );
+               pthread_cond_destroy( &this->done_cond );
 
                // Wipe the queues
-               while ( mlt_deque_count( this->frame_queue ) )
-                       mlt_frame_close( mlt_deque_pop_back( this->frame_queue ) );
-               while ( mlt_deque_count( this->done_queue ) )
-                       mlt_frame_close( mlt_deque_pop_back( this->done_queue ) );
+               while ( mlt_deque_count( this->queue ) )
+                       mlt_frame_close( mlt_deque_pop_back( this->queue ) );
 
                // Close the queues
-               mlt_deque_close( this->frame_queue );
-               mlt_deque_close( this->done_queue );
+               mlt_deque_close( this->queue );
                mlt_deque_close( this->worker_threads );
        }
 }
@@ -1075,21 +1082,110 @@ void mlt_consumer_purge( mlt_consumer this )
 {
        if ( this->ahead )
        {
-               pthread_mutex_lock( &this->frame_queue_mutex );
-               while ( mlt_deque_count( this->frame_queue ) )
-                       mlt_frame_close( mlt_deque_pop_back( this->frame_queue ) );
-               pthread_cond_broadcast( &this->frame_queue_cond );
-               pthread_mutex_unlock( &this->frame_queue_mutex );
+               pthread_mutex_lock( &this->queue_mutex );
+               while ( mlt_deque_count( this->queue ) )
+                       mlt_frame_close( mlt_deque_pop_back( this->queue ) );
+               pthread_cond_broadcast( &this->queue_cond );
+               pthread_mutex_unlock( &this->queue_mutex );
+       }
+}
+
+/** Use multiple worker threads and a work queue.
+ */
 
-               if ( this->done_queue )
+static mlt_frame worker_get_frame( mlt_consumer this, mlt_properties properties )
+{
+       // Frame to return
+       mlt_frame frame = NULL;
+
+       int size = abs( this->real_time );
+       int buffer = mlt_properties_get_int( properties, "buffer" );
+       // This is a heuristic to determine a suitable minimum buffer size for the number of threads.
+       int headroom = 2 + size * size;
+       buffer = buffer < headroom ? headroom : buffer;
+
+       // Start worker threads if not already started.
+       if ( ! this->ahead )
+       {
+               int prefill = mlt_properties_get_int( properties, "prefill" );
+               prefill = prefill > 0 && prefill < buffer ? prefill : buffer;
+
+               consumer_work_start( this );
+
+               // Fill the work queue.
+               int i = buffer;
+               while ( i-- )
                {
-                       pthread_mutex_lock( &this->done_queue_mutex );
-                       while ( mlt_deque_count( this->done_queue ) )
-                               mlt_frame_close( mlt_deque_pop_back( this->done_queue ) );
-                       pthread_cond_broadcast( &this->done_queue_cond );
-                       pthread_mutex_unlock( &this->done_queue_mutex );
+                       frame = mlt_consumer_get_frame( this );
+                       if ( frame )
+                       {
+                               pthread_mutex_lock( &this->queue_mutex );
+                               mlt_deque_push_back( this->queue, frame );
+                               pthread_cond_signal( &this->queue_cond );
+                               pthread_mutex_unlock( &this->queue_mutex );
+                       }
+               }
+
+               // Wait for prefill
+               while ( this->ahead && first_unprocessed_frame( this ) < prefill )
+               {
+                       pthread_mutex_lock( &this->done_mutex );
+                       pthread_cond_wait( &this->done_cond, &this->done_mutex );
+                       pthread_mutex_unlock( &this->done_mutex );
                }
+               this->process_head = size;
+       }
+
+       mlt_log_debug( MLT_CONSUMER_SERVICE(this), "size %d done count %d work count %d process_head %d\n",
+               size, first_unprocessed_frame( this ), mlt_deque_count( this->queue ), this->process_head );
+
+       // Feed the work queue
+       frame = mlt_consumer_get_frame( this );
+       if ( ! frame )
+               return frame;
+       pthread_mutex_lock( &this->queue_mutex );
+       mlt_deque_push_back( this->queue, frame );
+       pthread_cond_signal( &this->queue_cond );
+       pthread_mutex_unlock( &this->queue_mutex );
+
+       // Wait if not realtime.
+       while( this->ahead && this->real_time < 0 &&
+              ! mlt_properties_get_int( MLT_FRAME_PROPERTIES( MLT_FRAME( mlt_deque_peek_front( this->queue ) ) ), "rendered" ) )
+       {
+               pthread_mutex_lock( &this->done_mutex );
+               pthread_cond_wait( &this->done_cond, &this->done_mutex );
+               pthread_mutex_unlock( &this->done_mutex );
        }
+       
+       // Get the frame from the queue.
+       pthread_mutex_lock( &this->queue_mutex );
+       frame = mlt_deque_pop_front( this->queue );
+       pthread_mutex_unlock( &this->queue_mutex );
+
+       // Adapt the worker process head to the runtime conditions.
+       if ( this->real_time > 0 )
+       {
+               if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "rendered" ) )
+               {
+                       this->consecutive_dropped = 0;
+                       if ( this->process_head > size && this->consecutive_rendered >= this->process_head )
+                               this->process_head--;
+                       else
+                               this->consecutive_rendered++;
+               }
+               else
+               {
+                       this->consecutive_rendered = 0;
+                       if ( this->process_head < buffer - size && this->consecutive_dropped > size )
+                               this->process_head++;
+                       else
+                               this->consecutive_dropped++;
+               }
+               mlt_log_debug( MLT_CONSUMER_SERVICE(this), "dropped %d rendered %d process_head %d\n",
+                       this->consecutive_dropped, this->consecutive_rendered, this->process_head );
+       }
+       
+       return frame;
 }
 
 /** Get the next frame from the producer connected to a consumer.
@@ -1112,7 +1208,12 @@ mlt_frame mlt_consumer_rt_frame( mlt_consumer this )
        mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
 
        // Check if the user has requested real time or not
-       if ( this->real_time == 1 || this->real_time == -1 )
+       if ( this->real_time > 1 || this->real_time < -1 )
+       {
+               // see above
+               return worker_get_frame( this, properties );
+       }
+       else if ( this->real_time == 1 || this->real_time == -1 )
        {
                int size = 1;
 
@@ -1127,84 +1228,14 @@ mlt_frame mlt_consumer_rt_frame( mlt_consumer this )
                }
 
                // Get frame from queue
-               pthread_mutex_lock( &this->frame_queue_mutex );
-               while( this->ahead && mlt_deque_count( this->frame_queue ) < size )
-                       pthread_cond_wait( &this->frame_queue_cond, &this->frame_queue_mutex );
-               frame = mlt_deque_pop_front( this->frame_queue );
-               pthread_cond_broadcast( &this->frame_queue_cond );
-               pthread_mutex_unlock( &this->frame_queue_mutex );
-       }
-       else if ( this->real_time > 1 || this->real_time < -1 )
-       {
-               // Use multiple worker threads and a work queue
-               int size = abs( this->real_time );
-               int buffer = mlt_properties_get_int( properties, "buffer" );
-               buffer = buffer < size ? size : buffer;
-
-               // Start worker threads if not already
-               if ( ! this->ahead )
-               {
-                       int prefill = mlt_properties_get_int( properties, "prefill" );
-                       prefill = prefill > 0 && prefill < buffer ? prefill : buffer;
-
-                       consumer_work_start( this );
-
-                       // Fill the work queue
-                       int i = buffer;
-                       while ( i-- )
-                       {
-                               frame = mlt_consumer_get_frame( this );
-                               if ( frame )
-                               {
-                                       pthread_mutex_lock( &this->frame_queue_mutex );
-                                       mlt_deque_push_back( this->frame_queue, frame );
-                                       pthread_cond_signal( &this->frame_queue_cond );
-                                       pthread_mutex_unlock( &this->frame_queue_mutex );
-                               }
-                       }
-
-                       // Wait for prefill
-                       pthread_mutex_lock( &this->done_queue_mutex );
-                       while( this->ahead && mlt_deque_count( this->done_queue ) < prefill )
-                               pthread_cond_wait( &this->done_queue_cond, &this->done_queue_mutex );
-                       pthread_mutex_unlock( &this->done_queue_mutex );
-               }
-
-               // Try to get frame from the done queue
-               mlt_log_debug( MLT_CONSUMER_SERVICE(this), "size %d done count %d work count %d\n",
-                       size, mlt_deque_count( this->done_queue ), mlt_deque_count( this->frame_queue ) );
-               if ( this->real_time > 0 && size == this->real_time && mlt_deque_count( this->done_queue ) <= size )
-               {
-                       // Non-realtime and no ready frames
-                       frame = mlt_consumer_get_frame( this );
-               }
-               else
-               {
-                       // Wait for work queue space
-                       pthread_mutex_lock( &this->frame_queue_mutex );
-                       while( this->ahead && mlt_deque_count( this->frame_queue ) >= buffer )
-                               pthread_cond_wait( &this->frame_queue_cond, &this->frame_queue_mutex );
-                       pthread_mutex_unlock( &this->frame_queue_mutex );
-
-                       // Feed the work queue
-                       frame = mlt_consumer_get_frame( this );
-                       if ( frame )
-                       {
-                               pthread_mutex_lock( &this->frame_queue_mutex );
-                               mlt_deque_push_back( this->frame_queue, frame );
-                               pthread_cond_signal( &this->frame_queue_cond );
-                               pthread_mutex_unlock( &this->frame_queue_mutex );
-                       }
-
-                       // Get the frame from the done queue
-                       pthread_mutex_lock( &this->done_queue_mutex );
-                       while( this->ahead && mlt_deque_count( this->done_queue ) <= size )
-                               pthread_cond_wait( &this->done_queue_cond, &this->done_queue_mutex );
-                       frame = mlt_deque_pop_front( this->done_queue );
-                       pthread_mutex_unlock( &this->done_queue_mutex );
-               }
+               pthread_mutex_lock( &this->queue_mutex );
+               while( this->ahead && mlt_deque_count( this->queue ) < size )
+                       pthread_cond_wait( &this->queue_cond, &this->queue_mutex );
+               frame = mlt_deque_pop_front( this->queue );
+               pthread_cond_broadcast( &this->queue_cond );
+               pthread_mutex_unlock( &this->queue_mutex );
        }
-       else
+       else // real_time == 0
        {
                // Get the frame in non real time
                frame = mlt_consumer_get_frame( this );
index 8257db55464a48840f2af5adf8e592d1a6711935..ff92d122853ca537307c9d17c625dab9d2a3dc07 100644 (file)
@@ -3,8 +3,9 @@
  * \brief abstraction for all consumer services
  * \see mlt_consumer_s
  *
- * Copyright (C) 2003-2009 Ushodaya Enterprises Limited
+ * Copyright (C) 2003-2010 Ushodaya Enterprises Limited
  * \author Charles Yates <charles.yates@pandora.be>
+ * \author Dan Dennedy <dan@dennedy.org>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -103,10 +104,10 @@ struct mlt_consumer_s
        int real_time;
        int ahead;
        mlt_image_format format;
-       mlt_deque frame_queue;
+       mlt_deque queue;
        pthread_t ahead_thread;
-       pthread_mutex_t frame_queue_mutex;
-       pthread_cond_t frame_queue_cond;
+       pthread_mutex_t queue_mutex;
+       pthread_cond_t queue_cond;
        pthread_mutex_t put_mutex;
        pthread_cond_t put_cond;
        mlt_frame put;
@@ -114,10 +115,13 @@ struct mlt_consumer_s
        mlt_event event_listener;
        mlt_position position;
 
+       /* additional fields added for the parallel work queue */
        mlt_deque worker_threads;
-       mlt_deque done_queue;
-       pthread_mutex_t done_queue_mutex;
-       pthread_cond_t done_queue_cond;
+       pthread_mutex_t done_mutex;
+       pthread_cond_t done_cond;
+       int consecutive_dropped;
+       int consecutive_rendered;
+       int process_head;
 };
 
 #define MLT_CONSUMER_SERVICE( consumer )       ( &( consumer )->parent )
index 6aefb3803c87140c3103c043bf32d306281992f4..1495c1d7be2d944dafb39155c5e44d912a414026 100644 (file)
@@ -92,6 +92,7 @@ struct mlt_frame_s
        mlt_deque stack_image;   /**< \private the image processing stack of operations and data */
        mlt_deque stack_audio;   /**< \private the audio processing stack of operations and data */
        mlt_deque stack_service; /**< \private a general purpose data stack */
+       int is_processing;       /**< \private indicates if a frame is or was processed by the parallel consumer */
 };
 
 #define MLT_FRAME_PROPERTIES( frame )          ( &( frame )->parent )