]> git.sesse.net Git - mlt/blob - src/framework/mlt_consumer.c
202ecc5cb40e94b494bc693376446b8b7a73a185
[mlt] / src / framework / mlt_consumer.c
1 /**
2  * \file mlt_consumer.c
3  * \brief abstraction for all consumer services
4  * \see mlt_consumer_s
5  *
6  * Copyright (C) 2003-2010 Ushodaya Enterprises Limited
7  * \author Charles Yates <charles.yates@pandora.be>
8  * \author Dan Dennedy <dan@dennedy.org>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License as published by the Free Software Foundation; either
13  * version 2.1 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Lesser General Public License for more details.
19  *
20  * You should have received a copy of the GNU Lesser General Public
21  * License along with this library; if not, write to the Free Software
22  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23  */
24
25 #include "mlt_consumer.h"
26 #include "mlt_factory.h"
27 #include "mlt_producer.h"
28 #include "mlt_frame.h"
29 #include "mlt_profile.h"
30 #include "mlt_log.h"
31
32 #include <stdio.h>
33 #include <string.h>
34 #include <stdlib.h>
35 #include <sys/time.h>
36
37 /** Define this if you want an automatic deinterlace (if necessary) when the
38  * consumer's producer is not running at normal speed.
39  */
40 #undef DEINTERLACE_ON_NOT_NORMAL_SPEED
41
42 /** This is not the ideal place for this, but it is needed by VDPAU as well.
43  */
44 pthread_mutex_t mlt_sdl_mutex = PTHREAD_MUTEX_INITIALIZER;
45
/** \brief private members of mlt_consumer
 *
 * One instance is allocated by mlt_consumer_init() and stored in the
 * consumer's \p local pointer.
 */

typedef struct
{
        int real_time; /**< copy of the "real_time" property taken by mlt_consumer_start() */
        int ahead; /* NOTE(review): presumably the read-ahead running flag - not used in the visible code */
        mlt_image_format format; /**< image format for rendering; yuv422 by default, re-read from "mlt_image_format" on start */
        mlt_deque queue; /* NOTE(review): presumably the read-ahead frame queue - confirm against the thread code */
        void *ahead_thread;
        pthread_mutex_t queue_mutex;
        pthread_cond_t queue_cond;
        pthread_mutex_t put_mutex; /**< held while reading or writing \p put and \p put_active */
        pthread_cond_t put_cond; /**< broadcast whenever the \p put slot is filled or emptied */
        mlt_frame put; /**< single-frame slot filled by mlt_consumer_put_frame() and drained by mlt_consumer_get_frame() */
        int put_active; /**< set to 1 by mlt_consumer_start(); the put/get wait loops exit once it is 0 */
        mlt_event event_listener; /**< the "property-changed" listener, blocked while profile properties are applied */
        mlt_position position; /**< position of the frame most recently reported by "consumer-frame-show" */
        int is_purge;

        /* additional fields added for the parallel work queue */
        mlt_deque worker_threads;
        pthread_mutex_t done_mutex;
        pthread_cond_t done_cond;
        int consecutive_dropped;
        int consecutive_rendered;
        int process_head;
        int started;
        pthread_t *threads; /**< used to deallocate all threads */
}
consumer_private;
76
/** The signature of a thread entry point: takes and returns an untyped pointer. */
typedef void* ( *thread_function_t )( void* );

/* Forward declarations of the file-local event transmitters, listeners and
 * helpers; several are referenced by mlt_consumer_init() before their
 * definitions appear. */
