]> git.sesse.net Git - mlt/commitdiff
Fix deadlocks in sdl_preview with parallel-consumer.
authorDan Dennedy <dan@dennedy.org>
Mon, 14 Feb 2011 06:21:21 +0000 (22:21 -0800)
committerDan Dennedy <dan@dennedy.org>
Mon, 14 Feb 2011 06:21:21 +0000 (22:21 -0800)
src/framework/mlt_consumer.c
src/framework/mlt_consumer.h
src/modules/sdl/consumer_sdl_preview.c

index eaebb4aa66714b3c2879bd7fe6f1c1abd7b33423..1c17fd5a82172b3c1fc1ea216b19249fd301a96c 100644 (file)
@@ -401,7 +401,6 @@ int mlt_consumer_start( mlt_consumer this )
        char *test_card = mlt_properties_get( properties, "test_card" );
 
        // Just to make sure nothing is hanging around...
-       mlt_frame_close( this->put );
        this->put = NULL;
        this->put_active = 1;
 
@@ -913,6 +912,7 @@ static void consumer_read_ahead_start( mlt_consumer this )
        {
                pthread_create( &this->ahead_thread, NULL, consumer_read_ahead_thread, this );
        }
+       this->started = 1;
 }
 
 /** Start the worker threads.
@@ -981,6 +981,7 @@ static void consumer_work_start( mlt_consumer this )
                                mlt_deque_push_back( this->worker_threads, thread );
                }
        }
+       this->started = 1;
 }
 
 /** Stop the read/render thread.
@@ -992,7 +993,7 @@ static void consumer_work_start( mlt_consumer this )
 static void consumer_read_ahead_stop( mlt_consumer this )
 {
        // Make sure we're running
-       if ( this->ahead )
+       if ( this->started )
        {
                // Inform thread to stop
                this->ahead = 0;
@@ -1009,6 +1010,7 @@ static void consumer_read_ahead_stop( mlt_consumer this )
 
                // Join the thread
                pthread_join( this->ahead_thread, NULL );
+               this->started = 0;
 
                // Destroy the frame queue mutex
                pthread_mutex_destroy( &this->queue_mutex );
@@ -1034,7 +1036,7 @@ static void consumer_read_ahead_stop( mlt_consumer this )
 static void consumer_work_stop( mlt_consumer this )
 {
        // Make sure we're running
-       if ( this->ahead )
+       if ( this->started )
        {
                // Inform thread to stop
                this->ahead = 0;
@@ -1049,10 +1051,16 @@ static void consumer_work_stop( mlt_consumer this )
                pthread_cond_broadcast( &this->put_cond );
                pthread_mutex_unlock( &this->put_mutex );
 
+               // Broadcast to the done condition in case it's waiting
+               pthread_mutex_lock( &this->done_mutex );
+               pthread_cond_broadcast( &this->done_cond );
+               pthread_mutex_unlock( &this->done_mutex );
+
                // Join the threads
                pthread_t *thread;
                while ( ( thread = mlt_deque_pop_front( this->worker_threads ) ) )
                        pthread_join( *thread, NULL );
+               this->started = 0;
 
                // Destroy the mutexes
                pthread_mutex_destroy( &this->queue_mutex );
@@ -1114,7 +1122,7 @@ static mlt_frame worker_get_frame( mlt_consumer this, mlt_properties properties
 
                // Fill the work queue.
                int i = buffer;
-               while ( i-- )
+               while ( this->ahead && i-- )
                {
                        frame = mlt_consumer_get_frame( this );
                        if ( frame )
@@ -1136,17 +1144,20 @@ static mlt_frame worker_get_frame( mlt_consumer this, mlt_properties properties
                this->process_head = size;
        }
 
-       mlt_log_debug( MLT_CONSUMER_SERVICE(this), "size %d done count %d work count %d process_head %d\n",
-               size, first_unprocessed_frame( this ), mlt_deque_count( this->queue ), this->process_head );
+//     mlt_log_verbose( MLT_CONSUMER_SERVICE(this), "size %d done count %d work count %d process_head %d\n",
+//             size, first_unprocessed_frame( this ), mlt_deque_count( this->queue ), this->process_head );
 
        // Feed the work queue
-       frame = mlt_consumer_get_frame( this );
-       if ( ! frame )
-               return frame;
-       pthread_mutex_lock( &this->queue_mutex );
-       mlt_deque_push_back( this->queue, frame );
-       pthread_cond_signal( &this->queue_cond );
-       pthread_mutex_unlock( &this->queue_mutex );
+       while ( this->ahead && mlt_deque_count( this->queue ) < buffer )
+       {
+               frame = mlt_consumer_get_frame( this );
+               if ( ! frame )
+                       return frame;
+               pthread_mutex_lock( &this->queue_mutex );
+               mlt_deque_push_back( this->queue, frame );
+               pthread_cond_signal( &this->queue_cond );
+               pthread_mutex_unlock( &this->queue_mutex );
+       }
 
        // Wait if not realtime.
        while( this->ahead && this->real_time < 0 &&
@@ -1181,8 +1192,8 @@ static mlt_frame worker_get_frame( mlt_consumer this, mlt_properties properties
                        else
                                this->consecutive_dropped++;
                }
-               mlt_log_debug( MLT_CONSUMER_SERVICE(this), "dropped %d rendered %d process_head %d\n",
-                       this->consecutive_dropped, this->consecutive_rendered, this->process_head );
+//             mlt_log_verbose( MLT_CONSUMER_SERVICE(this), "dropped %d rendered %d process_head %d\n",
+//                     this->consecutive_dropped, this->consecutive_rendered, this->process_head );
        }
        
        return frame;
@@ -1282,6 +1293,18 @@ int mlt_consumer_stop( mlt_consumer this )
 
        // Stop the consumer
        mlt_log( MLT_CONSUMER_SERVICE( this ), MLT_LOG_DEBUG, "stopping consumer\n" );
+       
+       // Cancel the read ahead threads
+       this->ahead = 0;
+       if ( this->started )
+       {
+               // Unblock the consumer calling mlt_consumer_rt_frame
+               pthread_mutex_lock( &this->queue_mutex );
+               pthread_cond_broadcast( &this->queue_cond );
+               pthread_mutex_unlock( &this->queue_mutex );             
+       }
+       
+       // Invoke the child callback
        if ( this->stop != NULL )
                this->stop( this );
 
index ff92d122853ca537307c9d17c625dab9d2a3dc07..e6e27aa8307fa8c3190bcdf9f1c769f0562f041a 100644 (file)
@@ -122,6 +122,7 @@ struct mlt_consumer_s
        int consecutive_dropped;
        int consecutive_rendered;
        int process_head;
+       int started;
 };
 
 #define MLT_CONSUMER_SERVICE( consumer )       ( &( consumer )->parent )
index 51859ba6e13d799ed731af1001bd9c68b760332b..77cd9ea2bebf931d74b42d63c905706c9fe4d52a 100644 (file)
 #include <framework/mlt_frame.h>
 #include <framework/mlt_factory.h>
 #include <framework/mlt_producer.h>
+#include <framework/mlt_log.h>
 #include <stdlib.h>
 #include <string.h>
 #include <pthread.h>
 #include <SDL.h>
 #include <SDL_syswm.h>
-#include <assert.h>
+#include <sys/time.h>
 
 extern pthread_mutex_t mlt_sdl_mutex;
 
@@ -346,6 +347,7 @@ static void *consumer_thread( void *arg )
                                mlt_position duration = mlt_producer_get_playtime( producer );
                                int pause = 0;
 
+#ifndef SKIP_WAIT_EOS
                                if ( this->active == this->play )
                                {
                                        // Do not interrupt the play consumer near the end
@@ -360,10 +362,11 @@ static void *consumer_thread( void *arg )
                                        }
                                        else
                                        {
-                                               // Send frame with speed 0 once to stop it
-                                               if ( frame && !eos && speed == 0.0 )
+                                               // Send frame with speed 0 to stop it
+                                               if ( frame && !mlt_consumer_is_stopped( this->play ) )
                                                {
                                                        mlt_consumer_put_frame( this->play, frame );
+                                                       frame = NULL;
                                                        eos = 1;
                                                }
 
@@ -371,11 +374,21 @@ static void *consumer_thread( void *arg )
                                                if ( mlt_consumer_is_stopped( this->play ) )
                                                {
                                                        // Stream has ended
+                                                       mlt_log_verbose( MLT_CONSUMER_SERVICE( consumer ), "END OF STREAM\n" );
                                                        pause = 1;
-                                                       eos = 0; // reset eof indicator
+                                                       eos = 0; // reset eos indicator
+                                               }
+                                               else
+                                               {
+                                                       // Prevent a tight busy loop
+                                                       struct timespec tm = { 0, 100000L }; // 100 usec
+                                                       nanosleep( &tm, NULL );
                                                }
                                        }
                                }
+#else
+                               pause = this->active == this->play;
+#endif
                                if ( pause )
                                {
                                        // Start the still consumer