pthread_mutex_lock( &self->queue_mutex );
while( self->ahead && mlt_deque_count( self->queue ) >= buffer )
pthread_cond_wait( &self->queue_cond, &self->queue_mutex );
- mlt_deque_push_back( self->queue, frame );
+ if ( self->is_purge )
+ {
+ mlt_frame_close( frame );
+ self->is_purge = 0;
+ }
+ else
+ {
+ mlt_deque_push_back( self->queue, frame );
+ }
pthread_cond_broadcast( &self->queue_cond );
pthread_mutex_unlock( &self->queue_mutex );
pthread_cond_broadcast( &self->done_cond );
pthread_mutex_unlock( &self->done_mutex );
}
- mlt_events_fire( properties, "consumer-thread-stopped" );
+ mlt_events_fire( properties, "consumer-thread-stopped", NULL );
return NULL;
}
void mlt_consumer_purge( mlt_consumer self )
{
- if ( self && self->ahead )
+ if ( self )
{
+ pthread_mutex_lock( &self->put_mutex );
+ if ( self->put ) {
+ mlt_frame_close( self->put );
+ self->put = NULL;
+ }
+ pthread_cond_broadcast( &self->put_cond );
+ pthread_mutex_unlock( &self->put_mutex );
+
if ( self->ahead && self->real_time )
pthread_mutex_lock( &self->queue_mutex );
+
+ if ( self->purge )
+ self->purge( self );
+
while ( mlt_deque_count( self->queue ) )
mlt_frame_close( mlt_deque_pop_back( self->queue ) );
- if ( self->ahead && self->real_time ) {
+ if ( self->ahead && self->real_time )
+ {
+ self->is_purge = 1;
pthread_cond_broadcast( &self->queue_cond );
pthread_mutex_unlock( &self->queue_mutex );
+ if ( abs( self->real_time ) > 1 )
+ {
+ pthread_mutex_lock( &self->done_mutex );
+ pthread_cond_broadcast( &self->done_cond );
+ pthread_mutex_unlock( &self->done_mutex );
+ }
}
+
+ pthread_mutex_lock( &self->put_mutex );
+ if ( self->put ) {
+ mlt_frame_close( self->put );
+ self->put = NULL;
+ }
+ pthread_cond_broadcast( &self->put_cond );
+ pthread_mutex_unlock( &self->put_mutex );
}
}
while ( self->ahead && mlt_deque_count( self->queue ) < buffer )
{
frame = mlt_consumer_get_frame( self );
- if ( ! frame )
- return frame;
- pthread_mutex_lock( &self->queue_mutex );
- mlt_deque_push_back( self->queue, frame );
- pthread_cond_signal( &self->queue_cond );
- pthread_mutex_unlock( &self->queue_mutex );
+ if ( frame )
+ {
+ pthread_mutex_lock( &self->queue_mutex );
+ mlt_deque_push_back( self->queue, frame );
+ pthread_cond_signal( &self->queue_cond );
+ pthread_mutex_unlock( &self->queue_mutex );
+ }
}
// Wait if not realtime.
- mlt_frame head_frame = MLT_FRAME( mlt_deque_peek_front( self->queue ) );
- while ( self->ahead && self->real_time < 0 &&
- !( head_frame && mlt_properties_get_int( MLT_FRAME_PROPERTIES( head_frame ), "rendered" ) ) )
+ while ( self->ahead && self->real_time < 0 && !self->is_purge &&
+ !( mlt_properties_get_int( MLT_FRAME_PROPERTIES( MLT_FRAME( mlt_deque_peek_front( self->queue ) ) ), "rendered" ) ) )
{
pthread_mutex_lock( &self->done_mutex );
pthread_cond_wait( &self->done_cond, &self->done_mutex );
pthread_mutex_unlock( &self->done_mutex );
}
-
+
// Get the frame from the queue.
pthread_mutex_lock( &self->queue_mutex );
frame = mlt_deque_pop_front( self->queue );
pthread_mutex_unlock( &self->queue_mutex );
+ if ( ! frame ) {
+ self->is_purge = 0;
+ return frame;
+ }
// Adapt the worker process head to the runtime conditions.
if ( self->real_time > 0 )
{
- if ( frame && mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "rendered" ) )
+ if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "rendered" ) )
{
self->consecutive_dropped = 0;
if ( self->process_head > threads && self->consecutive_rendered >= self->process_head )
}
}
}
-
+ if ( self->is_purge ) {
+ self->is_purge = 0;
+ mlt_frame_close( frame );
+ frame = NULL;
+ }
return frame;
}