static void mlt_consumer_frame_render( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
static void mlt_consumer_frame_show( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
static void mlt_consumer_property_changed( mlt_properties owner, mlt_consumer self, char *name );
static void apply_profile_properties( mlt_consumer self, mlt_profile profile, mlt_properties properties );
static void on_consumer_frame_show( mlt_properties owner, mlt_consumer self, mlt_frame frame );
static void transmit_thread_create( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
static void mlt_thread_create( mlt_consumer self, thread_function_t function );
static void transmit_thread_join( mlt_listener listener, mlt_properties owner, mlt_service self, void **args );
static void mlt_thread_join( mlt_consumer self );
88
89 /** Initialize a consumer service.
90  *
91  * \public \memberof mlt_consumer_s
92  * \param self the consumer to initialize
93  * \param child a pointer to the object for the subclass
94  * \param profile the \p mlt_profile_s to use (optional but recommended,
95  * uses the environment variable MLT if self is NULL)
96  * \return true if there was an error
97  */
98
99 int mlt_consumer_init( mlt_consumer self, void *child, mlt_profile profile )
100 {
101         int error = 0;
102         memset( self, 0, sizeof( struct mlt_consumer_s ) );
103         self->child = child;
104         consumer_private *priv = self->local = calloc( 1, sizeof( consumer_private ) );
105
106         error = mlt_service_init( &self->parent, self );
107         if ( error == 0 )
108         {
109                 // Get the properties from the service
110                 mlt_properties properties = MLT_SERVICE_PROPERTIES( &self->parent );
111
112                 // Apply profile to properties
113                 if ( profile == NULL )
114                 {
115                         // Normally the application creates the profile and controls its lifetime
116                         // This is the fallback exception handling
117                         profile = mlt_profile_init( NULL );
118                         mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
119                         mlt_properties_set_data( properties, "_profile", profile, 0, (mlt_destructor)mlt_profile_close, NULL );
120                 }
121                 apply_profile_properties( self, profile, properties );
122
123                 // Default rescaler for all consumers
124                 mlt_properties_set( properties, "rescale", "bilinear" );
125
126                 // Default read ahead buffer size
127                 mlt_properties_set_int( properties, "buffer", 25 );
128                 mlt_properties_set_int( properties, "drop_max", 5 );
129
130                 // Default audio frequency and channels
131                 mlt_properties_set_int( properties, "frequency", 48000 );
132                 mlt_properties_set_int( properties, "channels", 2 );
133
134                 // Default of all consumers is real time
135                 mlt_properties_set_int( properties, "real_time", 1 );
136
137                 // Default to environment test card
138                 mlt_properties_set( properties, "test_card", mlt_environment( "MLT_TEST_CARD" ) );
139
140                 // Hmm - default all consumers to yuv422 :-/
141                 priv->format = mlt_image_yuv422;
142                 mlt_properties_set( properties, "mlt_image_format", mlt_image_format_name( priv->format ) );
143                 mlt_properties_set( properties, "mlt_audio_format", mlt_audio_format_name( mlt_audio_s16 ) );
144
145                 mlt_events_register( properties, "consumer-frame-show", ( mlt_transmitter )mlt_consumer_frame_show );
146                 mlt_events_register( properties, "consumer-frame-render", ( mlt_transmitter )mlt_consumer_frame_render );
147                 mlt_events_register( properties, "consumer-thread-started", NULL );
148                 mlt_events_register( properties, "consumer-thread-stopped", NULL );
149                 mlt_events_register( properties, "consumer-stopping", NULL );
150                 mlt_events_register( properties, "consumer-stopped", NULL );
151                 mlt_events_register( properties, "consumer-thread-create", ( mlt_transmitter )transmit_thread_create );
152                 mlt_events_register( properties, "consumer-thread-join", ( mlt_transmitter )transmit_thread_join );
153                 mlt_events_listen( properties, self, "consumer-frame-show", ( mlt_listener )on_consumer_frame_show );
154
155                 // Register a property-changed listener to handle the profile property -
156                 // subsequent properties can override the profile
157                 priv->event_listener = mlt_events_listen( properties, self, "property-changed", ( mlt_listener )mlt_consumer_property_changed );
158
159                 // Create the push mutex and condition
160                 pthread_mutex_init( &priv->put_mutex, NULL );
161                 pthread_cond_init( &priv->put_cond, NULL );
162
163         }
164         return error;
165 }
166
167 /** Convert the profile into properties on the consumer.
168  *
169  * \private \memberof mlt_consumer_s
170  * \param self a consumer
171  * \param profile a profile
172  * \param properties a properties list (typically, the consumer's)
173  */
174
175 static void apply_profile_properties( mlt_consumer self, mlt_profile profile, mlt_properties properties )
176 {
177         consumer_private *priv = self->local;
178         mlt_event_block( priv->event_listener );
179         mlt_properties_set_double( properties, "fps", mlt_profile_fps( profile ) );
180         mlt_properties_set_int( properties, "frame_rate_num", profile->frame_rate_num );
181         mlt_properties_set_int( properties, "frame_rate_den", profile->frame_rate_den );
182         mlt_properties_set_int( properties, "width", profile->width );
183         mlt_properties_set_int( properties, "height", profile->height );
184         mlt_properties_set_int( properties, "progressive", profile->progressive );
185         mlt_properties_set_double( properties, "aspect_ratio", mlt_profile_sar( profile )  );
186         mlt_properties_set_int( properties, "sample_aspect_num", profile->sample_aspect_num );
187         mlt_properties_set_int( properties, "sample_aspect_den", profile->sample_aspect_den );
188         mlt_properties_set_double( properties, "display_ratio", mlt_profile_dar( profile )  );
189         mlt_properties_set_int( properties, "display_aspect_num", profile->display_aspect_num );
190         mlt_properties_set_int( properties, "display_aspect_den", profile->display_aspect_den );
191         mlt_properties_set_int( properties, "colorspace", profile->colorspace );
192         mlt_event_unblock( priv->event_listener );
193 }
194
195 /** The property-changed event listener
196  *
197  * \private \memberof mlt_consumer_s
198  * \param owner the events object
199  * \param self the consumer
200  * \param name the name of the property that changed
201  */
202
203 static void mlt_consumer_property_changed( mlt_properties owner, mlt_consumer self, char *name )
204 {
205         if ( !strcmp( name, "mlt_profile" ) )
206         {
207                 // Get the properies
208                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
209
210                 // Get the current profile
211                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
212
213                 // Load the new profile
214                 mlt_profile new_profile = mlt_profile_init( mlt_properties_get( properties, name ) );
215
216                 if ( new_profile )
217                 {
218                         // Copy the profile
219                         if ( profile != NULL )
220                         {
221                                 free( profile->description );
222                                 memcpy( profile, new_profile, sizeof( struct mlt_profile_s ) );
223                                 profile->description = strdup( new_profile->description );
224                         }
225                         else
226                         {
227                                 profile = new_profile;
228                         }
229
230                         // Apply to properties
231                         apply_profile_properties( self, profile, properties );
232                         mlt_profile_close( new_profile );
233                 }
234         }
235         else if ( !strcmp( name, "frame_rate_num" ) )
236         {
237                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
238                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
239                 if ( profile )
240                 {
241                         profile->frame_rate_num = mlt_properties_get_int( properties, "frame_rate_num" );
242                         mlt_properties_set_double( properties, "fps", mlt_profile_fps( profile ) );
243                 }
244         }
245         else if ( !strcmp( name, "frame_rate_den" ) )
246         {
247                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
248                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
249                 if ( profile )
250                 {
251                         profile->frame_rate_den = mlt_properties_get_int( properties, "frame_rate_den" );
252                         mlt_properties_set_double( properties, "fps", mlt_profile_fps( profile ) );
253                 }
254         }
255         else if ( !strcmp( name, "width" ) )
256         {
257                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
258                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
259                 if ( profile )
260                         profile->width = mlt_properties_get_int( properties, "width" );
261         }
262         else if ( !strcmp( name, "height" ) )
263         {
264                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
265                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
266                 if ( profile )
267                         profile->height = mlt_properties_get_int( properties, "height" );
268         }
269         else if ( !strcmp( name, "progressive" ) )
270         {
271                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
272                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
273                 if ( profile )
274                         profile->progressive = mlt_properties_get_int( properties, "progressive" );
275         }
276         else if ( !strcmp( name, "sample_aspect_num" ) )
277         {
278                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
279                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
280                 if ( profile )
281                 {
282                         profile->sample_aspect_num = mlt_properties_get_int( properties, "sample_aspect_num" );
283                         mlt_properties_set_double( properties, "aspect_ratio", mlt_profile_sar( profile )  );
284                 }
285         }
286         else if ( !strcmp( name, "sample_aspect_den" ) )
287         {
288                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
289                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
290                 if ( profile )
291                 {
292                         profile->sample_aspect_den = mlt_properties_get_int( properties, "sample_aspect_den" );
293                         mlt_properties_set_double( properties, "aspect_ratio", mlt_profile_sar( profile )  );
294                 }
295         }
296         else if ( !strcmp( name, "display_aspect_num" ) )
297         {
298                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
299                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
300                 if ( profile )
301                 {
302                         profile->display_aspect_num = mlt_properties_get_int( properties, "display_aspect_num" );
303                         mlt_properties_set_double( properties, "display_ratio", mlt_profile_dar( profile )  );
304                 }
305         }
306         else if ( !strcmp( name, "display_aspect_den" ) )
307         {
308                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
309                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
310                 if ( profile )
311                 {
312                         profile->display_aspect_den = mlt_properties_get_int( properties, "display_aspect_den" );
313                         mlt_properties_set_double( properties, "display_ratio", mlt_profile_dar( profile )  );
314                 }
315         }
316         else if ( !strcmp( name, "colorspace" ) )
317         {
318                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
319                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
320                 if ( profile )
321                         profile->colorspace = mlt_properties_get_int( properties, "colorspace" );
322         }
323 }
324
325 /** The transmitter for the consumer-frame-show event
326  *
327  * Invokes the listener.
328  *
329  * \private \memberof mlt_consumer_s
330  * \param listener a function pointer that will be invoked
331  * \param owner the events object that will be passed to \p listener
332  * \param self a service that will be passed to \p listener
333  * \param args an array of pointers - the first entry is passed as a frame to \p listener
334  */
335
336 static void mlt_consumer_frame_show( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
337 {
338         if ( listener != NULL )
339                 listener( owner, self, ( mlt_frame )args[ 0 ] );
340 }
341
342 /** The transmitter for the consumer-frame-render event
343  *
344  * Invokes the listener.
345  *
346  * \private \memberof mlt_consumer_s
347  * \param listener a function pointer that will be invoked
348  * \param owner the events object that will be passed to \p listener
349  * \param self a service that will be passed to \p listener
350  * \param args an array of pointers - the first entry is passed as a frame to \p listener
351  */
352
353 static void mlt_consumer_frame_render( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
354 {
355         if ( listener != NULL )
356                 listener( owner, self, ( mlt_frame )args[ 0 ] );
357 }
358
359 /** A listener on the consumer-frame-show event
360  *
361  * Saves the position of the frame shown.
362  *
363  * \private \memberof mlt_consumer_s
364  * \param owner the events object
365  * \param consumer the consumer on which this event occurred
366  * \param frame the frame that was shown
367  */
368
369 static void on_consumer_frame_show( mlt_properties owner, mlt_consumer consumer, mlt_frame frame )
370 {
371         if ( frame )
372                 ( ( consumer_private*) consumer->local )->position = mlt_frame_get_position( frame );
373 }
374
375 /** Create a new consumer.
376  *
377  * \public \memberof mlt_consumer_s
378  * \param profile a profile (optional, but recommended)
379  * \return a new consumer
380  */
381
382 mlt_consumer mlt_consumer_new( mlt_profile profile )
383 {
384         // Create the memory for the structure
385         mlt_consumer self = malloc( sizeof( struct mlt_consumer_s ) );
386
387         // Initialise it
388         if ( self != NULL && mlt_consumer_init( self, NULL, profile ) == 0 )
389         {
390                 // Return it
391                 return self;
392         }
393         else
394         {
395                 free(self);
396                 return NULL;
397         }
398 }
399
400 /** Get the parent service object.
401  *
402  * \public \memberof mlt_consumer_s
403  * \param self a consumer
404  * \return the parent service class
405  * \see MLT_CONSUMER_SERVICE
406  */
407
408 mlt_service mlt_consumer_service( mlt_consumer self )
409 {
410         return self != NULL ? &self->parent : NULL;
411 }
412
413 /** Get the consumer properties.
414  *
415  * \public \memberof mlt_consumer_s
416  * \param self a consumer
417  * \return the consumer's properties list
418  * \see MLT_CONSUMER_PROPERTIES
419  */
420
421 mlt_properties mlt_consumer_properties( mlt_consumer self )
422 {
423         return self != NULL ? MLT_SERVICE_PROPERTIES( &self->parent ) : NULL;
424 }
425
426 /** Connect the consumer to the producer.
427  *
428  * \public \memberof mlt_consumer_s
429  * \param self a consumer
430  * \param producer a producer
431  * \return > 0 warning, == 0 success, < 0 serious error,
432  *         1 = this service does not accept input,
433  *         2 = the producer is invalid,
434  *         3 = the producer is already registered with this consumer
435  */
436
437 int mlt_consumer_connect( mlt_consumer self, mlt_service producer )
438 {
439         return mlt_service_connect_producer( &self->parent, producer, 0 );
440 }
441
442 /** Start the consumer.
443  *
444  * \public \memberof mlt_consumer_s
445  * \param self a consumer
446  * \return true if there was an error
447  */
448
449 int mlt_consumer_start( mlt_consumer self )
450 {
451         if ( !mlt_consumer_is_stopped( self ) )
452                 return 0;
453
454         consumer_private *priv = self->local;
455
456         // Stop listening to the property-changed event
457         mlt_event_block( priv->event_listener );
458
459         // Get the properies
460         mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
461
462         // Determine if there's a test card producer
463         char *test_card = mlt_properties_get( properties, "test_card" );
464
465         // Just to make sure nothing is hanging around...
466         pthread_mutex_lock( &priv->put_mutex );
467         priv->put = NULL;
468         priv->put_active = 1;
469         pthread_mutex_unlock( &priv->put_mutex );
470
471         // Deal with it now.
472         if ( test_card != NULL )
473         {
474                 if ( mlt_properties_get_data( properties, "test_card_producer", NULL ) == NULL )
475                 {
476                         // Create a test card producer
477                         mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( self ) );
478                         mlt_producer producer = mlt_factory_producer( profile, NULL, test_card );
479
480                         // Do we have a producer
481                         if ( producer != NULL )
482                         {
483                                 // Test card should loop I guess...
484                                 mlt_properties_set( MLT_PRODUCER_PROPERTIES( producer ), "eof", "loop" );
485                                 //mlt_producer_set_speed( producer, 0 );
486                                 //mlt_producer_set_in_and_out( producer, 0, 0 );
487
488                                 // Set the test card on the consumer
489                                 mlt_properties_set_data( properties, "test_card_producer", producer, 0, ( mlt_destructor )mlt_producer_close, NULL );
490                         }
491                 }
492         }
493         else
494         {
495                 // Allow the hash table to speed things up
496                 mlt_properties_set_data( properties, "test_card_producer", NULL, 0, NULL, NULL );
497         }
498
499         // The profile could have changed between a stop and a restart.
500         apply_profile_properties( self, mlt_service_profile( MLT_CONSUMER_SERVICE(self) ), properties );
501
502         // Set the frame duration in microseconds for the frame-dropping heuristic
503         int frame_rate_num = mlt_properties_get_int( properties, "frame_rate_num" );
504         int frame_rate_den = mlt_properties_get_int( properties, "frame_rate_den" );
505         int frame_duration = 0;
506
507         if ( frame_rate_num && frame_rate_den )
508         {
509                 frame_duration = 1000000 / frame_rate_num * frame_rate_den;
510         }
511
512         mlt_properties_set_int( properties, "frame_duration", frame_duration );
513
514         // Check and run an ante command
515         if ( mlt_properties_get( properties, "ante" ) )
516                 if ( system( mlt_properties_get( properties, "ante" ) ) == -1 )
517                         mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_ERROR, "system(%s) failed!\n", mlt_properties_get( properties, "ante" ) );
518
519         // Set the real_time preference
520         priv->real_time = mlt_properties_get_int( properties, "real_time" );
521
522         // For worker threads implementation, buffer must be at least # threads
523         if ( abs( priv->real_time ) > 1 && mlt_properties_get_int( properties, "buffer" ) <= abs( priv->real_time ) )
524                 mlt_properties_set_int( properties, "_buffer", abs( priv->real_time ) + 1 );
525
526         // Get the image format to use for rendering threads
527         const char* format = mlt_properties_get( properties, "mlt_image_format" );
528         if ( format )
529         {
530                 if ( !strcmp( format, "rgb24" ) )
531                         priv->format = mlt_image_rgb24;
532                 else if ( !strcmp( format, "rgb24a" ) )
533                         priv->format = mlt_image_rgb24a;
534                 else if ( !strcmp( format, "yuv420p" ) )
535                         priv->format = mlt_image_yuv420p;
536                 else if ( !strcmp( format, "none" ) )
537                         priv->format = mlt_image_none;
538                 else if ( !strcmp( format, "glsl" ) )
539                         priv->format = mlt_image_glsl_texture;
540                 else
541                         priv->format = mlt_image_yuv422;
542         }
543
544         // Start the service
545         if ( self->start != NULL )
546                 return self->start( self );
547
548         return 0;
549 }
550
551 /** An alternative method to feed frames into the consumer.
552  *
553  * Only valid if the consumer itself is not connected.
554  *
555  * \public \memberof mlt_consumer_s
556  * \param self a consumer
557  * \param frame a frame
558  * \return true (ignore self for now)
559  */
560
561 int mlt_consumer_put_frame( mlt_consumer self, mlt_frame frame )
562 {
563         int error = 1;
564
565         // Get the service assoicated to the consumer
566         mlt_service service = MLT_CONSUMER_SERVICE( self );
567
568         if ( mlt_service_producer( service ) == NULL )
569         {
570                 struct timeval now;
571                 struct timespec tm;
572                 consumer_private *priv = self->local;
573
574                 pthread_mutex_lock( &priv->put_mutex );
575                 while ( priv->put_active && priv->put != NULL )
576                 {
577                         gettimeofday( &now, NULL );
578                         tm.tv_sec = now.tv_sec + 1;
579                         tm.tv_nsec = now.tv_usec * 1000;
580                         pthread_cond_timedwait( &priv->put_cond, &priv->put_mutex, &tm );
581                 }
582                 if ( priv->put_active && priv->put == NULL )
583                         priv->put = frame;
584                 else
585                         mlt_frame_close( frame );
586                 pthread_cond_broadcast( &priv->put_cond );
587                 pthread_mutex_unlock( &priv->put_mutex );
588         }
589         else
590         {
591                 mlt_frame_close( frame );
592         }
593
594         return error;
595 }
596
597 /** Protected method for consumer to get frames from connected service
598  *
599  * \public \memberof mlt_consumer_s
600  * \param self a consumer
601  * \return a frame
602  */
603
604 mlt_frame mlt_consumer_get_frame( mlt_consumer self )
605 {
606         // Frame to return
607         mlt_frame frame = NULL;
608
609         // Get the service assoicated to the consumer
610         mlt_service service = MLT_CONSUMER_SERVICE( self );
611
612         // Get the consumer properties
613         mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
614
615         // Get the frame
616         if ( mlt_service_producer( service ) == NULL && mlt_properties_get_int( properties, "put_mode" ) )
617         {
618                 struct timeval now;
619                 struct timespec tm;
620                 consumer_private *priv = self->local;
621
622                 pthread_mutex_lock( &priv->put_mutex );
623                 while ( priv->put_active && priv->put == NULL )
624                 {
625                         gettimeofday( &now, NULL );
626                         tm.tv_sec = now.tv_sec + 1;
627                         tm.tv_nsec = now.tv_usec * 1000;
628                         pthread_cond_timedwait( &priv->put_cond, &priv->put_mutex, &tm );
629                 }
630                 frame = priv->put;
631                 priv->put = NULL;
632                 pthread_cond_broadcast( &priv->put_cond );
633                 pthread_mutex_unlock( &priv->put_mutex );
634                 if ( frame != NULL )
635                         mlt_service_apply_filters( service, frame, 0 );
636         }
637         else if ( mlt_service_producer( service ) != NULL )
638         {
639                 mlt_service_get_frame( service, &frame, 0 );
640         }
641         else
642         {
643                 frame = mlt_frame_init( service );
644         }
645
646         if ( frame != NULL )
647         {
648                 // Get the frame properties
649                 mlt_properties frame_properties = MLT_FRAME_PROPERTIES( frame );
650
651                 // Get the test card producer
652                 mlt_producer test_card = mlt_properties_get_data( properties, "test_card_producer", NULL );
653
654                 // Attach the test frame producer to it.
655                 if ( test_card != NULL )
656                         mlt_properties_set_data( frame_properties, "test_card_producer", test_card, 0, NULL, NULL );
657
658                 // Pass along the interpolation and deinterlace options
659                 // TODO: get rid of consumer_deinterlace and use profile.progressive
660                 mlt_properties_set( frame_properties, "rescale.interp", mlt_properties_get( properties, "rescale" ) );
661                 mlt_properties_set_int( frame_properties, "consumer_deinterlace", mlt_properties_get_int( properties, "progressive" ) | mlt_properties_get_int( properties, "deinterlace" ) );
662                 mlt_properties_set( frame_properties, "deinterlace_method", mlt_properties_get( properties, "deinterlace_method" ) );
663                 mlt_properties_set_int( frame_properties, "consumer_tff", mlt_properties_get_int( properties, "top_field_first" ) );
664         }
665
666         // Return the frame
667         return frame;
668 }
669
/** Compute the time difference between now and a time value.
 *
 * As a side effect \p time1 is overwritten with the current time, so
 * successive calls on the same variable measure consecutive intervals.
 *
 * \private \memberof mlt_consumer_s
 * \param time1 a time value to be compared against now; updated to now
 * \return the difference in microseconds
 */

static inline long time_difference( struct timeval *time1 )
{
        struct timeval previous = *time1;

        gettimeofday( time1, NULL );

        // Subtract before scaling: multiplying an absolute epoch second count
        // by 1000000 overflows a 32-bit long, while the elapsed seconds are small.
        long elapsed_sec = ( long )( time1->tv_sec - previous.tv_sec );
        long elapsed_usec = ( long )( time1->tv_usec - previous.tv_usec );

        return elapsed_sec * 1000000L + elapsed_usec;
}
685
686 /** The thread procedure for asynchronously pulling frames through the service
687  * network connected to a consumer.
688  *
     * The thread fetches frames with mlt_consumer_get_frame(), requests their
     * image (unless "video_off" is set) and audio (unless "audio_off" is set),
     * and appends them to the private frame queue until the private \p ahead
     * flag is cleared. While running it tracks the average time spent per frame;
     * when that average exceeds "frame_duration" and the queue level is low, the
     * image request for the next frame is skipped (the frame is then queued
     * without its "rendered" property being set) unless real_time is -1.
     * On exit the thread closes the frame it still holds, empties and closes the
     * queue, and fires "consumer-thread-stopped".
     *
689  * \private \memberof mlt_consumer_s
690  * \param arg a consumer
     * \return always NULL
691  */
692
693 static void *consumer_read_ahead_thread( void *arg )
694 {
695         // The argument is the consumer
696         mlt_consumer self = arg;
697         consumer_private *priv = self->local;
698
699         // Get the properties of the consumer
700         mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
701
702         // Get the width and height
703         int width = mlt_properties_get_int( properties, "width" );
704         int height = mlt_properties_get_int( properties, "height" );
705
706         // See if video is turned off
707         int video_off = mlt_properties_get_int( properties, "video_off" );
708         int preview_off = mlt_properties_get_int( properties, "preview_off" );
709         int preview_format = mlt_properties_get_int( properties, "preview_format" );
710
711         // Get the audio settings (s16 is used when "mlt_audio_format" is unset or unrecognized)
712         mlt_audio_format afmt = mlt_audio_s16;
713         const char *format = mlt_properties_get( properties, "mlt_audio_format" );
714         if ( format )
715         {
716                 if ( !strcmp( format, "none" ) )
717                         afmt = mlt_audio_none;
718                 else if ( !strcmp( format, "s32" ) )
719                         afmt = mlt_audio_s32;
720                 else if ( !strcmp( format, "s32le" ) )
721                         afmt = mlt_audio_s32le;
722                 else if ( !strcmp( format, "float" ) )
723                         afmt = mlt_audio_float;
724                 else if ( !strcmp( format, "f32le" ) )
725                         afmt = mlt_audio_f32le;
726                 else if ( !strcmp( format, "u8" ) )
727                         afmt = mlt_audio_u8;
728         }
            // counter is the running frame index handed to mlt_sample_calculator
729         int counter = 0;
730         double fps = mlt_properties_get_double( properties, "fps" );
731         int channels = mlt_properties_get_int( properties, "channels" );
732         int frequency = mlt_properties_get_int( properties, "frequency" );
733         int samples = 0;
734         void *audio = NULL;
735
736         // See if audio is turned off
737         int audio_off = mlt_properties_get_int( properties, "audio_off" );
738
739         // Get the maximum size of the buffer (the "buffer" property plus one)
740         int buffer = mlt_properties_get_int( properties, "buffer" ) + 1;
741
742         // General frame variable
743         mlt_frame frame = NULL;
744         uint8_t *image = NULL;
745
746         // Time structures
747         struct timeval ante;
748
749         // Average time for get_frame and get_image
            // count: frames included in time_process; skipped: consecutive frames
            // whose image was not requested; skip_next: decision for the next frame
750         int count = 0;
751         int skipped = 0;
752         int64_t time_process = 0;
753         int skip_next = 0;
            // pos/last_pos: position of the current/previous frame; start_pos: position
            // at which the current run of consecutive frames began
754         mlt_position pos = 0;
755         mlt_position start_pos = 0;
756         mlt_position last_pos = 0;
757         int frame_duration = mlt_properties_get_int( properties, "frame_duration" );
758         int drop_max = mlt_properties_get_int( properties, "drop_max" );
759
            // With preview turned off and a preview_format given, request images in that format
760         if ( preview_off && preview_format != 0 )
761                 priv->format = preview_format;
762
763         mlt_events_fire( properties, "consumer-thread-started", NULL );
764
765         // Get the first frame
766         frame = mlt_consumer_get_frame( self );
767
768         if ( frame )
769         {
770                 // Get the image of the first frame
771                 if ( !video_off )
772                 {
773                         mlt_events_fire( MLT_CONSUMER_PROPERTIES( self ), "consumer-frame-render", frame, NULL );
774                         mlt_frame_get_image( frame, &image, &priv->format, &width, &height, 0 );
775                 }
776
777                 if ( !audio_off )
778                 {
779                         samples = mlt_sample_calculator( fps, frequency, counter++ );
780                         mlt_frame_get_audio( frame, &audio, &afmt, &frequency, &channels, &samples );
781                 }
782
783                 // Mark as rendered
784                 mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
785                 last_pos = start_pos = pos = mlt_frame_get_position( frame );
786         }
787
788         // Get the starting time (can ignore the times above)
789         gettimeofday( &ante, NULL );
790
791         // Continue to read ahead
792         while ( priv->ahead )
793         {
794                 // Put the current frame into the queue
                    // (wait while the queue is full; if a purge is pending, the frame is
                    // closed instead of queued and the purge flag is cleared)
795                 pthread_mutex_lock( &priv->queue_mutex );
796                 while( priv->ahead && mlt_deque_count( priv->queue ) >= buffer )
797                         pthread_cond_wait( &priv->queue_cond, &priv->queue_mutex );
798                 if ( priv->is_purge )
799                 {
800                         mlt_frame_close( frame );
801                         priv->is_purge = 0;
802                 }
803                 else
804                 {
805                         mlt_deque_push_back( priv->queue, frame );
806                 }
807                 pthread_cond_broadcast( &priv->queue_cond );
808                 pthread_mutex_unlock( &priv->queue_mutex );
809
810                 // Get the next frame
811                 frame = mlt_consumer_get_frame( self );
812
813                 // If there's no frame, we're probably stopped...
                    // NOTE(review): if priv->ahead is still set after this continue, the next
                    // iteration pushes this NULL frame onto the queue - confirm that the
                    // code popping the queue tolerates NULL entries.
814                 if ( frame == NULL )
815                         continue;
816                 pos = mlt_frame_get_position( frame );
817
818                 // WebVfx uses this to setup a consumer-stopping event handler.
819                 mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
820
821                 // Increment the counter used for averaging processing cost
822                 count ++;
823
824                 // All non-normal playback frames should be shown
825                 if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" ) != 1 )
826                 {
827 #ifdef DEINTERLACE_ON_NOT_NORMAL_SPEED
828                         mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "consumer_deinterlace", 1 );
829 #endif
830                         // Indicate seeking or trick-play
831                         start_pos = pos;
832                 }
833
834                 // If skip flag not set or frame-dropping disabled (real_time == -1)
835                 if ( !skip_next || priv->real_time == -1 )
836                 {
837                         if ( !video_off )
838                         {
839                                 // Reset width/height - could have been changed by previous mlt_frame_get_image
840                                 width = mlt_properties_get_int( properties, "width" );
841                                 height = mlt_properties_get_int( properties, "height" );
842
843                                 // Get the image
844                                 mlt_events_fire( MLT_CONSUMER_PROPERTIES( self ), "consumer-frame-render", frame, NULL );
845                                 mlt_frame_get_image( frame, &image, &priv->format, &width, &height, 0 );
846                         }
847
848                         // Indicate the rendered image is available.
849                         mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
850
851                         // Reset consecutively-skipped counter
852                         skipped = 0;
853                 }
854                 else // Skip image processing
855                 {
856                         // Increment the number of consecutively-skipped frames
857                         skipped++;
858
859                         // If too many (1 sec) consecutively-skipped frames
                            // (the limit is the "drop_max" property read at thread start)
860                         if ( skipped > drop_max )
861                         {
862                                 // Reset cost tracker
863                                 time_process = 0;
864                                 count = 1;
865                                 mlt_log_verbose( self, "too many frames dropped - forcing next frame\n" );
866                         }
867                 }
868
869                 // Always process audio
870                 if ( !audio_off )
871                 {
872                         samples = mlt_sample_calculator( fps, frequency, counter++ );
873                         mlt_frame_get_audio( frame, &audio, &afmt, &frequency, &channels, &samples );
874                 }
875
876                 // Get the time to process this frame
                    // (time_difference also resets ante to now, so each call measures one iteration)
877                 int64_t time_current = time_difference( &ante );
878
879                 // If the current time is not suddenly some large amount
                    // (less than 20x the running average, or too little history to judge)
880                 if ( time_current < time_process / count * 20 || !time_process || count < 5 )
881                 {
882                         // Accumulate the cost for processing this frame
883                         time_process += time_current;
884                 }
885                 else
886                 {
887                         mlt_log_debug( self, "current %"PRId64" threshold %"PRId64" count %d\n",
888                                 time_current, (int64_t) (time_process / count * 20), count );
889                         // Ignore the cost of this frame's time
890                         count--;
891                 }
892
893                 // Determine if we started, resumed, or seeked
                    // (any position that does not directly follow the previous one)
894                 if ( pos != last_pos + 1 )
895                         start_pos = pos;
896                 last_pos = pos;
897
898                 // Do not skip the first 20% of buffer at start, resume, or seek
899                 if ( pos - start_pos <= buffer / 5 + 1 )
900                 {
901                         // Reset cost tracker
902                         time_process = 0;
903                         count = 1;
904                 }
905
906                 // Reset skip flag
907                 skip_next = 0;
908
909                 // Only consider skipping if the buffer level is low (or really small)
910                 if ( mlt_deque_count( priv->queue ) <= buffer / 5 + 1 )
911                 {
912                         // Skip next frame if average cost exceeds frame duration.
913                         if ( time_process / count > frame_duration )
914                                 skip_next = 1;
915                         if ( skip_next )
916                                 mlt_log_debug( self, "avg usec %"PRId64" (%"PRId64"/%d) duration %d\n",
917                                         time_process/count, time_process, count, frame_duration);
918                 }
919         }
920
921         // Remove the last frame (fetched but never queued because the loop ended)
922         mlt_frame_close( frame );
923
924         // Wipe the queue
925         pthread_mutex_lock( &priv->queue_mutex );
926         while ( mlt_deque_count( priv->queue ) )
927                 mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
928
929         // Close the queue
930         mlt_deque_close( priv->queue );
931         priv->queue = NULL;
932         pthread_mutex_unlock( &priv->queue_mutex );
933
934         mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-stopped", NULL );
935
936         return NULL;
937 }
938
939 /** Locate the first unprocessed frame in the queue.
940  *
941  * When playing with realtime behavior, we do not use the true head, but
942  * rather an adjusted process_head. The process_head is adjusted based on
943  * the rate of frame-dropping or recovery from frame-dropping. The idea is
944  * that as the level of frame-dropping increases to move the process_head
945  * closer to the tail because the frames are not completing processing prior
946  * to their playout! Then, as frames are not dropped the process_head moves
947  * back closer to the head of the queue so that worker threads can work 
948  * ahead of the playout point (queue head).
949  *
950  * \private \memberof mlt_consumer_s
951  * \param self a consumer
952  * \return an index into the queue
953  */
954
955 static inline int first_unprocessed_frame( mlt_consumer self )
956 {
957         consumer_private *priv = self->local;
958         int index = priv->real_time <= 0 ? 0 : priv->process_head;
959         while ( index < mlt_deque_count( priv->queue ) && MLT_FRAME( mlt_deque_peek( priv->queue, index ) )->is_processing )
960                 index++;
961         return index;
962 }
963
964 /** The worker thread procedure for parallel processing frames.
965  *
966  * \private \memberof mlt_consumer_s
967  * \param arg a consumer
968  */
969
970 static void *consumer_worker_thread( void *arg )
971 {
972         // The argument is the consumer
973         mlt_consumer self = arg;
974         consumer_private *priv = self->local;
975
976         // Get the properties of the consumer
977         mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
978
979         // Get the width and height
980         int width = mlt_properties_get_int( properties, "width" );
981         int height = mlt_properties_get_int( properties, "height" );
982         mlt_image_format format = priv->format;
983
984         // See if video is turned off
985         int video_off = mlt_properties_get_int( properties, "video_off" );
986         int preview_off = mlt_properties_get_int( properties, "preview_off" );
987         int preview_format = mlt_properties_get_int( properties, "preview_format" );
988
989         // General frame variable
990         mlt_frame frame = NULL;
991         uint8_t *image = NULL;
992
993         if ( preview_off && preview_format != 0 )
994                 format = preview_format;
995
996         mlt_events_fire( properties, "consumer-thread-started", NULL );
997
998         // Continue to read ahead
999         while ( priv->ahead )
1000         {
1001                 // Get the next unprocessed frame from the work queue
1002                 pthread_mutex_lock( &priv->queue_mutex );
1003                 int index = first_unprocessed_frame( self );
1004                 while ( priv->ahead && index >= mlt_deque_count( priv->queue ) )
1005                 {
1006                         mlt_log_debug( MLT_CONSUMER_SERVICE(self), "waiting in worker index = %d queue count = %d\n",
1007                                 index, mlt_deque_count( priv->queue ) );
1008                         pthread_cond_wait( &priv->queue_cond, &priv->queue_mutex );
1009                         index = first_unprocessed_frame( self );
1010                 }
1011
1012                 // Mark the frame for processing
1013                 frame = mlt_deque_peek( priv->queue, index );
1014                 if ( frame )
1015                 {
1016                         mlt_log_debug( MLT_CONSUMER_SERVICE(self), "worker processing index = %d frame " MLT_POSITION_FMT " queue count = %d\n",
1017                                 index, mlt_frame_get_position(frame), mlt_deque_count( priv->queue ) );
1018                         frame->is_processing = 1;
1019                         mlt_properties_inc_ref( MLT_FRAME_PROPERTIES( frame ) );
1020                 }
1021                 pthread_mutex_unlock( &priv->queue_mutex );
1022
1023                 // If there's no frame, we're probably stopped...
1024                 if ( frame == NULL )
1025                         continue;
1026
1027                 // WebVfx uses this to setup a consumer-stopping event handler.
1028                 mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
1029
1030 #ifdef DEINTERLACE_ON_NOT_NORMAL_SPEED
1031                 // All non normal playback frames should be shown
1032                 if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" ) != 1 )
1033                         mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "consumer_deinterlace", 1 );
1034 #endif
1035
1036                 // Get the image
1037                 if ( !video_off )
1038                 {
1039                         // Fetch width/height again
1040                         width = mlt_properties_get_int( properties, "width" );
1041                         height = mlt_properties_get_int( properties, "height" );
1042                         mlt_events_fire( MLT_CONSUMER_PROPERTIES( self ), "consumer-frame-render", frame, NULL );
1043                         mlt_frame_get_image( frame, &image, &format, &width, &height, 0 );
1044                 }
1045                 mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
1046                 mlt_frame_close( frame );
1047
1048                 // Tell a waiting thread (non-realtime main consumer thread) that we are done.
1049                 pthread_mutex_lock( &priv->done_mutex );
1050                 pthread_cond_broadcast( &priv->done_cond );
1051                 pthread_mutex_unlock( &priv->done_mutex );
1052         }
1053
1054         return NULL;
1055 }
1056
1057 /** Start the read/render thread.
1058  *
1059  * \private \memberof mlt_consumer_s
1060  * \param self a consumer
1061  */
1062
1063 static void consumer_read_ahead_start( mlt_consumer self )
1064 {
1065         consumer_private *priv = self->local;
1066
1067         if ( priv->started )
1068                 return;
1069
1070         // We're running now
1071         priv->ahead = 1;
1072
1073         // Create the frame queue
1074         priv->queue = mlt_deque_init( );
1075
1076         // Create the queue mutex
1077         pthread_mutex_init( &priv->queue_mutex, NULL );
1078
1079         // Create the condition
1080         pthread_cond_init( &priv->queue_cond, NULL );
1081
1082         // Create the read ahead
1083         mlt_thread_create( self, (thread_function_t) consumer_read_ahead_thread );
1084         priv->started = 1;
1085 }
1086
1087 /** Start the worker threads.
1088  *
1089  * \private \memberof mlt_consumer_s
1090  * \param self a consumer
1091  */
1092
1093 static void consumer_work_start( mlt_consumer self )
1094 {
1095         consumer_private *priv = self->local;
1096         int n = abs( priv->real_time );
1097         pthread_t *thread;
1098
1099         if ( priv->started )
1100                 return;
1101
1102         thread = calloc( 1, sizeof( pthread_t ) * n );
1103
1104         // We're running now
1105         priv->ahead = 1;
1106         priv->threads = thread;
1107         
1108         // These keep track of the accelleration of frame dropping or recovery.
1109         priv->consecutive_dropped = 0;
1110         priv->consecutive_rendered = 0;
1111         
1112         // This is the position in the queue from which to look for a frame to process.
1113         // If we always start from the head, then we may likely not complete processing
1114         // before the frame is played out.
1115         priv->process_head = 0;
1116
1117         // Create the queues
1118         priv->queue = mlt_deque_init();
1119         priv->worker_threads = mlt_deque_init();
1120
1121         // Create the mutexes
1122         pthread_mutex_init( &priv->queue_mutex, NULL );
1123         pthread_mutex_init( &priv->done_mutex, NULL );
1124
1125         // Create the conditions
1126         pthread_cond_init( &priv->queue_cond, NULL );
1127         pthread_cond_init( &priv->done_cond, NULL );
1128
1129         // Create the read ahead
1130         if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( self ), "priority" ) )
1131         {
1132
1133                 struct sched_param priority;
1134                 pthread_attr_t thread_attributes;
1135
1136                 priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( self ), "priority" );
1137                 pthread_attr_init( &thread_attributes );
1138                 pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
1139                 pthread_attr_setschedparam( &thread_attributes, &priority );
1140                 pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
1141                 pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
1142
1143                 while ( n-- )
1144                 {
1145                         if ( pthread_create( thread, &thread_attributes, consumer_worker_thread, self ) < 0 )
1146                                 if ( pthread_create( thread, NULL, consumer_worker_thread, self ) == 0 )
1147                                         mlt_deque_push_back( priv->worker_threads, thread );
1148                         thread++;
1149                 }
1150                 pthread_attr_destroy( &thread_attributes );
1151         }
1152
1153         else
1154         {
1155                 while ( n-- )
1156                 {
1157                         if ( pthread_create( thread, NULL, consumer_worker_thread, self ) == 0 )
1158                                 mlt_deque_push_back( priv->worker_threads, thread );
1159                         thread++;
1160                 }
1161         }
1162         priv->started = 1;
1163 }
1164
1165 /** Stop the read/render thread.
1166  *
1167  * \private \memberof mlt_consumer_s
1168  * \param self a consumer
1169  */
1170
1171 static void consumer_read_ahead_stop( mlt_consumer self )
1172 {
1173         consumer_private *priv = self->local;
1174
1175         // Make sure we're running
1176 // TODO improve support for atomic ops in general (see libavutil/atomic.h)
1177 #ifdef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_4
1178         if ( __sync_val_compare_and_swap( &priv->started, 1, 0 ) )
1179         {
1180 #else
1181         if ( priv->started )
1182         {
1183                 priv->started = 0;
1184 #endif
1185                 // Inform thread to stop
1186                 priv->ahead = 0;
1187                 mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-stopping", NULL );
1188
1189                 // Broadcast to the condition in case it's waiting
1190                 pthread_mutex_lock( &priv->queue_mutex );
1191                 pthread_cond_broadcast( &priv->queue_cond );
1192                 pthread_mutex_unlock( &priv->queue_mutex );
1193
1194                 // Broadcast to the put condition in case it's waiting
1195                 pthread_mutex_lock( &priv->put_mutex );
1196                 pthread_cond_broadcast( &priv->put_cond );
1197                 pthread_mutex_unlock( &priv->put_mutex );
1198
1199                 // Join the thread
1200                 mlt_thread_join( self );
1201
1202                 // Destroy the frame queue mutex
1203                 pthread_mutex_destroy( &priv->queue_mutex );
1204
1205                 // Destroy the condition
1206                 pthread_cond_destroy( &priv->queue_cond );
1207         }
1208 }
1209
1210 /** Stop the worker threads.
1211  *
1212  * \private \memberof mlt_consumer_s
1213  * \param self a consumer
1214  */
1215
1216 static void consumer_work_stop( mlt_consumer self )
1217 {
1218         consumer_private *priv = self->local;
1219
1220         // Make sure we're running
1221 #ifdef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_4
1222         if ( __sync_val_compare_and_swap( &priv->started, 1, 0 ) )
1223         {
1224 #else
1225         if ( priv->started )
1226         {
1227                 priv->started = 0;
1228 #endif
1229                 // Inform thread to stop
1230                 priv->ahead = 0;
1231                 mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-stopping", NULL );
1232
1233                 // Broadcast to the queue condition in case it's waiting
1234                 pthread_mutex_lock( &priv->queue_mutex );
1235                 pthread_cond_broadcast( &priv->queue_cond );
1236                 pthread_mutex_unlock( &priv->queue_mutex );
1237
1238                 // Broadcast to the put condition in case it's waiting
1239                 pthread_mutex_lock( &priv->put_mutex );
1240                 pthread_cond_broadcast( &priv->put_cond );
1241                 pthread_mutex_unlock( &priv->put_mutex );
1242
1243                 // Broadcast to the done condition in case it's waiting
1244                 pthread_mutex_lock( &priv->done_mutex );
1245                 pthread_cond_broadcast( &priv->done_cond );
1246                 pthread_mutex_unlock( &priv->done_mutex );
1247
1248                 // Join the threads
1249                 pthread_t *thread;
1250                 while ( ( thread = mlt_deque_pop_back( priv->worker_threads ) ) )
1251                         pthread_join( *thread, NULL );
1252
1253                 // Deallocate the array of threads
1254                 if ( priv->threads )
1255                         free( priv->threads );
1256
1257                 // Destroy the mutexes
1258                 pthread_mutex_destroy( &priv->queue_mutex );
1259                 pthread_mutex_destroy( &priv->done_mutex );
1260
1261                 // Destroy the conditions
1262                 pthread_cond_destroy( &priv->queue_cond );
1263                 pthread_cond_destroy( &priv->done_cond );
1264
1265                 // Wipe the queues
1266                 while ( mlt_deque_count( priv->queue ) )
1267                         mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
1268
1269                 // Close the queues
1270                 mlt_deque_close( priv->queue );
1271                 mlt_deque_close( priv->worker_threads );
1272
1273                 mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-stopped", NULL );
1274         }
1275 }
1276
1277 /** Flush the read/render thread's buffer.
1278  *
1279  * \public \memberof mlt_consumer_s
1280  * \param self a consumer
1281  */
1282
1283 void mlt_consumer_purge( mlt_consumer self )
1284 {
1285         if ( self )
1286         {
1287                 consumer_private *priv = self->local;
1288
1289                 pthread_mutex_lock( &priv->put_mutex );
1290                 if ( priv->put ) {
1291                         mlt_frame_close( priv->put );
1292                         priv->put = NULL;
1293                 }
1294                 pthread_cond_broadcast( &priv->put_cond );
1295                 pthread_mutex_unlock( &priv->put_mutex );
1296
1297                 if ( self->purge )
1298                         self->purge( self );
1299
1300                 if ( priv->started && priv->real_time )
1301                         pthread_mutex_lock( &priv->queue_mutex );
1302
1303                 while ( priv->started && mlt_deque_count( priv->queue ) )
1304                         mlt_frame_close( mlt_deque_pop_back( priv->queue ) );
1305
1306                 if ( priv->started && priv->real_time )
1307                 {
1308                         priv->is_purge = 1;
1309                         pthread_cond_broadcast( &priv->queue_cond );
1310                         pthread_mutex_unlock( &priv->queue_mutex );
1311                         if ( abs( priv->real_time ) > 1 )
1312                         {
1313                                 pthread_mutex_lock( &priv->done_mutex );
1314                                 pthread_cond_broadcast( &priv->done_cond );
1315                                 pthread_mutex_unlock( &priv->done_mutex );
1316                         }
1317                 }
1318
1319                 pthread_mutex_lock( &priv->put_mutex );
1320                 if ( priv->put ) {
1321                         mlt_frame_close( priv->put );
1322                         priv->put = NULL;
1323                 }
1324                 pthread_cond_broadcast( &priv->put_cond );
1325                 pthread_mutex_unlock( &priv->put_mutex );
1326         }
1327 }
1328
1329 /** Use multiple worker threads and a work queue.
      *
      * Returns the frame at the head of the work queue, (re)filling the queue
      * with frames from mlt_consumer_get_frame() first. On the first call (when
      * the private \p ahead flag is not yet set) the worker threads are started,
      * the queue is filled and the call blocks until \p prefill frames have been
      * picked up by the workers. In non-realtime mode (real_time below zero) the
      * call waits until the head frame is marked "rendered". In realtime mode
      * (real_time above zero) the private process_head is adapted to how many
      * frames were consecutively rendered or dropped.
      *
      * \private \memberof mlt_consumer_s
      * \param self a consumer
      * \param properties the properties from which "fps", "_buffer", "buffer",
      * "prefill" and "drop_max" are read, and to which "_buffer" is written
      * (presumably the consumer's own properties - confirm against the caller)
      * \return the next frame, or NULL when the queue is empty or a purge was pending
1330  */
1331
1332 static mlt_frame worker_get_frame( mlt_consumer self, mlt_properties properties )
1333 {
1334         // Frame to return
1335         mlt_frame frame = NULL;
1336         consumer_private *priv = self->local;
1337         double fps = mlt_properties_get_double( properties, "fps" );
1338         int threads = abs( priv->real_time );
             // The auto-scaled "_buffer" value takes precedence over "buffer" once it is set
1339         int buffer = mlt_properties_get_int( properties, "_buffer" );
1340         buffer = buffer > 0 ? buffer : mlt_properties_get_int( properties, "buffer" );
1341         // This is a heuristic to determine a suitable minimum buffer size for the number of threads.
1342         int headroom = 2 + threads * threads;
1343         buffer = buffer < headroom ? headroom : buffer;
1344
1345         // Start worker threads if not already started.
1346         if ( ! priv->ahead )
1347         {
                     // Use the whole buffer as the prefill level unless "prefill" is
                     // positive and smaller than the buffer
1348                 int prefill = mlt_properties_get_int( properties, "prefill" );
1349                 prefill = prefill > 0 && prefill < buffer ? prefill : buffer;
1350
1351                 consumer_work_start( self );
1352
1353                 // Fill the work queue.
1354                 int i = buffer;
1355                 while ( priv->ahead && i-- )
1356                 {
1357                         frame = mlt_consumer_get_frame( self );
1358                         if ( frame )
1359                         {
1360                                 pthread_mutex_lock( &priv->queue_mutex );
1361                                 mlt_deque_push_back( priv->queue, frame );
1362                                 pthread_cond_signal( &priv->queue_cond );
1363                                 pthread_mutex_unlock( &priv->queue_mutex );
1364                         }
1365                 }
1366
1367                 // Wait for prefill
                     // NOTE(review): first_unprocessed_frame() is called here without holding
                     // queue_mutex - confirm this unlocked read is intended.
1368                 while ( priv->ahead && first_unprocessed_frame( self ) < prefill )
1369                 {
1370                         pthread_mutex_lock( &priv->done_mutex );
1371                         pthread_cond_wait( &priv->done_cond, &priv->done_mutex );
1372                         pthread_mutex_unlock( &priv->done_mutex );
1373                 }
                     // From now on first_unprocessed_frame() starts searching at this index
                     // when real_time is greater than zero
1374                 priv->process_head = threads;
1375         }
1376
1377 //      mlt_log_verbose( MLT_CONSUMER_SERVICE(self), "size %d done count %d work count %d process_head %d\n",
1378 //              threads, first_unprocessed_frame( self ), mlt_deque_count( priv->queue ), priv->process_head );
1379
1380         // Feed the work queue
1381         while ( priv->ahead && mlt_deque_count( priv->queue ) < buffer )
1382         {
1383                 frame = mlt_consumer_get_frame( self );
1384                 if ( frame )
1385                 {
1386                         pthread_mutex_lock( &priv->queue_mutex );
1387                         mlt_deque_push_back( priv->queue, frame );
1388                         pthread_cond_signal( &priv->queue_cond );
1389                         pthread_mutex_unlock( &priv->queue_mutex );
1390                 }
1391         }
1392
1393         // Wait if not realtime.
             // (real_time < 0: block until the frame at the queue head is marked
             // "rendered", or until a purge or stop is signalled)
1394         while ( priv->ahead && priv->real_time < 0 && !priv->is_purge &&
1395                 !( mlt_properties_get_int( MLT_FRAME_PROPERTIES( MLT_FRAME( mlt_deque_peek_front( priv->queue ) ) ), "rendered" ) ) )
1396         {
1397                 pthread_mutex_lock( &priv->done_mutex );
1398                 pthread_cond_wait( &priv->done_cond, &priv->done_mutex );
1399                 pthread_mutex_unlock( &priv->done_mutex );
1400         }
1401
1402         // Get the frame from the queue.
1403         pthread_mutex_lock( &priv->queue_mutex );
1404         frame = mlt_deque_pop_front( priv->queue );
1405         pthread_mutex_unlock( &priv->queue_mutex );
             // Nothing was queued: clear any pending purge and return NULL
1406         if ( ! frame ) {
1407                 priv->is_purge = 0;
1408                 return frame;
1409         }
1410
1411         // Adapt the worker process head to the runtime conditions.
1412         if ( priv->real_time > 0 )
1413         {
1414                 if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "rendered" ) )
1415                 {
                             // Rendered in time: after enough consecutive successes, move the
                             // process head one step back toward the queue head
1416                         priv->consecutive_dropped = 0;
1417                         if ( priv->process_head > threads && priv->consecutive_rendered >= priv->process_head )
1418                                 priv->process_head--;
1419                         else
1420                                 priv->consecutive_rendered++;
1421                 }
1422                 else
1423                 {
                             // Not rendered in time (dropped): after more than `threads`
                             // consecutive drops, move the process head one step toward the tail
1424                         priv->consecutive_rendered = 0;
1425                         if ( priv->process_head < buffer - threads && priv->consecutive_dropped > threads )
1426                                 priv->process_head++;
1427                         else
1428                                 priv->consecutive_dropped++;
1429                 }
1430 //              mlt_log_verbose( MLT_CONSUMER_SERVICE(self), "dropped %d rendered %d process_head %d\n",
1431 //                      priv->consecutive_dropped, priv->consecutive_rendered, priv->process_head );
1432
1433                 // Check for too many consecutively dropped frames
1434                 if ( priv->consecutive_dropped > mlt_properties_get_int( properties, "drop_max" ) )
1435                 {
1436                         int orig_buffer = mlt_properties_get_int( properties, "buffer" );
1437                         int prefill = mlt_properties_get_int( properties, "prefill" );
1438                         mlt_log_verbose( self, "too many frames dropped - " );
1439
1440                         // If using a default low-latency buffer level (SDL) and below the limit
1441                         if ( ( orig_buffer == 1 || prefill == 1 ) && buffer < (threads + 1) * 10 )
1442                         {
1443                                 // Auto-scale the buffer to compensate
1444                                 mlt_log_verbose( self, "increasing buffer to %d\n", buffer + threads );
1445                                 mlt_properties_set_int( properties, "_buffer", buffer + threads );
                                     // Restart the drop counter at half a second's worth of frames
1446                                 priv->consecutive_dropped = fps / 2;
1447                         }
1448                         else
1449                         {
1450                                 // Tell the consumer to render it
1451                                 mlt_log_verbose( self, "forcing next frame\n" );
1452                                 mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
1453                                 priv->consecutive_dropped = 0;
1454                         }
1455                 }
1456         }
             // A purge was flagged: discard the frame that was just popped and return NULL
1457         if ( priv->is_purge ) {
1458                 priv->is_purge = 0;
1459                 mlt_frame_close( frame );
1460                 frame = NULL;
1461         }
1462         return frame;
1463 }
1464
1465 /** Get the next frame from the producer connected to a consumer.
1466  *
1467  * Typically, one uses this instead of \p mlt_consumer_get_frame to make
1468  * the asynchronous/real-time behavior configurable at runtime.
1469  * You should close the frame returned from this when you are done with it.
1470  *
1471  * \public \memberof mlt_consumer_s
1472  * \param self a consumer
1473  * \return a frame
1474  */
1475
1476 mlt_frame mlt_consumer_rt_frame( mlt_consumer self )
1477 {
1478         // Frame to return
1479         mlt_frame frame = NULL;
1480
1481         // Get the properties
1482         mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
1483         consumer_private *priv = self->local;
1484
1485         // Check if the user has requested real time or not
1486         if ( priv->real_time > 1 || priv->real_time < -1 )
1487         {
1488                 // see above
1489                 return worker_get_frame( self, properties );
1490         }
1491         else if ( priv->real_time == 1 || priv->real_time == -1 )
1492         {
1493                 int size = 1;
1494
1495                 // Is the read ahead running?
1496                 if ( priv->ahead == 0 )
1497                 {
1498                         int buffer = mlt_properties_get_int( properties, "buffer" );
1499                         int prefill = mlt_properties_get_int( properties, "prefill" );
1500                         consumer_read_ahead_start( self );
1501                         if ( buffer > 1 )
1502                                 size = prefill > 0 && prefill < buffer ? prefill : buffer;
1503                 }
1504
1505                 // Get frame from queue
1506                 pthread_mutex_lock( &priv->queue_mutex );
1507                 while( priv->ahead && mlt_deque_count( priv->queue ) < size )
1508                         pthread_cond_wait( &priv->queue_cond, &priv->queue_mutex );
1509                 frame = mlt_deque_pop_front( priv->queue );
1510                 pthread_cond_broadcast( &priv->queue_cond );
1511                 pthread_mutex_unlock( &priv->queue_mutex );
1512         }
1513         else // real_time == 0
1514         {
1515                 if ( !priv->ahead )
1516                 {
1517                         priv->ahead = 1;
1518                         mlt_events_fire( properties, "consumer-thread-started", NULL );
1519                 }
1520                 // Get the frame in non real time
1521                 frame = mlt_consumer_get_frame( self );
1522
1523                 // This isn't true, but from the consumers perspective it is
1524                 if ( frame != NULL )
1525                 {
1526                         mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
1527
1528                         // WebVfx uses this to setup a consumer-stopping event handler.
1529                         mlt_properties_set_data( MLT_FRAME_PROPERTIES( frame ), "consumer", self, 0, NULL, NULL );
1530                 }
1531         }
1532
1533         return frame;
1534 }
1535
1536 /** Callback for the implementation to indicate a stopped condition.
1537  *
1538  * \public \memberof mlt_consumer_s
1539  * \param self a consumer
1540  */
1541
1542 void mlt_consumer_stopped( mlt_consumer self )
1543 {
1544         mlt_properties_set_int( MLT_CONSUMER_PROPERTIES( self ), "running", 0 );
1545         mlt_events_fire( MLT_CONSUMER_PROPERTIES( self ), "consumer-stopped", NULL );
1546         mlt_event_unblock( ( ( consumer_private* ) self->local )->event_listener );
1547 }
1548
1549 /** Stop the consumer.
1550  *
1551  * \public \memberof mlt_consumer_s
1552  * \param self a consumer
1553  * \return true if there was an error
1554  */
1555
1556 int mlt_consumer_stop( mlt_consumer self )
1557 {
1558         // Get the properies
1559         mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
1560         consumer_private *priv = self->local;
1561
1562         // Just in case...
1563         mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopping put waiting\n" );
1564         pthread_mutex_lock( &priv->put_mutex );
1565         priv->put_active = 0;
1566         pthread_cond_broadcast( &priv->put_cond );
1567         pthread_mutex_unlock( &priv->put_mutex );
1568
1569         // Stop the consumer
1570         mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopping consumer\n" );
1571         
1572         // Cancel the read ahead threads
1573         if ( priv->started )
1574         {
1575                 // Unblock the consumer calling mlt_consumer_rt_frame
1576                 pthread_mutex_lock( &priv->queue_mutex );
1577                 pthread_cond_broadcast( &priv->queue_cond );
1578                 pthread_mutex_unlock( &priv->queue_mutex );
1579         }
1580         
1581         // Invoke the child callback
1582         if ( self->stop != NULL )
1583                 self->stop( self );
1584
1585         // Check if the user has requested real time or not and stop if necessary
1586         mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopping read_ahead\n" );
1587         if ( abs( priv->real_time ) == 1 )
1588                 consumer_read_ahead_stop( self );
1589         else if ( abs( priv->real_time ) > 1 )
1590                 consumer_work_stop( self );
1591
1592         // Kill the test card
1593         mlt_properties_set_data( properties, "test_card_producer", NULL, 0, NULL, NULL );
1594
1595         // Check and run a post command
1596         if ( mlt_properties_get( properties, "post" ) )
1597                 if (system( mlt_properties_get( properties, "post" ) ) == -1 )
1598                         mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_ERROR, "system(%s) failed!\n", mlt_properties_get( properties, "post" ) );
1599
1600         mlt_log( MLT_CONSUMER_SERVICE( self ), MLT_LOG_DEBUG, "stopped\n" );
1601
1602         return 0;
1603 }
1604
1605 /** Determine if the consumer is stopped.
1606  *
1607  * \public \memberof mlt_consumer_s
1608  * \param self a consumer
1609  * \return true if the consumer is stopped
1610  */
1611
1612 int mlt_consumer_is_stopped( mlt_consumer self )
1613 {
1614         // Check if the consumer is stopped
1615         if ( self && self->is_stopped )
1616                 return self->is_stopped( self );
1617
1618         return 0;
1619 }
1620
1621 /** Close and destroy the consumer.
1622  *
1623  * \public \memberof mlt_consumer_s
1624  * \param self a consumer
1625  */
1626
1627 void mlt_consumer_close( mlt_consumer self )
1628 {
1629         if ( self != NULL && mlt_properties_dec_ref( MLT_CONSUMER_PROPERTIES( self ) ) <= 0 )
1630         {
1631                 // Get the childs close function
1632                 void ( *consumer_close )( ) = self->close;
1633
1634                 if ( consumer_close )
1635                 {
1636                         // Just in case...
1637                         //mlt_consumer_stop( self );
1638
1639                         self->close = NULL;
1640                         consumer_close( self );
1641                 }
1642                 else
1643                 {
1644                         consumer_private *priv = self->local;
1645
1646                         // Make sure it only gets called once
1647                         self->parent.close = NULL;
1648
1649                         // Destroy the push mutex and condition
1650                         pthread_mutex_destroy( &priv->put_mutex );
1651                         pthread_cond_destroy( &priv->put_cond );
1652
1653                         mlt_service_close( &self->parent );
1654                         free( priv );
1655                 }
1656         }
1657 }
1658
1659 /** Get the position of the last frame shown.
1660  *
1661  * \public \memberof mlt_consumer_s
1662  * \param consumer a consumer
1663  * \return the position
1664  */
1665
1666 mlt_position mlt_consumer_position( mlt_consumer consumer )
1667 {
1668         return ( ( consumer_private* ) consumer->local )->position;
1669 }
1670
1671 static void transmit_thread_create( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
1672 {
1673         if ( listener )
1674                 listener( owner, self,
1675                         (void**) args[0] /* handle */, (int*) args[1] /* priority */, (thread_function_t) args[2], (void*) args[3] /* data */ );
1676 }
1677
1678 static void mlt_thread_create( mlt_consumer self, thread_function_t function )
1679 {
1680         consumer_private *priv = self->local;
1681         mlt_properties properties = MLT_CONSUMER_PROPERTIES( self );
1682
1683         if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( self ), "priority" ) )
1684         {
1685                 struct sched_param priority;
1686                 priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( self ), "priority" );
1687                 if ( mlt_events_fire( properties, "consumer-thread-create",
1688                      &priv->ahead_thread, &priority.sched_priority, function, self, NULL ) < 1 )
1689                 {
1690                         pthread_attr_t thread_attributes;
1691                         pthread_attr_init( &thread_attributes );
1692                         pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
1693                         pthread_attr_setschedparam( &thread_attributes, &priority );
1694                         pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
1695                         pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
1696                         priv->ahead_thread = malloc( sizeof( pthread_t ) );
1697                         pthread_t *handle = priv->ahead_thread;
1698                         if ( pthread_create( ( pthread_t* ) &( *handle ), &thread_attributes, function, self ) < 0 )
1699                                 pthread_create( ( pthread_t* ) &( *handle ), NULL, function, self );
1700                         pthread_attr_destroy( &thread_attributes );
1701                 }
1702         }
1703         else
1704         {
1705                 int priority = -1;
1706                 if ( mlt_events_fire( properties, "consumer-thread-create",
1707                      &priv->ahead_thread, &priority, function, self, NULL ) < 1 )
1708                 {
1709                         priv->ahead_thread = malloc( sizeof( pthread_t ) );
1710                         pthread_t *handle = priv->ahead_thread;
1711                         pthread_create( ( pthread_t* ) &( *handle ), NULL, function, self );
1712                 }
1713         }
1714 }
1715
1716 static void transmit_thread_join( mlt_listener listener, mlt_properties owner, mlt_service self, void **args )
1717 {
1718         if ( listener )
1719                 listener( owner, self, (void*) args[0] /* handle */ );
1720 }
1721
1722 static void mlt_thread_join( mlt_consumer self )
1723 {
1724         consumer_private *priv = self->local;
1725         if ( mlt_events_fire( MLT_CONSUMER_PROPERTIES(self), "consumer-thread-join", priv->ahead_thread, NULL ) < 1 )
1726         {
1727                 pthread_t *handle = priv->ahead_thread;
1728                 pthread_join( *handle, NULL );
1729                 free( priv->ahead_thread );
1730         }
1731         priv->ahead_thread = NULL;
1732 }