]> git.sesse.net Git - mlt/blobdiff - src/framework/mlt_consumer.c
Reduce collisions in the mlt_properties hash function.
[mlt] / src / framework / mlt_consumer.c
index 87ba312099b093d45230c0eb1ff338dc80104fc3..202ecc5cb40e94b494bc693376446b8b7a73a185 100644 (file)
@@ -51,7 +51,7 @@ typedef struct
        int ahead;
        mlt_image_format format;
        mlt_deque queue;
-       pthread_t ahead_thread;
+       void *ahead_thread;
        pthread_mutex_t queue_mutex;
        pthread_cond_t queue_cond;
        pthread_mutex_t put_mutex;
@@ -74,11 +74,17 @@ typedef struct
 }
 consumer_private;
 
+typedef void* ( *thread_function_t )( void* );
+
 static void mlt_consumer_frame_render( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
 static void mlt_consumer_frame_show( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
 static void mlt_consumer_property_changed( mlt_properties owner, mlt_consumer self, char *name );
 static void apply_profile_properties( mlt_consumer self, mlt_profile profile, mlt_properties properties );
 static void on_consumer_frame_show( mlt_properties owner, mlt_consumer self, mlt_frame frame );
+static void transmit_thread_create( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
+static void mlt_thread_create( mlt_consumer self, thread_function_t function );
+static void transmit_thread_join( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
+static void mlt_thread_join( mlt_consumer self );
 
 /** Initialize a consumer service.
  *
@@ -142,6 +148,8 @@ int mlt_consumer_init( mlt_consumer self, void *child, mlt_profile profile )
                mlt_events_register( properties, "consumer-thread-stopped", NULL );
                mlt_events_register( properties, "consumer-stopping", NULL );
                mlt_events_register( properties, "consumer-stopped", NULL );
+               mlt_events_register( properties, "consumer-thread-create", ( mlt_transmitter )transmit_thread_create );
+               mlt_events_register( properties, "consumer-thread-join", ( mlt_transmitter )transmit_thread_join );
                mlt_events_listen( properties, self, "consumer-frame-show", ( mlt_listener )on_consumer_frame_show );
 
                // Register a property-changed listener to handle the profile property -
@@ -807,6 +815,9 @@ static void *consumer_read_ahead_thread( void *arg )
                        continue;
                pos = mlt_frame_get_position( frame );
 
+               // WebVfx uses this to setup a consumer-stopping event handler.
+               mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
+
                // Increment the counter used for averaging processing cost
                count ++;
 
@@ -910,6 +921,18 @@ static void *consumer_read_ahead_thread( void *arg )
        // Remove the last frame
        mlt_frame_close( frame );
 
+       // Wipe the queue
+       pthread_mutex_lock( &priv->queue_mutex );
+       while ( mlt_deque_count( priv->queue ) )
+               mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
+
+       // Close the queue
+       mlt_deque_close( priv->queue );
+       priv->queue = NULL;
+       pthread_mutex_unlock( &priv->queue_mutex );
+
+       mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-stopped", NULL );
+
        return NULL;
 }
 
@@ -1001,6 +1024,9 @@ static void *consumer_worker_thread( void *arg )
                if ( frame == NULL )
                        continue;
 
+               // WebVfx uses this to setup a consumer-stopping event handler.
+               mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
+
 #ifdef DEINTERLACE_ON_NOT_NORMAL_SPEED
                // All non normal playback frames should be shown
                if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" ) != 1 )
@@ -1038,6 +1064,9 @@ static void consumer_read_ahead_start( mlt_consumer self )
 {
        consumer_private *priv = self->local;
 
+       if ( priv->started )
+               return;
+
        // We're running now
        priv->ahead = 1;
 
@@ -1051,24 +1080,7 @@ static void consumer_read_ahead_start( mlt_consumer self )
        pthread_cond_init( &priv->queue_cond, NULL );
 
        // Create the read ahead
-       if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( self ), "priority" ) )
-       {
-               struct sched_param priority;
-               priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( self ), "priority" );
-               pthread_attr_t thread_attributes;
-               pthread_attr_init( &thread_attributes );
-               pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
-               pthread_attr_setschedparam( &thread_attributes, &priority );
-               pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
-               pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
-               if ( pthread_create( &priv->ahead_thread, &thread_attributes, consumer_read_ahead_thread, self ) < 0 )
-                       pthread_create( &priv->ahead_thread, NULL, consumer_read_ahead_thread, self );
-               pthread_attr_destroy( &thread_attributes );
-       }
-       else
-       {
-               pthread_create( &priv->ahead_thread, NULL, consumer_read_ahead_thread, self );
-       }
+       mlt_thread_create( self, (thread_function_t) consumer_read_ahead_thread );
        priv->started = 1;
 }
 
@@ -1082,7 +1094,12 @@ static void consumer_work_start( mlt_consumer self )
 {
        consumer_private *priv = self->local;
        int n = abs( priv->real_time );
-       pthread_t *thread = calloc( 1, sizeof( pthread_t ) * n );
+       pthread_t *thread;
+
+       if ( priv->started )
+               return;
+
+       thread = calloc( 1, sizeof( pthread_t ) * n );
 
        // We're running now
        priv->ahead = 1;
@@ -1180,22 +1197,13 @@ static void consumer_read_ahead_stop( mlt_consumer self )
                pthread_mutex_unlock( &priv->put_mutex );
 
                // Join the thread
-               pthread_join( priv->ahead_thread, NULL );
+               mlt_thread_join( self );
 
                // Destroy the frame queue mutex
                pthread_mutex_destroy( &priv->queue_mutex );
 
                // Destroy the condition
                pthread_cond_destroy( &priv->queue_cond );
-
-               // Wipe the queue
-               while ( mlt_deque_count( priv->queue ) )
-                       mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
-
-               // Close the queue
-               mlt_deque_close( priv->queue );
-
-               mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-stopped", NULL );
        }
 }
 
@@ -1289,15 +1297,15 @@ void mlt_consumer_purge( mlt_consumer self )
                if ( self->purge )
                        self->purge( self );
 
-               pthread_mutex_lock( &priv->queue_mutex );
+               if ( priv->started && priv->real_time )
+                       pthread_mutex_lock( &priv->queue_mutex );
+
                while ( priv->started && mlt_deque_count( priv->queue ) )
                        mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
-               pthread_mutex_unlock( &priv->queue_mutex );
 
                if ( priv->started && priv->real_time )
                {
                        priv->is_purge = 1;
-                       pthread_mutex_lock( &priv->queue_mutex );
                        pthread_cond_broadcast( &priv->queue_cond );
                        pthread_mutex_unlock( &priv->queue_mutex );
                        if ( abs( priv->real_time ) > 1 )
@@ -1514,7 +1522,12 @@ mlt_frame mlt_consumer_rt_frame( mlt_consumer self )
 
                // This isn't true, but from the consumers perspective it is
                if ( frame != NULL )
+               {
                        mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
+
+                       // WebVfx uses this to setup a consumer-stopping event handler.
+                       mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
+               }
        }
 
        return frame;
@@ -1557,7 +1570,6 @@ int mlt_consumer_stop( mlt_consumer self )
        mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopping consumer\n" );
        
        // Cancel the read ahead threads
-       priv->ahead = 0;
        if ( priv->started )
        {
                // Unblock the consumer calling mlt_consumer_rt_frame
@@ -1655,4 +1667,66 @@ mlt_position mlt_consumer_position( mlt_consumer consumer )
 {
        return ( ( consumer_private* ) consumer->local )->position;
 }
-               
+
+static void transmit_thread_create( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
+{
+       if ( listener )
+               listener( owner, self,
+                       (void**) args[0] /* handle */, (int*) args[1] /* priority */, (thread_function_t) args[2], (void*) args[3] /* data */ );
+}
+
+static void mlt_thread_create( mlt_consumer self, thread_function_t function )
+{
+       consumer_private *priv = self->local;
+       mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
+
+       if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( self ), "priority" ) )
+       {
+               struct sched_param priority;
+               priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( self ), "priority" );
+               if ( mlt_events_fire( properties, "consumer-thread-create",
+                    &priv->ahead_thread, &priority.sched_priority, function, self, NULL ) < 1 )
+               {
+                       pthread_attr_t thread_attributes;
+                       pthread_attr_init( &thread_attributes );
+                       pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
+                       pthread_attr_setschedparam( &thread_attributes, &priority );
+                       pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
+                       pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
+                       priv->ahead_thread = malloc( sizeof( pthread_t ) );
+                       pthread_t *handle = priv->ahead_thread;
+                       if ( pthread_create( ( pthread_t* ) &( *handle ), &thread_attributes, function, self ) < 0 )
+                               pthread_create( ( pthread_t* ) &( *handle ), NULL, function, self );
+                       pthread_attr_destroy( &thread_attributes );
+               }
+       }
+       else
+       {
+               int priority = -1;
+               if ( mlt_events_fire( properties, "consumer-thread-create",
+                    &priv->ahead_thread, &priority, function, self, NULL ) < 1 )
+               {
+                       priv->ahead_thread = malloc( sizeof( pthread_t ) );
+                       pthread_t *handle = priv->ahead_thread;
+                       pthread_create( ( pthread_t* ) &( *handle ), NULL, function, self );
+               }
+       }
+}
+
+static void transmit_thread_join( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
+{
+       if ( listener )
+               listener( owner, self, (void*) args[0] /* handle */ );
+}
+
+static void mlt_thread_join( mlt_consumer self )
+{
+       consumer_private *priv = self->local;
+       if ( mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-join", priv->ahead_thread, NULL ) < 1 )
+       {
+               pthread_t *handle = priv->ahead_thread;
+               pthread_join( *handle, NULL );
+               free( priv->ahead_thread );
+       }
+       priv->ahead_thread = NULL;
+}