3 * \brief abstraction for all consumer services
6 * Copyright (C) 2003-2010 Ushodaya Enterprises Limited
7 * \author Charles Yates <charles.yates@pandora.be>
8 * \author Dan Dennedy <dan@dennedy.org>
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License as published by the Free Software Foundation; either
13 * version 2.1 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Lesser General Public License for more details.
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with this library; if not, write to the Free Software
22 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25 #include "mlt_consumer.h"
26 #include "mlt_factory.h"
27 #include "mlt_producer.h"
28 #include "mlt_frame.h"
29 #include "mlt_profile.h"
37 /** Define this if you want an automatic deinterlace (if necessary) when the
38 * consumer's producer is not running at normal speed.
 * It is explicitly undefined here, so the sections of this file guarded by
 * #ifdef DEINTERLACE_ON_NOT_NORMAL_SPEED are compiled out.
40 #undef DEINTERLACE_ON_NOT_NORMAL_SPEED
42 /** This is not the ideal place for this, but it is needed by VDPAU as well.
 * The mutex is statically initialized with PTHREAD_MUTEX_INITIALIZER and is
 * defined without \c static, so it is visible to other translation units.
44 pthread_mutex_t mlt_sdl_mutex = PTHREAD_MUTEX_INITIALIZER;
46 /** \brief private members of mlt_consumer */
52 mlt_image_format format; /**< image format the render threads request from mlt_frame_get_image */
55 pthread_mutex_t queue_mutex; /**< held while the frame queue is inspected or modified */
56 pthread_cond_t queue_cond; /**< waited on by the render threads; broadcast after a frame is queued */
57 pthread_mutex_t put_mutex; /**< held while the put slot (put / put_active) is accessed */
58 pthread_cond_t put_cond; /**< broadcast by mlt_consumer_put_frame and mlt_consumer_get_frame before they release put_mutex */
61 mlt_event event_listener; /**< the property-changed listener registered in mlt_consumer_init */
62 mlt_position position; /**< position of the frame most recently reported by consumer-frame-show */
65 /* additional fields added for the parallel work queue */
66 mlt_deque worker_threads;
67 pthread_mutex_t done_mutex; /**< taken by a worker thread around the done_cond broadcast */
68 pthread_cond_t done_cond; /**< broadcast by a worker thread once it has finished a frame */
69 int consecutive_dropped;
70 int consecutive_rendered;
73 pthread_t *threads; /**< used to deallocate all threads */
// Signature of a thread entry point: takes one untyped argument and returns an untyped result.
77 typedef void* ( *thread_function_t )( void* );
// Forward declarations of the file-local (static) event transmitters,
// listeners and thread helpers used by this file.
79 static void mlt_consumer_frame_render( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
80 static void mlt_consumer_frame_show( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
81 static void mlt_consumer_property_changed( mlt_properties owner, mlt_consumer self, char *name );
82 static void apply_profile_properties( mlt_consumer self, mlt_profile profile, mlt_properties properties );
83 static void on_consumer_frame_show( mlt_properties owner, mlt_consumer self, mlt_frame frame );
84 static void transmit_thread_create( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
85 static void mlt_thread_create( mlt_consumer self, thread_function_t function );
86 static void transmit_thread_join( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
87 static void mlt_thread_join( mlt_consumer self );
89 /** Initialize a consumer service.
 *
 * Clears the consumer structure, allocates its private state, initializes the
 * parent service, applies the profile and the default properties, and
 * registers the consumer events and the property-changed listener.
 *
91 * \public \memberof mlt_consumer_s
92 * \param self the consumer to initialize
93 * \param child a pointer to the object for the subclass
94 * \param profile the \p mlt_profile_s to use (optional but recommended,
95 * uses the environment variable MLT if \p profile is NULL)
96 * \return true if there was an error
99 int mlt_consumer_init( mlt_consumer self, void *child, mlt_profile profile )
// Start from a fully zeroed public structure.
102 memset( self, 0, sizeof( struct mlt_consumer_s ) );
// Zero-initialized private state, reachable afterwards through self->local.
// NOTE(review): no check of the calloc result is visible in this excerpt - confirm.
104 consumer_private *priv = self->local = calloc( 1, sizeof( consumer_private ) );
// The parent service is initialized with the consumer itself as its child object.
106 error = mlt_service_init( &self->parent, self );
109 // Get the properties from the service
110 mlt_properties properties = MLT_SERVICE_PROPERTIES( &self->parent );
112 // Apply profile to properties
113 if ( profile == NULL )
115 // Normally the application creates the profile and controls its lifetime
116 // This is the fallback exception handling
117 profile = mlt_profile_init( NULL );
// Re-declares 'properties' (presumably in the inner scope of the NULL check;
// the braces are not visible in this excerpt).
118 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
// The fallback profile is stored on the consumer with mlt_profile_close
// registered as its destructor, so the consumer owns it.
119 mlt_properties_set_data( properties, "_profile", profile, 0, (mlt_destructor)mlt_profile_close, NULL );
121 apply_profile_properties( self, profile, properties );
123 // Default rescaler for all consumers
124 mlt_properties_set( properties, "rescale", "bilinear" );
126 // Default read ahead buffer size
127 mlt_properties_set_int( properties, "buffer", 25 );
// Default for "drop_max", which the read-ahead thread compares against its
// count of consecutively skipped frames.
128 mlt_properties_set_int( properties, "drop_max", 5 );
130 // Default audio frequency and channels
131 mlt_properties_set_int( properties, "frequency", 48000 );
132 mlt_properties_set_int( properties, "channels", 2 );
134 // Default of all consumers is real time
135 mlt_properties_set_int( properties, "real_time", 1 );
137 // Default to environment test card
138 mlt_properties_set( properties, "test_card", mlt_environment( "MLT_TEST_CARD" ) );
140 // Hmm - default all consumers to yuv422 :-/
141 priv->format = mlt_image_yuv422;
142 mlt_properties_set( properties, "mlt_image_format", mlt_image_format_name( priv->format ) );
143 mlt_properties_set( properties, "mlt_audio_format", mlt_audio_format_name( mlt_audio_s16 ) );
// Register the events a consumer can fire. Only the frame-show, frame-render
// and thread-create/join events are given a custom transmitter.
145 mlt_events_register( properties, "consumer-frame-show", ( mlt_transmitter )mlt_consumer_frame_show );
146 mlt_events_register( properties, "consumer-frame-render", ( mlt_transmitter )mlt_consumer_frame_render );
147 mlt_events_register( properties, "consumer-thread-started", NULL );
148 mlt_events_register( properties, "consumer-thread-stopped", NULL );
149 mlt_events_register( properties, "consumer-stopping", NULL );
150 mlt_events_register( properties, "consumer-stopped", NULL );
151 mlt_events_register( properties, "consumer-thread-create", ( mlt_transmitter )transmit_thread_create );
152 mlt_events_register( properties, "consumer-thread-join", ( mlt_transmitter )transmit_thread_join );
// The consumer listens to its own frame-show event to remember the last shown position.
153 mlt_events_listen( properties, self, "consumer-frame-show", ( mlt_listener )on_consumer_frame_show );
155 // Register a property-changed listener to handle the profile property -
156 // subsequent properties can override the profile
// The returned event is kept so the listener can be blocked and unblocked later.
157 priv->event_listener = mlt_events_listen( properties, self, "property-changed", ( mlt_listener )mlt_consumer_property_changed );
159 // Create the push mutex and condition
160 pthread_mutex_init( &priv->put_mutex, NULL );
161 pthread_cond_init( &priv->put_cond, NULL );
167 /** Convert the profile into properties on the consumer.
169 * \private \memberof mlt_consumer_s
170 * \param self a consumer
171 * \param profile a profile
172 * \param properties a properties list (typically, the consumer's)
175 static void apply_profile_properties( mlt_consumer self, mlt_profile profile, mlt_properties properties )
177 consumer_private *priv = self->local;
178 mlt_event_block( priv->event_listener );
179 mlt_properties_set_double( properties, "fps", mlt_profile_fps( profile ) );
180 mlt_properties_set_int( properties, "frame_rate_num", profile->frame_rate_num );
181 mlt_properties_set_int( properties, "frame_rate_den", profile->frame_rate_den );
182 mlt_properties_set_int( properties, "width", profile->width );
183 mlt_properties_set_int( properties, "height", profile->height );
184 mlt_properties_set_int( properties, "progressive", profile->progressive );
185 mlt_properties_set_double( properties, "aspect_ratio", mlt_profile_sar( profile ) );
186 mlt_properties_set_int( properties, "sample_aspect_num", profile->sample_aspect_num );
187 mlt_properties_set_int( properties, "sample_aspect_den", profile->sample_aspect_den );
188 mlt_properties_set_double( properties, "display_ratio", mlt_profile_dar( profile ) );
189 mlt_properties_set_int( properties, "display_aspect_num", profile->display_aspect_num );
190 mlt_properties_set_int( properties, "display_aspect_den", profile->display_aspect_den );
191 mlt_properties_set_int( properties, "colorspace", profile->colorspace );
192 mlt_event_unblock( priv->event_listener );
195 /** The property-changed event listener
 *
 * Keeps the service profile in step with the consumer properties. Setting
 * "mlt_profile" loads a whole new profile; setting one of the individual
 * rate, geometry, aspect or colorspace properties writes that value into the
 * profile and, where applicable, refreshes the derived "fps", "aspect_ratio"
 * or "display_ratio" property.
 *
197 * \private \memberof mlt_consumer_s
198 * \param owner the events object
199 * \param self the consumer
200 * \param name the name of the property that changed
203 static void mlt_consumer_property_changed( mlt_properties owner, mlt_consumer self, char *name )
205 if ( !strcmp( name, "mlt_profile" ) )
208 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
210 // Get the current profile
211 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
213 // Load the new profile
214 mlt_profile new_profile = mlt_profile_init( mlt_properties_get( properties, name ) );
// With an existing profile, its contents are overwritten in place.
219 if ( profile != NULL )
// memcpy copies only the description pointer, so the old string is released
// first and the new one is duplicated afterwards; new_profile can then be
// closed without leaving profile->description dangling.
221 free( profile->description );
222 memcpy( profile, new_profile, sizeof( struct mlt_profile_s ) );
223 profile->description = strdup( new_profile->description );
// Otherwise the new profile is used directly for the property update below
// (the else branch itself is not visible in this excerpt).
227 profile = new_profile;
230 // Apply to properties
231 apply_profile_properties( self, profile, properties );
232 mlt_profile_close( new_profile );
235 else if ( !strcmp( name, "frame_rate_num" ) )
237 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
238 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
// Copy the numerator into the profile, then refresh the derived fps.
241 profile->frame_rate_num = mlt_properties_get_int( properties, "frame_rate_num" );
242 mlt_properties_set_double( properties, "fps", mlt_profile_fps( profile ) );
245 else if ( !strcmp( name, "frame_rate_den" ) )
247 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
248 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
// Copy the denominator into the profile, then refresh the derived fps.
251 profile->frame_rate_den = mlt_properties_get_int( properties, "frame_rate_den" );
252 mlt_properties_set_double( properties, "fps", mlt_profile_fps( profile ) );
255 else if ( !strcmp( name, "width" ) )
257 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
258 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
260 profile->width = mlt_properties_get_int( properties, "width" );
262 else if ( !strcmp( name, "height" ) )
264 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
265 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
267 profile->height = mlt_properties_get_int( properties, "height" );
269 else if ( !strcmp( name, "progressive" ) )
271 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
272 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
274 profile->progressive = mlt_properties_get_int( properties, "progressive" );
276 else if ( !strcmp( name, "sample_aspect_num" ) )
278 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
279 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
// Sample aspect changes also refresh the derived "aspect_ratio".
282 profile->sample_aspect_num = mlt_properties_get_int( properties, "sample_aspect_num" );
283 mlt_properties_set_double( properties, "aspect_ratio", mlt_profile_sar( profile ) );
286 else if ( !strcmp( name, "sample_aspect_den" ) )
288 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
289 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
292 profile->sample_aspect_den = mlt_properties_get_int( properties, "sample_aspect_den" );
293 mlt_properties_set_double( properties, "aspect_ratio", mlt_profile_sar( profile ) );
296 else if ( !strcmp( name, "display_aspect_num" ) )
298 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
299 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
// Display aspect changes also refresh the derived "display_ratio".
302 profile->display_aspect_num = mlt_properties_get_int( properties, "display_aspect_num" );
303 mlt_properties_set_double( properties, "display_ratio", mlt_profile_dar( profile ) );
306 else if ( !strcmp( name, "display_aspect_den" ) )
308 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
309 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
312 profile->display_aspect_den = mlt_properties_get_int( properties, "display_aspect_den" );
313 mlt_properties_set_double( properties, "display_ratio", mlt_profile_dar( profile ) );
316 else if ( !strcmp( name, "colorspace" ) )
318 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
319 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
321 profile->colorspace = mlt_properties_get_int( properties, "colorspace" );
325 /** The transmitter for the consumer-frame-show event
327 * Invokes the listener.
329 * \private \memberof mlt_consumer_s
330 * \param listener a function pointer that will be invoked
331 * \param owner the events object that will be passed to \p listener
332 * \param self a service that will be passed to \p listener
333 * \param args an array of pointers - the first entry is passed as a frame to \p listener
336 static void mlt_consumer_frame_show( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
338 if ( listener != NULL )
339 listener( owner, self, ( mlt_frame )args[ 0 ] );
342 /** The transmitter for the consumer-frame-render event
344 * Invokes the listener.
346 * \private \memberof mlt_consumer_s
347 * \param listener a function pointer that will be invoked
348 * \param owner the events object that will be passed to \p listener
349 * \param self a service that will be passed to \p listener
350 * \param args an array of pointers - the first entry is passed as a frame to \p listener
353 static void mlt_consumer_frame_render( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
355 if ( listener != NULL )
356 listener( owner, self, ( mlt_frame )args[ 0 ] );
359 /** A listener on the consumer-frame-show event
361 * Saves the position of the frame shown.
363 * \private \memberof mlt_consumer_s
364 * \param owner the events object
365 * \param consumer the consumer on which this event occurred
366 * \param frame the frame that was shown
369 static void on_consumer_frame_show( mlt_properties owner, mlt_consumer consumer, mlt_frame frame )
// Store the frame's position in the consumer's private state.
// NOTE(review): one line just before this statement is missing from this
// excerpt; it may guard against a NULL frame - confirm.
372 ( ( consumer_private*) consumer->local )->position = mlt_frame_get_position( frame );
375 /** Create a new consumer.
 *
 * Allocates a consumer structure and initializes it with mlt_consumer_init,
 * passing no child object.
 *
377 * \public \memberof mlt_consumer_s
378 * \param profile a profile (optional, but recommended)
379 * \return a new consumer
382 mlt_consumer mlt_consumer_new( mlt_profile profile )
384 // Create the memory for the structure
385 mlt_consumer self = malloc( sizeof( struct mlt_consumer_s ) );
// Initialization is attempted only when the allocation succeeded; a zero
// result from mlt_consumer_init means success.
// NOTE(review): the success and failure branches are not visible in this excerpt.
388 if ( self != NULL && mlt_consumer_init( self, NULL, profile ) == 0 )
400 /** Get the parent service object.
402 * \public \memberof mlt_consumer_s
403 * \param self a consumer
404 * \return the parent service class
405 * \see MLT_CONSUMER_SERVICE
408 mlt_service mlt_consumer_service( mlt_consumer self )
410 return self != NULL ? &self->parent : NULL;
413 /** Get the consumer properties.
415 * \public \memberof mlt_consumer_s
416 * \param self a consumer
417 * \return the consumer's properties list
418 * \see MLT_CONSUMER_PROPERTIES
421 mlt_properties mlt_consumer_properties( mlt_consumer self )
423 return self != NULL ? MLT_SERVICE_PROPERTIES( &self->parent ) : NULL;
426 /** Connect the consumer to the producer.
428 * \public \memberof mlt_consumer_s
429 * \param self a consumer
430 * \param producer a producer
431 * \return > 0 warning, == 0 success, < 0 serious error,
432 * 1 = this service does not accept input,
433 * 2 = the producer is invalid,
434 * 3 = the producer is already registered with this consumer
437 int mlt_consumer_connect( mlt_consumer self, mlt_service producer )
439 return mlt_service_connect_producer( &self->parent, producer, 0 );
442 /** Start the consumer.
 *
 * Prepares a stopped consumer for running: blocks the property-changed
 * listener, activates the put slot, sets up the optional test card producer,
 * re-applies the profile, computes "frame_duration", runs the optional "ante"
 * command, caches the real_time and image format settings, and finally
 * delegates to the subclass start callback when one is set.
 *
444 * \public \memberof mlt_consumer_s
445 * \param self a consumer
446 * \return true if there was an error
449 int mlt_consumer_start( mlt_consumer self )
// NOTE(review): the statement controlled by this check is not visible in this excerpt.
451 if ( !mlt_consumer_is_stopped( self ) )
454 consumer_private *priv = self->local;
456 // Stop listening to the property-changed event
457 mlt_event_block( priv->event_listener );
460 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
462 // Determine if there's a test card producer
463 char *test_card = mlt_properties_get( properties, "test_card" );
465 // Just to make sure nothing is hanging around...
466 pthread_mutex_lock( &priv->put_mutex );
// Mark the put slot active; the wait loops in mlt_consumer_put_frame and
// mlt_consumer_get_frame test this flag.
468 priv->put_active = 1;
469 pthread_mutex_unlock( &priv->put_mutex );
472 if ( test_card != NULL )
// Only create the producer when none is stored on the consumer yet.
474 if ( mlt_properties_get_data( properties, "test_card_producer", NULL ) == NULL )
476 // Create a test card producer
477 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
478 mlt_producer producer = mlt_factory_producer( profile, NULL, test_card );
480 // Do we have a producer
481 if ( producer != NULL )
483 // Test card should loop I guess...
484 mlt_properties_set( MLT_PRODUCER_PROPERTIES( producer ), "eof", "loop" );
485 //mlt_producer_set_speed( producer, 0 );
486 //mlt_producer_set_in_and_out( producer, 0, 0 );
488 // Set the test card on the consumer
// The consumer owns the producer: mlt_producer_close is registered as the destructor.
489 mlt_properties_set_data( properties, "test_card_producer", producer, 0, ( mlt_destructor )mlt_producer_close, NULL );
495 // Allow the hash table to speed things up
496 mlt_properties_set_data( properties, "test_card_producer", NULL, 0, NULL, NULL );
499 // The profile could have changed between a stop and a restart.
500 apply_profile_properties( self, mlt_service_profile( MLT_CONSUMER_SERVICE(self) ), properties );
502 // Set the frame duration in microseconds for the frame-dropping heuristic
503 int frame_rate_num = mlt_properties_get_int( properties, "frame_rate_num" );
504 int frame_rate_den = mlt_properties_get_int( properties, "frame_rate_den" );
505 int frame_duration = 0;
// Both terms must be non-zero; otherwise the duration stays 0.
507 if ( frame_rate_num && frame_rate_den )
// NOTE(review): the integer division is performed before the multiplication,
// so the quotient is truncated first (30000/1001 gives 33 * 1001 = 33033
// rather than about 33366) - confirm whether that loss is intended.
509 frame_duration = 1000000 / frame_rate_num * frame_rate_den;
512 mlt_properties_set_int( properties, "frame_duration", frame_duration );
514 // Check and run an ante command
// The "ante" property value is passed verbatim to system().
515 if ( mlt_properties_get( properties, "ante" ) )
516 if ( system( mlt_properties_get( properties, "ante" ) ) == -1 )
517 mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_ERROR, "system(%s) failed!\n", mlt_properties_get( properties, "ante" ) );
519 // Set the real_time preference
520 priv->real_time = mlt_properties_get_int( properties, "real_time" );
522 // For worker threads implementation, buffer must be at least # threads
// When |real_time| > 1 and "buffer" is not larger than |real_time|, the value
// |real_time| + 1 is stored under the separate "_buffer" key.
523 if ( abs( priv->real_time ) > 1 && mlt_properties_get_int( properties, "buffer" ) <= abs( priv->real_time ) )
524 mlt_properties_set_int( properties, "_buffer", abs( priv->real_time ) + 1 );
526 // Get the image format to use for rendering threads
527 const char* format = mlt_properties_get( properties, "mlt_image_format" );
// Map the format name onto the enum; the last assignment (yuv422) is
// presumably the fallback for unrecognized names - its else is not visible.
530 if ( !strcmp( format, "rgb24" ) )
531 priv->format = mlt_image_rgb24;
532 else if ( !strcmp( format, "rgb24a" ) )
533 priv->format = mlt_image_rgb24a;
534 else if ( !strcmp( format, "yuv420p" ) )
535 priv->format = mlt_image_yuv420p;
536 else if ( !strcmp( format, "none" ) )
537 priv->format = mlt_image_none;
538 else if ( !strcmp( format, "glsl" ) )
539 priv->format = mlt_image_glsl_texture;
541 priv->format = mlt_image_yuv422;
// Hand over to the subclass implementation when one is installed.
545 if ( self->start != NULL )
546 return self->start( self );
551 /** An alternative method to feed frames into the consumer.
553 * Only valid if the consumer itself is not connected.
 *
 * The frame is handed over through a single put slot in the private state,
 * guarded by put_mutex and signalled through put_cond.
 *
555 * \public \memberof mlt_consumer_s
556 * \param self a consumer
557 * \param frame a frame
558 * \return true (ignore this return value for now)
561 int mlt_consumer_put_frame( mlt_consumer self, mlt_frame frame )
565 // Get the service associated with the consumer
566 mlt_service service = MLT_CONSUMER_SERVICE( self );
// Putting is only attempted when no producer is connected to the service.
568 if ( mlt_service_producer( service ) == NULL )
572 consumer_private *priv = self->local;
574 pthread_mutex_lock( &priv->put_mutex );
// Wait while puts are active and the slot is still occupied. Each wait is
// bounded by an absolute deadline one second after the current time.
575 while ( priv->put_active && priv->put != NULL )
577 gettimeofday( &now, NULL );
578 tm.tv_sec = now.tv_sec + 1;
// Microseconds converted to the nanoseconds field of the timespec.
579 tm.tv_nsec = now.tv_usec * 1000;
580 pthread_cond_timedwait( &priv->put_cond, &priv->put_mutex, &tm );
// Re-check after the loop: it also exits when puts have been deactivated.
582 if ( priv->put_active && priv->put == NULL )
// NOTE(review): the lines between the check above and this close are not
// visible in this excerpt; presumably the frame is stored in the slot when the
// check succeeds and closed otherwise - confirm.
585 mlt_frame_close( frame );
// Wake any waiter on put_cond before releasing the lock.
586 pthread_cond_broadcast( &priv->put_cond );
587 pthread_mutex_unlock( &priv->put_mutex );
// Presumably the path taken when a producer is connected: the frame is closed.
591 mlt_frame_close( frame );
597 /** Protected method for consumer to get frames from connected service
 *
 * The frame comes from one of three sources: the put slot (no producer
 * connected and "put_mode" set), the connected producer, or a freshly
 * initialized frame. Consumer settings are then copied onto the frame.
 *
599 * \public \memberof mlt_consumer_s
600 * \param self a consumer
604 mlt_frame mlt_consumer_get_frame( mlt_consumer self )
607 mlt_frame frame = NULL;
609 // Get the service associated with the consumer
610 mlt_service service = MLT_CONSUMER_SERVICE( self );
612 // Get the consumer properties
613 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
// Put mode: no producer is connected and the "put_mode" property is set.
616 if ( mlt_service_producer( service ) == NULL && mlt_properties_get_int( properties, "put_mode" ) )
620 consumer_private *priv = self->local;
622 pthread_mutex_lock( &priv->put_mutex );
// Wait while puts are active and the slot is still empty. Each wait is
// bounded by an absolute deadline one second after the current time.
623 while ( priv->put_active && priv->put == NULL )
625 gettimeofday( &now, NULL );
626 tm.tv_sec = now.tv_sec + 1;
627 tm.tv_nsec = now.tv_usec * 1000;
628 pthread_cond_timedwait( &priv->put_cond, &priv->put_mutex, &tm );
// NOTE(review): the lines that take the frame out of the slot are not visible
// in this excerpt.
632 pthread_cond_broadcast( &priv->put_cond );
633 pthread_mutex_unlock( &priv->put_mutex );
// Filters attached to the consumer's service are applied to the frame.
635 mlt_service_apply_filters( service, frame, 0 );
// Normal mode: pull the frame from the connected producer.
637 else if ( mlt_service_producer( service ) != NULL )
639 mlt_service_get_frame( service, &frame, 0 );
// Remaining case: create a new frame on the service.
643 frame = mlt_frame_init( service );
648 // Get the frame properties
649 mlt_properties frame_properties = MLT_FRAME_PROPERTIES( frame );
651 // Get the test card producer
652 mlt_producer test_card = mlt_properties_get_data( properties, "test_card_producer", NULL );
654 // Attach the test frame producer to it.
// Stored without a destructor, so the frame does not take ownership.
655 if ( test_card != NULL )
656 mlt_properties_set_data( frame_properties, "test_card_producer", test_card, 0, NULL, NULL );
658 // Pass along the interpolation and deinterlace options
659 // TODO: get rid of consumer_deinterlace and use profile.progressive
660 mlt_properties_set( frame_properties, "rescale.interp", mlt_properties_get( properties, "rescale" ) );
// consumer_deinterlace is the bitwise OR of the "progressive" and "deinterlace" properties.
661 mlt_properties_set_int( frame_properties, "consumer_deinterlace", mlt_properties_get_int( properties, "progressive" ) | mlt_properties_get_int( properties, "deinterlace" ) );
662 mlt_properties_set( frame_properties, "deinterlace_method", mlt_properties_get( properties, "deinterlace_method" ) );
663 mlt_properties_set_int( frame_properties, "consumer_tff", mlt_properties_get_int( properties, "top_field_first" ) );
/** Compute the time difference between now and a time value.
 *
 * On return \p time1 holds the current time, so repeated calls on the same
 * variable measure consecutive intervals.
 *
 * The seconds are subtracted before they are scaled to microseconds.
 * Scaling the absolute epoch seconds first overflows a 32-bit \c long, which
 * is undefined behavior for a signed type.
 *
 * \private \memberof mlt_consumer_s
 * \param time1 a time value to be compared against now; overwritten with the
 *        current time
 * \return the difference in microseconds
 */

static inline long time_difference( struct timeval *time1 )
{
	// Remember the reference time before it is overwritten
	struct timeval previous = *time1;

	gettimeofday( time1, NULL );

	// Work on the (small) interval, not on the absolute timestamps
	long seconds = ( long )( time1->tv_sec - previous.tv_sec );
	long microseconds = ( long )( time1->tv_usec - previous.tv_usec );

	return seconds * 1000000L + microseconds;
}
686 /** The thread procedure for asynchronously pulling frames through the service
687 * network connected to a consumer.
 *
 * Each iteration queues the current frame, fetches the next one, renders its
 * image unless the frame-dropping heuristic decided to skip it, always
 * processes audio, and updates the running processing cost that drives the
 * skip decision. On exit the queue is drained and closed.
 *
689 * \private \memberof mlt_consumer_s
690 * \param arg a consumer
693 static void *consumer_read_ahead_thread( void *arg )
695 // The argument is the consumer
696 mlt_consumer self = arg;
697 consumer_private *priv = self->local;
699 // Get the properties of the consumer
700 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
702 // Get the width and height
703 int width = mlt_properties_get_int( properties, "width" );
704 int height = mlt_properties_get_int( properties, "height" );
706 // See if video is turned off
707 int video_off = mlt_properties_get_int( properties, "video_off" );
708 int preview_off = mlt_properties_get_int( properties, "preview_off" );
709 int preview_format = mlt_properties_get_int( properties, "preview_format" );
711 // Get the audio settings
712 mlt_audio_format afmt = mlt_audio_s16;
713 const char *format = mlt_properties_get( properties, "mlt_audio_format" );
// Map the format name onto the enum; s16 stays selected when no branch matches.
716 if ( !strcmp( format, "none" ) )
717 afmt = mlt_audio_none;
718 else if ( !strcmp( format, "s32" ) )
719 afmt = mlt_audio_s32;
720 else if ( !strcmp( format, "s32le" ) )
721 afmt = mlt_audio_s32le;
722 else if ( !strcmp( format, "float" ) )
723 afmt = mlt_audio_float;
724 else if ( !strcmp( format, "f32le" ) )
725 afmt = mlt_audio_f32le;
726 else if ( !strcmp( format, "u8" ) )
730 double fps = mlt_properties_get_double( properties, "fps" );
731 int channels = mlt_properties_get_int( properties, "channels" );
732 int frequency = mlt_properties_get_int( properties, "frequency" );
736 // See if audio is turned off
737 int audio_off = mlt_properties_get_int( properties, "audio_off" );
739 // Get the maximum size of the buffer
// One more than the "buffer" property.
740 int buffer = mlt_properties_get_int( properties, "buffer" ) + 1;
742 // General frame variable
743 mlt_frame frame = NULL;
744 uint8_t *image = NULL;
749 // Average time for get_frame and get_image
// Accumulated processing time in microseconds; divided by count where it is used.
752 int64_t time_process = 0;
754 mlt_position pos = 0;
755 mlt_position start_pos = 0;
756 mlt_position last_pos = 0;
757 int frame_duration = mlt_properties_get_int( properties, "frame_duration" );
758 int drop_max = mlt_properties_get_int( properties, "drop_max" );
// With preview off and a non-zero preview format, that format replaces the
// consumer's render format.
760 if ( preview_off && preview_format != 0 )
761 priv->format = preview_format;
763 mlt_events_fire( properties, "consumer-thread-started", NULL );
765 // Get the first frame
766 frame = mlt_consumer_get_frame( self );
770 // Get the image of the first frame
// Listeners of consumer-frame-render are notified before the image is requested.
773 mlt_events_fire( MLT_CONSUMER_PROPERTIES( self ), "consumer-frame-render", frame, NULL );
774 mlt_frame_get_image( frame, &image, &priv->format, &width, &height, 0 );
779 samples = mlt_sample_calculator( fps, frequency, counter++ );
780 mlt_frame_get_audio( frame, &audio, &afmt, &frequency, &channels, &samples );
784 mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
// All three position trackers start at the first frame's position.
785 last_pos = start_pos = pos = mlt_frame_get_position( frame );
788 // Get the starting time (can ignore the times above)
789 gettimeofday( &ante, NULL );
791 // Continue to read ahead
792 while ( priv->ahead )
794 // Put the current frame into the queue
795 pthread_mutex_lock( &priv->queue_mutex );
// Wait while the queue already holds 'buffer' frames and read-ahead is still enabled.
796 while( priv->ahead && mlt_deque_count( priv->queue ) >= buffer )
797 pthread_cond_wait( &priv->queue_cond, &priv->queue_mutex );
// A pending purge closes the frame; the push below is presumably the
// alternative branch (the else is not visible in this excerpt).
798 if ( priv->is_purge )
800 mlt_frame_close( frame );
805 mlt_deque_push_back( priv->queue, frame );
// Wake every thread waiting on the queue before releasing the lock.
807 pthread_cond_broadcast( &priv->queue_cond );
808 pthread_mutex_unlock( &priv->queue_mutex );
810 // Get the next frame
811 frame = mlt_consumer_get_frame( self );
813 // If there's no frame, we're probably stopped...
816 pos = mlt_frame_get_position( frame );
818 // WebVfx uses this to setup a consumer-stopping event handler.
819 mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
821 // Increment the counter used for averaging processing cost
824 // All non-normal playback frames should be shown
825 if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" ) != 1 )
827 #ifdef DEINTERLACE_ON_NOT_NORMAL_SPEED
828 mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "consumer_deinterlace", 1 );
830 // Indicate seeking or trick-play
834 // If skip flag not set or frame-dropping disabled
// A real_time value of -1 always takes the render path.
835 if ( !skip_next || priv->real_time == -1 )
839 // Reset width/height - could have been changed by previous mlt_frame_get_image
840 width = mlt_properties_get_int( properties, "width" );
841 height = mlt_properties_get_int( properties, "height" );
844 mlt_events_fire( MLT_CONSUMER_PROPERTIES( self ), "consumer-frame-render", frame, NULL );
845 mlt_frame_get_image( frame, &image, &priv->format, &width, &height, 0 );
848 // Indicate the rendered image is available.
849 mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
851 // Reset consecutively-skipped counter
854 else // Skip image processing
856 // Increment the number of consecutively-skipped frames
859 // If too many (1 sec) consecutively-skipped frames
// The limit compared against is the "drop_max" property.
860 if ( skipped > drop_max )
862 // Reset cost tracker
865 mlt_log_verbose( self, "too many frames dropped - forcing next frame\n" );
869 // Always process audio
872 samples = mlt_sample_calculator( fps, frequency, counter++ );
873 mlt_frame_get_audio( frame, &audio, &afmt, &frequency, &channels, &samples );
876 // Get the time to process this frame
// time_difference also resets 'ante' to the current time for the next iteration.
877 int64_t time_current = time_difference( &ante );
879 // If the current time is not suddenly some large amount
// The sample is accepted when it is below 20 times the running average, when
// nothing has been accumulated yet, or while fewer than 5 frames were counted.
880 if ( time_current < time_process / count * 20 || !time_process || count < 5 )
882 // Accumulate the cost for processing this frame
883 time_process += time_current;
887 mlt_log_debug( self, "current %"PRId64" threshold %"PRId64" count %d\n",
888 time_current, (int64_t) (time_process / count * 20), count );
889 // Ignore the cost of this frame's time
893 // Determine if we started, resumed, or seeked
// Any position other than the direct successor of the previous one counts.
894 if ( pos != last_pos + 1 )
898 // Do not skip the first 20% of buffer at start, resume, or seek
899 if ( pos - start_pos <= buffer / 5 + 1 )
901 // Reset cost tracker
909 // Only consider skipping if the buffer level is low (or really small)
910 if ( mlt_deque_count( priv->queue ) <= buffer / 5 + 1 )
912 // Skip next frame if average cost exceeds frame duration.
913 if ( time_process / count > frame_duration )
916 mlt_log_debug( self, "avg usec %"PRId64" (%"PRId64"/%d) duration %d\n",
917 time_process/count, time_process, count, frame_duration);
921 // Remove the last frame
922 mlt_frame_close( frame );
// Teardown: drain and close the queue while holding the queue mutex.
925 pthread_mutex_lock( &priv->queue_mutex );
926 while ( mlt_deque_count( priv->queue ) )
927 mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
930 mlt_deque_close( priv->queue );
932 pthread_mutex_unlock( &priv->queue_mutex );
934 mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-stopped", NULL );
939 /** Locate the first unprocessed frame in the queue.
941 * When playing with realtime behavior, we do not use the true head, but
942 * rather an adjusted process_head. The process_head is adjusted based on
943 * the rate of frame-dropping or recovery from frame-dropping. The idea is
944 * that as the level of frame-dropping increases to move the process_head
945 * closer to the tail because the frames are not completing processing prior
946 * to their playout! Then, as frames are not dropped the process_head moves
947 * back closer to the head of the queue so that worker threads can work
948 * ahead of the playout point (queue head).
 *
 * The queue is read without taking a lock here; the worker thread calls this
 * function while holding queue_mutex.
 *
950 * \private \memberof mlt_consumer_s
951 * \param self a consumer
952 * \return an index into the queue
955 static inline int first_unprocessed_frame( mlt_consumer self )
957 consumer_private *priv = self->local;
// The scan starts at the queue head when real_time <= 0, otherwise at process_head.
958 int index = priv->real_time <= 0 ? 0 : priv->process_head;
// Continue while the index is inside the queue and that frame is already
// marked is_processing (the loop body is not visible in this excerpt).
959 while ( index < mlt_deque_count( priv->queue ) && MLT_FRAME( mlt_deque_peek( priv->queue, index ) )->is_processing )
964 /** The worker thread procedure for parallel processing frames.
 *
 * Each iteration picks the first frame in the queue that is not yet marked
 * is_processing, marks it, renders its image outside the queue lock, flags it
 * as rendered and then broadcasts done_cond.
 *
966 * \private \memberof mlt_consumer_s
967 * \param arg a consumer
970 static void *consumer_worker_thread( void *arg )
972 // The argument is the consumer
973 mlt_consumer self = arg;
974 consumer_private *priv = self->local;
976 // Get the properties of the consumer
977 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
979 // Get the width and height
980 int width = mlt_properties_get_int( properties, "width" );
981 int height = mlt_properties_get_int( properties, "height" );
// Local copy of the render format; mlt_frame_get_image receives its address.
982 mlt_image_format format = priv->format;
984 // See if video is turned off
985 int video_off = mlt_properties_get_int( properties, "video_off" );
986 int preview_off = mlt_properties_get_int( properties, "preview_off" );
987 int preview_format = mlt_properties_get_int( properties, "preview_format" );
989 // General frame variable
990 mlt_frame frame = NULL;
991 uint8_t *image = NULL;
// With preview off and a non-zero preview format, that format is used instead.
993 if ( preview_off && preview_format != 0 )
994 format = preview_format;
996 mlt_events_fire( properties, "consumer-thread-started", NULL );
998 // Continue to read ahead
999 while ( priv->ahead )
1001 // Get the next unprocessed frame from the work queue
1002 pthread_mutex_lock( &priv->queue_mutex );
1003 int index = first_unprocessed_frame( self );
// Wait on queue_cond until an unprocessed frame exists or read-ahead is turned off.
1004 while ( priv->ahead && index >= mlt_deque_count( priv->queue ) )
1006 mlt_log_debug( MLT_CONSUMER_SERVICE(self), "waiting in worker index = %d queue count = %d\n",
1007 index, mlt_deque_count( priv->queue ) );
1008 pthread_cond_wait( &priv->queue_cond, &priv->queue_mutex );
1009 index = first_unprocessed_frame( self );
1012 // Mark the frame for processing
// The frame is only peeked, so it stays in the queue while it is rendered.
1013 frame = mlt_deque_peek( priv->queue, index );
1016 mlt_log_debug( MLT_CONSUMER_SERVICE(self), "worker processing index = %d frame " MLT_POSITION_FMT " queue count = %d\n",
1017 index, mlt_frame_get_position(frame), mlt_deque_count( priv->queue ) );
1018 frame->is_processing = 1;
// Take an extra reference on the frame's properties before the lock is
// released; presumably balanced by the mlt_frame_close after rendering - confirm.
1019 mlt_properties_inc_ref( MLT_FRAME_PROPERTIES( frame ) );
1021 pthread_mutex_unlock( &priv->queue_mutex );
1023 // If there's no frame, we're probably stopped...
1024 if ( frame == NULL )
1027 // WebVfx uses this to setup a consumer-stopping event handler.
1028 mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
1030 #ifdef DEINTERLACE_ON_NOT_NORMAL_SPEED
1031 // All non normal playback frames should be shown
1032 if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" ) != 1 )
1033 mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "consumer_deinterlace", 1 );
1039 // Fetch width/height again
1040 width = mlt_properties_get_int( properties, "width" );
1041 height = mlt_properties_get_int( properties, "height" );
// Listeners of consumer-frame-render are notified before the image is requested.
1042 mlt_events_fire( MLT_CONSUMER_PROPERTIES( self ), "consumer-frame-render", frame, NULL );
1043 mlt_frame_get_image( frame, &image, &format, &width, &height, 0 );
1045 mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
1046 mlt_frame_close( frame );
1048 // Tell a waiting thread (non-realtime main consumer thread) that we are done.
1049 pthread_mutex_lock( &priv->done_mutex );
1050 pthread_cond_broadcast( &priv->done_cond );
1051 pthread_mutex_unlock( &priv->done_mutex );
1057 /** Start the read/render thread.
1059 * \private \memberof mlt_consumer_s
1060 * \param self a consumer
1063 static void consumer_read_ahead_start( mlt_consumer self )
1065 consumer_private *priv = self->local;
1067 if ( priv->started )
1070 // We're running now
1073 // Create the frame queue
1074 priv->queue = mlt_deque_init( );
1076 // Create the queue mutex
1077 pthread_mutex_init( &priv->queue_mutex, NULL );
1079 // Create the condition
1080 pthread_cond_init( &priv->queue_cond, NULL );
1082 // Create the read ahead
1083 mlt_thread_create( self, (thread_function_t) consumer_read_ahead_thread );
1087 /** Start the worker threads.
1089 * \private \memberof mlt_consumer_s
1090 * \param self a consumer
1093 static void consumer_work_start( mlt_consumer self )
1095 consumer_private *priv = self->local;
1096 int n = abs( priv->real_time );
1099 if ( priv->started )
1102 thread = calloc( 1, sizeof( pthread_t ) * n );
1104 // We're running now
1106 priv->threads = thread;
1108 // These keep track of the accelleration of frame dropping or recovery.
1109 priv->consecutive_dropped = 0;
1110 priv->consecutive_rendered = 0;
1112 // This is the position in the queue from which to look for a frame to process.
1113 // If we always start from the head, then we may likely not complete processing
1114 // before the frame is played out.
1115 priv->process_head = 0;
1117 // Create the queues
1118 priv->queue = mlt_deque_init();
1119 priv->worker_threads = mlt_deque_init();
1121 // Create the mutexes
1122 pthread_mutex_init( &priv->queue_mutex, NULL );
1123 pthread_mutex_init( &priv->done_mutex, NULL );
1125 // Create the conditions
1126 pthread_cond_init( &priv->queue_cond, NULL );
1127 pthread_cond_init( &priv->done_cond, NULL );
1129 // Create the read ahead
1130 if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( self ), "priority" ) )
1133 struct sched_param priority;
1134 pthread_attr_t thread_attributes;
1136 priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( self ), "priority" );
1137 pthread_attr_init( &thread_attributes );
1138 pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
1139 pthread_attr_setschedparam( &thread_attributes, &priority );
1140 pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
1141 pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
1145 if ( pthread_create( thread, &thread_attributes, consumer_worker_thread, self ) < 0 )
1146 if ( pthread_create( thread, NULL, consumer_worker_thread, self ) == 0 )
1147 mlt_deque_push_back( priv->worker_threads, thread );
1150 pthread_attr_destroy( &thread_attributes );
1157 if ( pthread_create( thread, NULL, consumer_worker_thread, self ) == 0 )
1158 mlt_deque_push_back( priv->worker_threads, thread );
1165 /** Stop the read/render thread.
1167 * \private \memberof mlt_consumer_s
1168 * \param self a consumer
// Shuts down the single read-ahead thread: claims the started flag, fires
// "consumer-stopping", wakes waiters on queue_cond and put_cond, calls
// mlt_thread_join and destroys the queue mutex and condition.
1171 static void consumer_read_ahead_stop( mlt_consumer self )
1173 consumer_private *priv = self->local;
1175 // Make sure we're running
1176 // TODO improve support for atomic ops in general (see libavutil/atomic.h)
1177 #ifdef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_4
// Atomically swaps started from 1 to 0; the builtin returns the previous
// value, so only a caller that observed 1 performs the shutdown.
1178 if ( __sync_val_compare_and_swap( &priv->started, 1, 0 ) )
// Without the builtin the flag is tested with a plain, non-atomic read.
1181 if ( priv->started )
1185 // Inform thread to stop
1187 mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-stopping", NULL );
1189 // Broadcast to the condition in case it's waiting
1190 pthread_mutex_lock( &priv->queue_mutex );
1191 pthread_cond_broadcast( &priv->queue_cond );
1192 pthread_mutex_unlock( &priv->queue_mutex );
1194 // Broadcast to the put condition in case it's waiting
1195 pthread_mutex_lock( &priv->put_mutex );
1196 pthread_cond_broadcast( &priv->put_cond );
1197 pthread_mutex_unlock( &priv->put_mutex );
// The mutex and condition are destroyed only after the join call returns.
1200 mlt_thread_join( self );
1202 // Destroy the frame queue mutex
1203 pthread_mutex_destroy( &priv->queue_mutex );
1205 // Destroy the condition
1206 pthread_cond_destroy( &priv->queue_cond );
1210 /** Stop the worker threads.
1212 * \private \memberof mlt_consumer_s
1213 * \param self a consumer
// Shuts down the worker threads: claims the started flag, fires
// "consumer-stopping", wakes waiters on queue_cond, put_cond and done_cond,
// joins every thread recorded in worker_threads, then frees the thread array,
// destroys the mutexes/conditions, closes the queued frames and both deques,
// and fires "consumer-thread-stopped".
1216 static void consumer_work_stop( mlt_consumer self )
1218 consumer_private *priv = self->local;
1220 // Make sure we're running
1221 #ifdef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_4
// Atomically swaps started from 1 to 0; only a caller that observed 1 goes on.
1222 if ( __sync_val_compare_and_swap( &priv->started, 1, 0 ) )
// Without the builtin the flag is tested with a plain, non-atomic read.
1225 if ( priv->started )
1229 // Inform thread to stop
1231 mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-stopping", NULL );
1233 // Broadcast to the queue condition in case it's waiting
1234 pthread_mutex_lock( &priv->queue_mutex );
1235 pthread_cond_broadcast( &priv->queue_cond );
1236 pthread_mutex_unlock( &priv->queue_mutex );
1238 // Broadcast to the put condition in case it's waiting
1239 pthread_mutex_lock( &priv->put_mutex );
1240 pthread_cond_broadcast( &priv->put_cond );
1241 pthread_mutex_unlock( &priv->put_mutex );
1243 // Broadcast to the done condition in case it's waiting
1244 pthread_mutex_lock( &priv->done_mutex );
1245 pthread_cond_broadcast( &priv->done_cond );
1246 pthread_mutex_unlock( &priv->done_mutex );
// Only threads that were pushed onto worker_threads are joined here; the
// entries point into the priv->threads array freed below.
1250 while ( ( thread = mlt_deque_pop_back( priv->worker_threads ) ) )
1251 pthread_join( *thread, NULL );
1253 // Deallocate the array of threads
// NOTE(review): free(NULL) is a no-op, so the guard is redundant, and no reset
// of priv->threads is visible after the free - confirm it cannot be reused.
1254 if ( priv->threads )
1255 free( priv->threads );
1257 // Destroy the mutexes
1258 pthread_mutex_destroy( &priv->queue_mutex );
1259 pthread_mutex_destroy( &priv->done_mutex );
1261 // Destroy the conditions
1262 pthread_cond_destroy( &priv->queue_cond );
1263 pthread_cond_destroy( &priv->done_cond );
// Close every frame still queued, taken from the back of the queue.
1266 while ( mlt_deque_count( priv->queue ) )
1267 mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
1270 mlt_deque_close( priv->queue );
1271 mlt_deque_close( priv->worker_threads );
1273 mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-stopped", NULL );
1277 /** Flush the read/render thread's buffer.
1279 * \public \memberof mlt_consumer_s
1280 * \param self a consumer
// Discards buffered frames: closes the pending "put" frame, calls the
// implementation's purge callback, closes every frame in the read-ahead/work
// queue, wakes the threads waiting on the queue, done and put conditions, and
// finally closes the "put" frame once more.
1283 void mlt_consumer_purge( mlt_consumer self )
1287 consumer_private *priv = self->local;
1289 pthread_mutex_lock( &priv->put_mutex );
1291 mlt_frame_close( priv->put );
1294 pthread_cond_broadcast( &priv->put_cond );
1295 pthread_mutex_unlock( &priv->put_mutex );
1298 self->purge( self );
// queue_mutex is only taken when the consumer is started with a non-zero
// real_time. NOTE(review): the matching unlock below re-evaluates the same
// condition, so a change of priv->started in between would unbalance the
// lock - confirm callers exclude that.
1300 if ( priv->started && priv->real_time )
1301 pthread_mutex_lock( &priv->queue_mutex );
// Close the queued frames, taken from the back of the queue.
1303 while ( priv->started && mlt_deque_count( priv->queue ) )
1304 mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
1306 if ( priv->started && priv->real_time )
1309 pthread_cond_broadcast( &priv->queue_cond );
1310 pthread_mutex_unlock( &priv->queue_mutex );
// With more than one worker thread, also wake threads waiting on done_cond.
1311 if ( abs( priv->real_time ) > 1 )
1313 pthread_mutex_lock( &priv->done_mutex );
1314 pthread_cond_broadcast( &priv->done_cond );
1315 pthread_mutex_unlock( &priv->done_mutex );
// Second pass over the "put" frame, after the queue has been emptied.
1319 pthread_mutex_lock( &priv->put_mutex );
1321 mlt_frame_close( priv->put );
1324 pthread_cond_broadcast( &priv->put_cond );
1325 pthread_mutex_unlock( &priv->put_mutex );
1329 /** Use multiple worker threads and a work queue.
// Fetches the next frame when several worker threads are in use: starts the
// workers on first use, keeps the work queue filled up to the buffer size,
// pops the head of the queue and, for real_time > 0, adapts process_head to
// how many consecutive frames were or were not rendered.
1332 static mlt_frame worker_get_frame( mlt_consumer self, mlt_properties properties )
1335 mlt_frame frame = NULL;
1336 consumer_private *priv = self->local;
1337 double fps = mlt_properties_get_double( properties, "fps" );
1338 int threads = abs( priv->real_time );
// "_buffer" (written below when the buffer is auto-scaled) takes precedence
// over the "buffer" property when it is positive.
1339 int buffer = mlt_properties_get_int( properties, "_buffer" );
1340 buffer = buffer > 0 ? buffer : mlt_properties_get_int( properties, "buffer" );
1341 // This is a heuristic to determine a suitable minimum buffer size for the number of threads.
1342 int headroom = 2 + threads * threads;
1343 buffer = buffer < headroom ? headroom : buffer;
1345 // Start worker threads if not already started.
1346 if ( ! priv->ahead )
// Use "prefill" only when it lies strictly between 0 and buffer; otherwise
// the whole buffer must be prefilled.
1348 int prefill = mlt_properties_get_int( properties, "prefill" );
1349 prefill = prefill > 0 && prefill < buffer ? prefill : buffer;
1351 consumer_work_start( self );
1353 // Fill the work queue.
1355 while ( priv->ahead && i-- )
1357 frame = mlt_consumer_get_frame( self );
1360 pthread_mutex_lock( &priv->queue_mutex );
1361 mlt_deque_push_back( priv->queue, frame );
1362 pthread_cond_signal( &priv->queue_cond );
1363 pthread_mutex_unlock( &priv->queue_mutex );
// Block until the first unclaimed frame is at index prefill or beyond.
// NOTE(review): the queue is scanned here without holding queue_mutex while
// workers update is_processing under it - confirm this is intended.
1368 while ( priv->ahead && first_unprocessed_frame( self ) < prefill )
1370 pthread_mutex_lock( &priv->done_mutex );
1371 pthread_cond_wait( &priv->done_cond, &priv->done_mutex );
1372 pthread_mutex_unlock( &priv->done_mutex );
// From now on first_unprocessed_frame starts its scan at this offset when
// real_time is positive.
1374 priv->process_head = threads;
1377 // mlt_log_verbose( MLT_CONSUMER_SERVICE(self), "size %d done count %d work count %d process_head %d\n",
1378 // threads, first_unprocessed_frame( self ), mlt_deque_count( priv->queue ), priv->process_head );
1380 // Feed the work queue
1381 while ( priv->ahead && mlt_deque_count( priv->queue ) < buffer )
1383 frame = mlt_consumer_get_frame( self );
1386 pthread_mutex_lock( &priv->queue_mutex );
1387 mlt_deque_push_back( priv->queue, frame );
1388 pthread_cond_signal( &priv->queue_cond );
1389 pthread_mutex_unlock( &priv->queue_mutex );
1393 // Wait if not realtime.
// For negative real_time, block until the frame at the head of the queue has
// its "rendered" property set, unless a purge is pending.
// NOTE(review): the predicate is evaluated before done_mutex is taken, so a
// broadcast between the test and the wait could be missed - confirm.
1394 while ( priv->ahead && priv->real_time < 0 && !priv->is_purge &&
1395 !( mlt_properties_get_int( MLT_FRAME_PROPERTIES( MLT_FRAME( mlt_deque_peek_front( priv->queue ) ) ), "rendered" ) ) )
1397 pthread_mutex_lock( &priv->done_mutex );
1398 pthread_cond_wait( &priv->done_cond, &priv->done_mutex );
1399 pthread_mutex_unlock( &priv->done_mutex );
1402 // Get the frame from the queue.
1403 pthread_mutex_lock( &priv->queue_mutex );
1404 frame = mlt_deque_pop_front( priv->queue );
1405 pthread_mutex_unlock( &priv->queue_mutex );
1411 // Adapt the worker process head to the runtime conditions.
1412 if ( priv->real_time > 0 )
// Frame was rendered: reset the dropped run; process_head is only lowered
// while it is above the thread count.
1414 if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "rendered" ) )
1416 priv->consecutive_dropped = 0;
1417 if ( priv->process_head > threads && priv->consecutive_rendered >= priv->process_head )
1418 priv->process_head--;
1420 priv->consecutive_rendered++;
// Frame was not rendered: reset the rendered run; process_head is only raised
// while it is below buffer - threads.
1424 priv->consecutive_rendered = 0;
1425 if ( priv->process_head < buffer - threads && priv->consecutive_dropped > threads )
1426 priv->process_head++;
1428 priv->consecutive_dropped++;
1430 // mlt_log_verbose( MLT_CONSUMER_SERVICE(self), "dropped %d rendered %d process_head %d\n",
1431 // priv->consecutive_dropped, priv->consecutive_rendered, priv->process_head );
1433 // Check for too many consecutively dropped frames
1434 if ( priv->consecutive_dropped > mlt_properties_get_int( properties, "drop_max" ) )
1436 int orig_buffer = mlt_properties_get_int( properties, "buffer" );
1437 int prefill = mlt_properties_get_int( properties, "prefill" );
// NOTE(review): these calls pass the consumer itself, whereas the other log
// calls in this file pass MLT_CONSUMER_SERVICE( self ) - confirm equivalence.
1438 mlt_log_verbose( self, "too many frames dropped - " );
1440 // If using a default low-latency buffer level (SDL) and below the limit
1441 if ( ( orig_buffer == 1 || prefill == 1 ) && buffer < (threads + 1) * 10 )
1443 // Auto-scale the buffer to compensate
1444 mlt_log_verbose( self, "increasing buffer to %d\n", buffer + threads );
1445 mlt_properties_set_int( properties, "_buffer", buffer + threads );
// The double result of fps / 2 is truncated by the assignment to an int field.
1446 priv->consecutive_dropped = fps / 2;
1450 // Tell the consumer to render it
1451 mlt_log_verbose( self, "forcing next frame\n" );
1452 mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
1453 priv->consecutive_dropped = 0;
// A purge was requested while this frame was in flight: the frame is closed
// here.
1457 if ( priv->is_purge ) {
1459 mlt_frame_close( frame );
1465 /** Get the next frame from the producer connected to a consumer.
1467 * Typically, one uses this instead of \p mlt_consumer_get_frame to make
1468 * the asynchronous/real-time behavior configurable at runtime.
1469 * You should close the frame returned from this when you are done with it.
1471 * \public \memberof mlt_consumer_s
1472 * \param self a consumer
// Selects how the next frame is obtained from priv->real_time: an absolute
// value above 1 delegates to worker_get_frame, an absolute value of 1 uses the
// single read-ahead thread and its queue, and 0 fetches the frame directly
// with mlt_consumer_get_frame.
1476 mlt_frame mlt_consumer_rt_frame( mlt_consumer self )
1479 mlt_frame frame = NULL;
1481 // Get the properties
1482 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
1483 consumer_private *priv = self->local;
1485 // Check if the user has requested real time or not
1486 if ( priv->real_time > 1 || priv->real_time < -1 )
1489 return worker_get_frame( self, properties );
1491 else if ( priv->real_time == 1 || priv->real_time == -1 )
1495 // Is the read ahead running?
1496 if ( priv->ahead == 0 )
1498 int buffer = mlt_properties_get_int( properties, "buffer" );
1499 int prefill = mlt_properties_get_int( properties, "prefill" );
1500 consumer_read_ahead_start( self );
// On the first call, wait for "prefill" frames when it lies strictly between
// 0 and buffer, otherwise for a full buffer.
1502 size = prefill > 0 && prefill < buffer ? prefill : buffer;
1505 // Get frame from queue
1506 pthread_mutex_lock( &priv->queue_mutex );
// Block until the queue holds at least size frames or priv->ahead is cleared.
1507 while( priv->ahead && mlt_deque_count( priv->queue ) < size )
1508 pthread_cond_wait( &priv->queue_cond, &priv->queue_mutex );
1509 frame = mlt_deque_pop_front( priv->queue );
// Wake other waiters on queue_cond now that a slot has been freed (presumably
// the read-ahead thread - confirm).
1510 pthread_cond_broadcast( &priv->queue_cond );
1511 pthread_mutex_unlock( &priv->queue_mutex );
1513 else // real_time == 0
1518 mlt_events_fire( properties, "consumer-thread-started", NULL );
1520 // Get the frame in non real time
1521 frame = mlt_consumer_get_frame( self );
1523 // This isn't true, but from the consumers perspective it is
1524 if ( frame != NULL )
1526 mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
1528 // WebVfx uses this to setup a consumer-stopping event handler.
1529 mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
1536 /** Callback for the implementation to indicate a stopped condition.
1538 * \public \memberof mlt_consumer_s
1539 * \param self a consumer
1542 void mlt_consumer_stopped( mlt_consumer self )
1544 mlt_properties_set_int( MLT_CONSUMER_PROPERTIES( self ), "running", 0 );
1545 mlt_events_fire( MLT_CONSUMER_PROPERTIES( self ), "consumer-stopped", NULL );
1546 mlt_event_unblock( ( ( consumer_private* ) self->local )->event_listener );
1549 /** Stop the consumer.
1551 * \public \memberof mlt_consumer_s
1552 * \param self a consumer
1553 * \return true if there was an error
// Stops the consumer: releases threads waiting on put_cond and queue_cond,
// stops the read-ahead or worker threads according to real_time, clears the
// test card producer and runs the optional "post" command.
1556 int mlt_consumer_stop( mlt_consumer self )
1558 // Get the properties
1559 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
1560 consumer_private *priv = self->local;
1563 mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopping put waiting\n" );
// put_active is cleared under put_mutex before the waiters are woken.
1564 pthread_mutex_lock( &priv->put_mutex );
1565 priv->put_active = 0;
1566 pthread_cond_broadcast( &priv->put_cond );
1567 pthread_mutex_unlock( &priv->put_mutex );
1569 // Stop the consumer
1570 mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopping consumer\n" );
1572 // Cancel the read ahead threads
1573 if ( priv->started )
1575 // Unblock the consumer calling mlt_consumer_rt_frame
1576 pthread_mutex_lock( &priv->queue_mutex );
1577 pthread_cond_broadcast( &priv->queue_cond );
1578 pthread_mutex_unlock( &priv->queue_mutex );
1581 // Invoke the child callback
1582 if ( self->stop != NULL )
1585 // Check if the user has requested real time or not and stop if necessary
1586 mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopping read_ahead\n" );
// One thread (|real_time| == 1) uses the read-ahead path; more than one uses
// the worker pool; real_time == 0 has no thread to stop.
1587 if ( abs( priv->real_time ) == 1 )
1588 consumer_read_ahead_stop( self );
1589 else if ( abs( priv->real_time ) > 1 )
1590 consumer_work_stop( self );
1592 // Kill the test card
1593 mlt_properties_set_data( properties, "test_card_producer", NULL, 0, NULL, NULL );
1595 // Check and run a post command
// NOTE(review): the "post" property value is handed to system() verbatim, so
// it must come from a trusted source.
1596 if ( mlt_properties_get( properties, "post" ) )
1597 if (system( mlt_properties_get( properties, "post" ) ) == -1 )
1598 mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_ERROR, "system(%s) failed!\n", mlt_properties_get( properties, "post" ) );
1600 mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopped\n" );
1605 /** Determine if the consumer is stopped.
1607 * \public \memberof mlt_consumer_s
1608 * \param self a consumer
1609 * \return true if the consumer is stopped
// Reports whether the consumer is stopped by delegating to the
// implementation's is_stopped callback.
1612 int mlt_consumer_is_stopped( mlt_consumer self )
1614 // Check if the consumer is stopped
// Both the consumer and its callback may be NULL; the callback is only
// invoked when both are present.
// NOTE(review): the value returned when either is NULL is not visible here.
1615 if ( self && self->is_stopped )
1616 return self->is_stopped( self );
1621 /** Close and destroy the consumer.
1623 * \public \memberof mlt_consumer_s
1624 * \param self a consumer
// Drops one reference on the consumer and, once the count is no longer
// positive, destroys it: through the implementation's close callback when one
// is set, otherwise by destroying the put mutex/condition and closing the
// parent service.
1627 void mlt_consumer_close( mlt_consumer self )
// A NULL consumer is ignored; destruction only proceeds when the decremented
// reference count is zero or below.
1629 if ( self != NULL && mlt_properties_dec_ref( MLT_CONSUMER_PROPERTIES( self ) ) <= 0 )
1631 // Get the child's close function
1632 void ( *consumer_close )( ) = self->close;
1634 if ( consumer_close )
1637 //mlt_consumer_stop( self );
1640 consumer_close( self );
1644 consumer_private *priv = self->local;
1646 // Make sure it only gets called once
1647 self->parent.close = NULL;
1649 // Destroy the push mutex and condition
1650 pthread_mutex_destroy( &priv->put_mutex );
1651 pthread_cond_destroy( &priv->put_cond );
1653 mlt_service_close( &self->parent );
1659 /** Get the position of the last frame shown.
1661 * \public \memberof mlt_consumer_s
1662 * \param consumer a consumer
1663 * \return the position
1666 mlt_position mlt_consumer_position( mlt_consumer consumer )
1668 return ( ( consumer_private* ) consumer->local )->position;
1671 static void transmit_thread_create( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
1674 listener( owner, self,
1675 (void**) args[0] /* handle */, (int*) args[1] /* priority */, (thread_function_t) args[2], (void*) args[3] /* data */ );
// Creates the read-ahead thread running the given function with the consumer
// as its argument. The "consumer-thread-create" event is fired first; only
// when its result is below 1 is the thread created here with pthread_create,
// using a malloc'ed pthread_t stored in priv->ahead_thread.
1678 static void mlt_thread_create( mlt_consumer self, thread_function_t function )
1680 consumer_private *priv = self->local;
1681 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
// With a "priority" property the thread is requested with explicit scheduling
// attributes carrying that priority.
1683 if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( self ), "priority" ) )
1685 struct sched_param priority;
1686 priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( self ), "priority" );
1687 if ( mlt_events_fire( properties, "consumer-thread-create",
1688 &priv->ahead_thread, &priority.sched_priority, function, self, NULL ) < 1 )
1690 pthread_attr_t thread_attributes;
1691 pthread_attr_init( &thread_attributes );
1692 pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
1693 pthread_attr_setschedparam( &thread_attributes, &priority );
1694 pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
1695 pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
// NOTE(review): the malloc result is not checked before it is dereferenced.
1696 priv->ahead_thread = malloc( sizeof( pthread_t ) );
1697 pthread_t *handle = priv->ahead_thread;
// NOTE(review): pthread_create returns 0 on success and a positive error
// number on failure, never a negative value, so this "< 0" test cannot
// trigger the fallback to default attributes - it should compare "!= 0".
1698 if ( pthread_create( ( pthread_t* ) &( *handle ), &thread_attributes, function, self ) < 0 )
1699 pthread_create( ( pthread_t* ) &( *handle ), NULL, function, self );
1700 pthread_attr_destroy( &thread_attributes );
// Without a "priority" property the thread is created with default attributes
// (the priority value passed to the event is declared on a line not shown).
1706 if ( mlt_events_fire( properties, "consumer-thread-create",
1707 &priv->ahead_thread, &priority, function, self, NULL ) < 1 )
1709 priv->ahead_thread = malloc( sizeof( pthread_t ) );
1710 pthread_t *handle = priv->ahead_thread;
// NOTE(review): the result of pthread_create is ignored here.
1711 pthread_create( ( pthread_t* ) &( *handle ), NULL, function, self );
1716 static void transmit_thread_join( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
1719 listener( owner, self, (void*) args[0] /* handle */ );
1722 static void mlt_thread_join( mlt_consumer self )
1724 consumer_private *priv = self->local;
1725 if ( mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-join", priv->ahead_thread, NULL ) < 1 )
1727 pthread_t *handle = priv->ahead_thread;
1728 pthread_join( *handle, NULL );
1729 free( priv->ahead_thread );
1731 priv->ahead_thread = NULL;