int ahead;
mlt_image_format format;
mlt_deque queue;
- pthread_t ahead_thread;
+ void *ahead_thread;
pthread_mutex_t queue_mutex;
pthread_cond_t queue_cond;
pthread_mutex_t put_mutex;
}
consumer_private;
+typedef void* ( *thread_function_t )( void* );
+
static void mlt_consumer_frame_render( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
static void mlt_consumer_frame_show( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
static void mlt_consumer_property_changed( mlt_properties owner, mlt_consumer self, char *name );
static void apply_profile_properties( mlt_consumer self, mlt_profile profile, mlt_properties properties );
static void on_consumer_frame_show( mlt_properties owner, mlt_consumer self, mlt_frame frame );
+static void transmit_thread_create( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
+static void mlt_thread_create( mlt_consumer self, thread_function_t function );
+static void transmit_thread_join( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
+static void mlt_thread_join( mlt_consumer self );
/** Initialize a consumer service.
*
mlt_events_register( properties, "consumer-thread-stopped", NULL );
mlt_events_register( properties, "consumer-stopping", NULL );
mlt_events_register( properties, "consumer-stopped", NULL );
+ mlt_events_register( properties, "consumer-thread-create", ( mlt_transmitter )transmit_thread_create );
+ mlt_events_register( properties, "consumer-thread-join", ( mlt_transmitter )transmit_thread_join );
mlt_events_listen( properties, self, "consumer-frame-show", ( mlt_listener )on_consumer_frame_show );
// Register a property-changed listener to handle the profile property -
pthread_cond_init( &priv->queue_cond, NULL );
// Create the read ahead
- if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( self ), "priority" ) )
- {
- struct sched_param priority;
- priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( self ), "priority" );
- pthread_attr_t thread_attributes;
- pthread_attr_init( &thread_attributes );
- pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
- pthread_attr_setschedparam( &thread_attributes, &priority );
- pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
- pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
- if ( pthread_create( &priv->ahead_thread, &thread_attributes, consumer_read_ahead_thread, self ) < 0 )
- pthread_create( &priv->ahead_thread, NULL, consumer_read_ahead_thread, self );
- pthread_attr_destroy( &thread_attributes );
- }
- else
- {
- pthread_create( &priv->ahead_thread, NULL, consumer_read_ahead_thread, self );
- }
+ mlt_thread_create( self, (thread_function_t) consumer_read_ahead_thread );
priv->started = 1;
}
pthread_mutex_unlock( &priv->put_mutex );
// Join the thread
- pthread_join( priv->ahead_thread, NULL );
+ mlt_thread_join( self );
// Destroy the frame queue mutex
pthread_mutex_destroy( &priv->queue_mutex );
{
return ( ( consumer_private* ) consumer->local )->position;
}
-
+
+static void transmit_thread_create( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
+{
+ if ( listener )
+ listener( owner, self,
+ (void**) args[0] /* handle */, (int*) args[1] /* priority */, (thread_function_t) args[2], (void*) args[3] /* data */ );
+}
+
+static void mlt_thread_create( mlt_consumer self, thread_function_t function )
+{
+ consumer_private *priv = self->local;
+ mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
+
+ if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( self ), "priority" ) )
+ {
+ struct sched_param priority;
+ priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( self ), "priority" );
+ if ( mlt_events_fire( properties, "consumer-thread-create",
+ &priv->ahead_thread, &priority.sched_priority, function, self, NULL ) < 1 )
+ {
+ pthread_attr_t thread_attributes;
+ pthread_attr_init( &thread_attributes );
+ pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
+ pthread_attr_setschedparam( &thread_attributes, &priority );
+ pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
+ pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
+ priv->ahead_thread = malloc( sizeof( pthread_t ) );
+ pthread_t *handle = priv->ahead_thread;
+ if ( pthread_create( ( pthread_t* ) &( *handle ), &thread_attributes, function, self ) < 0 )
+ pthread_create( ( pthread_t* ) &( *handle ), NULL, function, self );
+ pthread_attr_destroy( &thread_attributes );
+ }
+ }
+ else
+ {
+ int priority = -1;
+ if ( mlt_events_fire( properties, "consumer-thread-create",
+ &priv->ahead_thread, &priority, function, self, NULL ) < 1 )
+ {
+ priv->ahead_thread = malloc( sizeof( pthread_t ) );
+ pthread_t *handle = priv->ahead_thread;
+ pthread_create( ( pthread_t* ) &( *handle ), NULL, function, self );
+ }
+ }
+}
+
+static void transmit_thread_join( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
+{
+ if ( listener )
+ listener( owner, self, (void*) args[0] /* handle */ );
+}
+
+static void mlt_thread_join( mlt_consumer self )
+{
+ consumer_private *priv = self->local;
+ if ( mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-join", priv->ahead_thread, NULL ) < 1 )
+ {
+ pthread_t *handle = priv->ahead_thread;
+ pthread_join( *handle, NULL );
+ free( priv->ahead_thread );
+ }
+ priv->ahead_thread = NULL;
+}