// See if audio is turned off
int audio_off = mlt_properties_get_int( properties, "audio_off" );
- // Get the maximum size of the buffer
- int buffer = mlt_properties_get_int( properties, "buffer" );
-
// General frame variable
mlt_frame frame = NULL;
uint8_t *image = NULL;
if ( preview_off && preview_format != 0 )
this->format = preview_format;
- // Get the first frame
- frame = mlt_consumer_get_frame( this );
+ // Get the first frame from the work queue
+ pthread_mutex_lock( &this->frame_queue_mutex );
+ while( this->ahead && mlt_deque_count( this->frame_queue ) <= 0 )
+ pthread_cond_wait( &this->frame_queue_cond, &this->frame_queue_mutex );
+ frame = mlt_deque_pop_front( this->frame_queue );
+ pthread_cond_signal( &this->frame_queue_cond );
+ pthread_mutex_unlock( &this->frame_queue_mutex );
// Get the lock object
lock_object = mlt_properties_get_data( MLT_FRAME_PROPERTIES( frame ), "consumer_lock_service", NULL );
{
pthread_mutex_lock( &this->done_queue_mutex );
mlt_deque_insert( this->done_queue, frame, ( mlt_deque_compare ) compare_frame_position );
+ pthread_cond_signal( &this->done_queue_cond );
pthread_mutex_unlock( &this->done_queue_mutex );
- pthread_cond_broadcast( &this->done_queue_cond );
}
- pthread_mutex_lock( &this->done_queue_mutex );
- while ( this->ahead && mlt_deque_count( this->done_queue ) >= buffer )
- pthread_cond_wait( &this->done_queue_cond, &this->done_queue_mutex );
- pthread_mutex_unlock( &this->done_queue_mutex );
-
- // Get the next frame
- frame = mlt_consumer_get_frame( this );
+ // Get the next frame from the work queue
+ pthread_mutex_lock( &this->frame_queue_mutex );
+ while( this->ahead && mlt_deque_count( this->frame_queue ) <= 0 )
+ pthread_cond_wait( &this->frame_queue_cond, &this->frame_queue_mutex );
+ frame = mlt_deque_pop_front( this->frame_queue );
+ pthread_cond_signal( &this->frame_queue_cond );
+ pthread_mutex_unlock( &this->frame_queue_mutex );
// If there's no frame, we're probably stopped...
if ( frame == NULL )
{
// Use multiple worker threads and a work queue
int size = abs( this->real_time );
+ int buffer = mlt_properties_get_int( properties, "buffer" );
+ buffer = buffer < size ? size : buffer;
// Start worker threads if not already
if ( ! this->ahead )
{
- int buffer = mlt_properties_get_int( properties, "buffer" );
int prefill = mlt_properties_get_int( properties, "prefill" );
+ prefill = prefill > 0 && prefill < buffer ? prefill : buffer;
+
consumer_work_start( this );
- if ( buffer > 1 )
- size = prefill > 0 && prefill < buffer ? prefill : buffer;
+
+ // Fill the work queue
+ int i = buffer;
+ while ( i-- )
+ {
+ frame = mlt_consumer_get_frame( this );
+ if ( frame )
+ {
+ pthread_mutex_lock( &this->frame_queue_mutex );
+ mlt_deque_push_back( this->frame_queue, frame );
+ pthread_cond_signal( &this->frame_queue_cond );
+ pthread_mutex_unlock( &this->frame_queue_mutex );
+ }
+ }
+
+ // Wait for prefill
+ pthread_mutex_lock( &this->done_queue_mutex );
+ while( this->ahead && mlt_deque_count( this->done_queue ) < prefill )
+ pthread_cond_wait( &this->done_queue_cond, &this->done_queue_mutex );
+ pthread_mutex_unlock( &this->done_queue_mutex );
}
- // Get frame from the done queue
- mlt_log_debug( MLT_CONSUMER_SERVICE(this), "size %d done count %d\n", size, mlt_deque_count( this->done_queue ) );
- if ( this->real_time > 0 && size == this->real_time && mlt_deque_count( this->done_queue ) <= size )
+ // Try to get frame from the done queue
+ mlt_log_verbose( MLT_CONSUMER_SERVICE(this), "size %d done count %d work count %d\n",
+ size, mlt_deque_count( this->done_queue ), mlt_deque_count( this->frame_queue ) );
+ if ( this->real_time > 0 && mlt_deque_count( this->done_queue ) <= 0 )
{
+ // Non-realtime and no ready frames
frame = mlt_consumer_get_frame( this );
}
else
{
+ // Wait for work queue space
+ pthread_mutex_lock( &this->frame_queue_mutex );
+ while( this->ahead && mlt_deque_count( this->frame_queue ) >= buffer )
+ pthread_cond_wait( &this->frame_queue_cond, &this->frame_queue_mutex );
+ pthread_mutex_unlock( &this->frame_queue_mutex );
+
+ // Feed the work queue
+ frame = mlt_consumer_get_frame( this );
+ if ( frame )
+ {
+ pthread_mutex_lock( &this->frame_queue_mutex );
+ mlt_deque_push_back( this->frame_queue, frame );
+ pthread_cond_signal( &this->frame_queue_cond );
+ pthread_mutex_unlock( &this->frame_queue_mutex );
+ }
+
+ // Get the frame from the done queue
pthread_mutex_lock( &this->done_queue_mutex );
- while( this->ahead && mlt_deque_count( this->done_queue ) <= size )
+ while( this->ahead && mlt_deque_count( this->done_queue ) <= 0 )
pthread_cond_wait( &this->done_queue_cond, &this->done_queue_mutex );
frame = mlt_deque_pop_front( this->done_queue );
pthread_mutex_unlock( &this->done_queue_mutex );
- pthread_cond_signal( &this->done_queue_cond );
}
}
else