]> git.sesse.net Git - mlt/blobdiff - src/framework/mlt_consumer.c
Make mlt_consumer_purge() more thorough. (SF-187)
[mlt] / src / framework / mlt_consumer.c
index 3421cb67f2d7e6a44d61fdbbc670ddf2292ffefb..866ebf2b75047e3f5cbc6e048dd386198cc2b01f 100644 (file)
@@ -745,7 +745,15 @@ static void *consumer_read_ahead_thread( void *arg )
                pthread_mutex_lock( &self->queue_mutex );
                while( self->ahead && mlt_deque_count( self->queue ) >= buffer )
                        pthread_cond_wait( &self->queue_cond, &self->queue_mutex );
-               mlt_deque_push_back( self->queue, frame );
+               if ( self->is_purge )
+               {
+                       mlt_frame_close( frame );
+                       self->is_purge = 0;
+               }
+               else
+               {
+                       mlt_deque_push_back( self->queue, frame );
+               }
                pthread_cond_broadcast( &self->queue_cond );
                pthread_mutex_unlock( &self->queue_mutex );
 
@@ -973,7 +981,7 @@ static void *consumer_worker_thread( void *arg )
                pthread_cond_broadcast( &self->done_cond );
                pthread_mutex_unlock( &self->done_mutex );
        }
-       mlt_events_fire( properties, "consumer-thread-stopped" );
+       mlt_events_fire( properties, "consumer-thread-stopped", NULL );
 
        return NULL;
 }
@@ -1202,16 +1210,44 @@ static void consumer_work_stop( mlt_consumer self )
 
 void mlt_consumer_purge( mlt_consumer self )
 {
-       if ( self && self->ahead )
+       if ( self )
        {
+               pthread_mutex_lock( &self->put_mutex );
+               if ( self->put ) {
+                       mlt_frame_close( self->put );
+                       self->put = NULL;
+               }
+               pthread_cond_broadcast( &self->put_cond );
+               pthread_mutex_unlock( &self->put_mutex );
+
                if ( self->ahead && self->real_time )
                        pthread_mutex_lock( &self->queue_mutex );
+
+               if ( self->purge )
+                       self->purge( self );
+
                while ( mlt_deque_count( self->queue ) )
                        mlt_frame_close( mlt_deque_pop_back( self->queue ) );
-               if ( self->ahead && self->real_time ) {
+               if ( self->ahead && self->real_time )
+               {
+                       self->is_purge = 1;
                        pthread_cond_broadcast( &self->queue_cond );
                        pthread_mutex_unlock( &self->queue_mutex );
+                       if ( abs( self->real_time ) > 1 )
+                       {
+                               pthread_mutex_lock( &self->done_mutex );
+                               pthread_cond_broadcast( &self->done_cond );
+                               pthread_mutex_unlock( &self->done_mutex );
+                       }
                }
+
+               pthread_mutex_lock( &self->put_mutex );
+               if ( self->put ) {
+                       mlt_frame_close( self->put );
+                       self->put = NULL;
+               }
+               pthread_cond_broadcast( &self->put_cond );
+               pthread_mutex_unlock( &self->put_mutex );
        }
 }
 
@@ -1270,33 +1306,37 @@ static mlt_frame worker_get_frame( mlt_consumer self, mlt_properties properties
        while ( self->ahead && mlt_deque_count( self->queue ) < buffer )
        {
                frame = mlt_consumer_get_frame( self );
-               if ( ! frame )
-                       return frame;
-               pthread_mutex_lock( &self->queue_mutex );
-               mlt_deque_push_back( self->queue, frame );
-               pthread_cond_signal( &self->queue_cond );
-               pthread_mutex_unlock( &self->queue_mutex );
+               if ( frame )
+               {
+                       pthread_mutex_lock( &self->queue_mutex );
+                       mlt_deque_push_back( self->queue, frame );
+                       pthread_cond_signal( &self->queue_cond );
+                       pthread_mutex_unlock( &self->queue_mutex );
+               }
        }
 
        // Wait if not realtime.
-       mlt_frame head_frame = MLT_FRAME( mlt_deque_peek_front( self->queue ) );
-       while ( self->ahead && self->real_time < 0 &&
-               !( head_frame && mlt_properties_get_int( MLT_FRAME_PROPERTIES( head_frame ), "rendered" ) ) )
+       while ( self->ahead && self->real_time < 0 && !self->is_purge &&
+               !( mlt_properties_get_int( MLT_FRAME_PROPERTIES( MLT_FRAME( mlt_deque_peek_front( self->queue ) ) ), "rendered" ) ) )
        {
                pthread_mutex_lock( &self->done_mutex );
                pthread_cond_wait( &self->done_cond, &self->done_mutex );
                pthread_mutex_unlock( &self->done_mutex );
        }
-       
+
        // Get the frame from the queue.
        pthread_mutex_lock( &self->queue_mutex );
        frame = mlt_deque_pop_front( self->queue );
        pthread_mutex_unlock( &self->queue_mutex );
+       if ( ! frame ) {
+               self->is_purge = 0;
+               return frame;
+       }
 
        // Adapt the worker process head to the runtime conditions.
        if ( self->real_time > 0 )
        {
-               if ( frame && mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "rendered" ) )
+               if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "rendered" ) )
                {
                        self->consecutive_dropped = 0;
                        if ( self->process_head > threads && self->consecutive_rendered >= self->process_head )
@@ -1339,7 +1379,11 @@ static mlt_frame worker_get_frame( mlt_consumer self, mlt_properties properties
                        }
                }
        }
-       
+       if ( self->is_purge ) {
+               self->is_purge = 0;
+               mlt_frame_close( frame );
+               frame = NULL;
+       }
        return frame;
 }