- pthread_mutex_lock( &self->queue_mutex );
- while ( mlt_deque_count( self->queue ) )
- mlt_frame_close( mlt_deque_pop_back( self->queue ) );
- pthread_cond_broadcast( &self->queue_cond );
- pthread_mutex_unlock( &self->queue_mutex );
+ consumer_private *priv = self->local;
+
+ pthread_mutex_lock( &priv->put_mutex );
+ if ( priv->put ) {
+ mlt_frame_close( priv->put );
+ priv->put = NULL;
+ }
+ pthread_cond_broadcast( &priv->put_cond );
+ pthread_mutex_unlock( &priv->put_mutex );
+
+ if ( self->purge )
+ self->purge( self );
+
+ if ( priv->started && priv->real_time )
+ pthread_mutex_lock( &priv->queue_mutex );
+
+ while ( priv->started && mlt_deque_count( priv->queue ) )
+ mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
+
+ if ( priv->started && priv->real_time )
+ {
+ priv->is_purge = 1;
+ pthread_cond_broadcast( &priv->queue_cond );
+ pthread_mutex_unlock( &priv->queue_mutex );
+ if ( abs( priv->real_time ) > 1 )
+ {
+ pthread_mutex_lock( &priv->done_mutex );
+ pthread_cond_broadcast( &priv->done_cond );
+ pthread_mutex_unlock( &priv->done_mutex );
+ }
+ }
+
+ pthread_mutex_lock( &priv->put_mutex );
+ if ( priv->put ) {
+ mlt_frame_close( priv->put );
+ priv->put = NULL;
+ }
+ pthread_cond_broadcast( &priv->put_cond );
+ pthread_mutex_unlock( &priv->put_mutex );