int ahead;
mlt_image_format format;
mlt_deque queue;
- pthread_t ahead_thread;
+ void *ahead_thread;
pthread_mutex_t queue_mutex;
pthread_cond_t queue_cond;
pthread_mutex_t put_mutex;
}
consumer_private;
+typedef void* ( *thread_function_t )( void* );
+
static void mlt_consumer_frame_render( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
static void mlt_consumer_frame_show( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
static void mlt_consumer_property_changed( mlt_properties owner, mlt_consumer self, char *name );
static void apply_profile_properties( mlt_consumer self, mlt_profile profile, mlt_properties properties );
static void on_consumer_frame_show( mlt_properties owner, mlt_consumer self, mlt_frame frame );
+static void transmit_thread_create( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
+static void mlt_thread_create( mlt_consumer self, thread_function_t function );
+static void transmit_thread_join( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
+static void mlt_thread_join( mlt_consumer self );
/** Initialize a consumer service.
*
mlt_events_register( properties, "consumer-thread-stopped", NULL );
mlt_events_register( properties, "consumer-stopping", NULL );
mlt_events_register( properties, "consumer-stopped", NULL );
+ mlt_events_register( properties, "consumer-thread-create", ( mlt_transmitter )transmit_thread_create );
+ mlt_events_register( properties, "consumer-thread-join", ( mlt_transmitter )transmit_thread_join );
mlt_events_listen( properties, self, "consumer-frame-show", ( mlt_listener )on_consumer_frame_show );
// Register a property-changed listener to handle the profile property -
continue;
pos = mlt_frame_get_position( frame );
+ // WebVfx uses this to setup a consumer-stopping event handler.
+ mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
+
// Increment the counter used for averaging processing cost
count ++;
// Remove the last frame
mlt_frame_close( frame );
+ // Wipe the queue
+ pthread_mutex_lock( &priv->queue_mutex );
+ while ( mlt_deque_count( priv->queue ) )
+ mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
+
+ // Close the queue
+ mlt_deque_close( priv->queue );
+ priv->queue = NULL;
+ pthread_mutex_unlock( &priv->queue_mutex );
+
+ mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-stopped", NULL );
+
return NULL;
}
if ( frame == NULL )
continue;
+ // WebVfx uses this to setup a consumer-stopping event handler.
+ mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
+
#ifdef DEINTERLACE_ON_NOT_NORMAL_SPEED
// All non normal playback frames should be shown
if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" ) != 1 )
{
consumer_private *priv = self->local;
+ if ( priv->started )
+ return;
+
// We're running now
priv->ahead = 1;
pthread_cond_init( &priv->queue_cond, NULL );
// Create the read ahead
- if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( self ), "priority" ) )
- {
- struct sched_param priority;
- priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( self ), "priority" );
- pthread_attr_t thread_attributes;
- pthread_attr_init( &thread_attributes );
- pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
- pthread_attr_setschedparam( &thread_attributes, &priority );
- pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
- pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
- if ( pthread_create( &priv->ahead_thread, &thread_attributes, consumer_read_ahead_thread, self ) < 0 )
- pthread_create( &priv->ahead_thread, NULL, consumer_read_ahead_thread, self );
- pthread_attr_destroy( &thread_attributes );
- }
- else
- {
- pthread_create( &priv->ahead_thread, NULL, consumer_read_ahead_thread, self );
- }
+ mlt_thread_create( self, (thread_function_t) consumer_read_ahead_thread );
priv->started = 1;
}
{
consumer_private *priv = self->local;
int n = abs( priv->real_time );
- pthread_t *thread = calloc( 1, sizeof( pthread_t ) * n );
+ pthread_t *thread;
+
+ if ( priv->started )
+ return;
+
+ thread = calloc( 1, sizeof( pthread_t ) * n );
// We're running now
priv->ahead = 1;
pthread_mutex_unlock( &priv->put_mutex );
// Join the thread
- pthread_join( priv->ahead_thread, NULL );
+ mlt_thread_join( self );
// Destroy the frame queue mutex
pthread_mutex_destroy( &priv->queue_mutex );
// Destroy the condition
pthread_cond_destroy( &priv->queue_cond );
-
- // Wipe the queue
- while ( mlt_deque_count( priv->queue ) )
- mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
-
- // Close the queue
- mlt_deque_close( priv->queue );
-
- mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-stopped", NULL );
}
}
if ( self->purge )
self->purge( self );
- pthread_mutex_lock( &priv->queue_mutex );
+ if ( priv->started && priv->real_time )
+ pthread_mutex_lock( &priv->queue_mutex );
+
while ( priv->started && mlt_deque_count( priv->queue ) )
mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
- pthread_mutex_unlock( &priv->queue_mutex );
if ( priv->started && priv->real_time )
{
priv->is_purge = 1;
- pthread_mutex_lock( &priv->queue_mutex );
pthread_cond_broadcast( &priv->queue_cond );
pthread_mutex_unlock( &priv->queue_mutex );
if ( abs( priv->real_time ) > 1 )
// This isn't true, but from the consumers perspective it is
if ( frame != NULL )
+ {
mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
+
+ // WebVfx uses this to setup a consumer-stopping event handler.
+ mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
+ }
}
return frame;
mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopping consumer\n" );
// Cancel the read ahead threads
- priv->ahead = 0;
if ( priv->started )
{
// Unblock the consumer calling mlt_consumer_rt_frame
{
return ( ( consumer_private* ) consumer->local )->position;
}
-
+
+static void transmit_thread_create( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
+{
+ if ( listener )
+ listener( owner, self,
+ (void**) args[0] /* handle */, (int*) args[1] /* priority */, (thread_function_t) args[2], (void*) args[3] /* data */ );
+}
+
+static void mlt_thread_create( mlt_consumer self, thread_function_t function )
+{
+ consumer_private *priv = self->local;
+ mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
+
+ if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( self ), "priority" ) )
+ {
+ struct sched_param priority;
+ priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( self ), "priority" );
+ if ( mlt_events_fire( properties, "consumer-thread-create",
+ &priv->ahead_thread, &priority.sched_priority, function, self, NULL ) < 1 )
+ {
+ pthread_attr_t thread_attributes;
+ pthread_attr_init( &thread_attributes );
+ pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
+ pthread_attr_setschedparam( &thread_attributes, &priority );
+ pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
+ pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
+ priv->ahead_thread = malloc( sizeof( pthread_t ) );
+ pthread_t *handle = priv->ahead_thread;
+ if ( pthread_create( ( pthread_t* ) &( *handle ), &thread_attributes, function, self ) < 0 )
+ pthread_create( ( pthread_t* ) &( *handle ), NULL, function, self );
+ pthread_attr_destroy( &thread_attributes );
+ }
+ }
+ else
+ {
+ int priority = -1;
+ if ( mlt_events_fire( properties, "consumer-thread-create",
+ &priv->ahead_thread, &priority, function, self, NULL ) < 1 )
+ {
+ priv->ahead_thread = malloc( sizeof( pthread_t ) );
+ pthread_t *handle = priv->ahead_thread;
+ pthread_create( ( pthread_t* ) &( *handle ), NULL, function, self );
+ }
+ }
+}
+
+static void transmit_thread_join( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
+{
+ if ( listener )
+ listener( owner, self, (void*) args[0] /* handle */ );
+}
+
+static void mlt_thread_join( mlt_consumer self )
+{
+ consumer_private *priv = self->local;
+ if ( mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-join", priv->ahead_thread, NULL ) < 1 )
+ {
+ pthread_t *handle = priv->ahead_thread;
+ pthread_join( *handle, NULL );
+ free( priv->ahead_thread );
+ }
+ priv->ahead_thread = NULL;
+}