]> git.sesse.net Git - mlt/blob - src/framework/mlt_consumer.c
src/framework/*: improve the doxygen documentation (work in progress). This also...
[mlt] / src / framework / mlt_consumer.c
1 /**
2  * \file mlt_consumer.c
3  * \brief abstraction for all consumer services
4  *
5  * Copyright (C) 2003-2008 Ushodaya Enterprises Limited
6  * \author Charles Yates <charles.yates@pandora.be>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License as published by the Free Software Foundation; either
11  * version 2.1 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21  */
22
23 #include "mlt_consumer.h"
24 #include "mlt_factory.h"
25 #include "mlt_producer.h"
26 #include "mlt_frame.h"
27 #include "mlt_profile.h"
28
29 #include <stdio.h>
30 #include <string.h>
31 #include <stdlib.h>
32 #include <sys/time.h>
33
34 #undef DEINTERLACE_ON_NOT_NORMAL_SPEED
35
36 static void mlt_consumer_frame_render( mlt_listener listener, mlt_properties owner, mlt_service this, void **args );
37 static void mlt_consumer_frame_show( mlt_listener listener, mlt_properties owner, mlt_service this, void **args );
38 static void mlt_consumer_property_changed( mlt_service owner, mlt_consumer this, char *name );
39 static void apply_profile_properties( mlt_consumer this, mlt_profile profile, mlt_properties properties );
40
41 int mlt_consumer_init( mlt_consumer this, void *child, mlt_profile profile )
42 {
43         int error = 0;
44         memset( this, 0, sizeof( struct mlt_consumer_s ) );
45         this->child = child;
46         error = mlt_service_init( &this->parent, this );
47         if ( error == 0 )
48         {
49                 // Get the properties from the service
50                 mlt_properties properties = MLT_SERVICE_PROPERTIES( &this->parent );
51
52                 // Apply profile to properties
53                 if ( profile == NULL )
54                 {
55                         // Normally the application creates the profile and controls its lifetime
56                         // This is the fallback exception handling
57                         profile = mlt_profile_init( NULL );
58                         mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
59                         mlt_properties_set_data( properties, "_profile", profile, 0, (mlt_destructor)mlt_profile_close, NULL );
60                 }
61                 apply_profile_properties( this, profile, properties );
62
63                 // Default rescaler for all consumers
64                 mlt_properties_set( properties, "rescale", "bilinear" );
65
66                 // Default read ahead buffer size
67                 mlt_properties_set_int( properties, "buffer", 25 );
68
69                 // Default audio frequency and channels
70                 mlt_properties_set_int( properties, "frequency", 48000 );
71                 mlt_properties_set_int( properties, "channels", 2 );
72
73                 // Default of all consumers is real time
74                 mlt_properties_set_int( properties, "real_time", 1 );
75
76                 // Default to environment test card
77                 mlt_properties_set( properties, "test_card", mlt_environment( "MLT_TEST_CARD" ) );
78
79                 // Hmm - default all consumers to yuv422 :-/
80                 this->format = mlt_image_yuv422;
81
82                 mlt_events_register( properties, "consumer-frame-show", ( mlt_transmitter )mlt_consumer_frame_show );
83                 mlt_events_register( properties, "consumer-frame-render", ( mlt_transmitter )mlt_consumer_frame_render );
84                 mlt_events_register( properties, "consumer-stopped", NULL );
85
86                 // Register a property-changed listener to handle the profile property -
87                 // subsequent properties can override the profile
88                 this->event_listener = mlt_events_listen( properties, this, "property-changed", ( mlt_listener )mlt_consumer_property_changed );
89
90                 // Create the push mutex and condition
91                 pthread_mutex_init( &this->put_mutex, NULL );
92                 pthread_cond_init( &this->put_cond, NULL );
93
94         }
95         return error;
96 }
97
98 static void apply_profile_properties( mlt_consumer this, mlt_profile profile, mlt_properties properties )
99 {
100         mlt_event_block( this->event_listener );
101         mlt_properties_set_double( properties, "fps", mlt_profile_fps( profile ) );
102         mlt_properties_set_int( properties, "frame_rate_num", profile->frame_rate_num );
103         mlt_properties_set_int( properties, "frame_rate_den", profile->frame_rate_den );
104         mlt_properties_set_int( properties, "width", profile->width );
105         mlt_properties_set_int( properties, "height", profile->height );
106         mlt_properties_set_int( properties, "progressive", profile->progressive );
107         mlt_properties_set_double( properties, "aspect_ratio", mlt_profile_sar( profile )  );
108         mlt_properties_set_int( properties, "sample_aspect_num", profile->sample_aspect_num );
109         mlt_properties_set_int( properties, "sample_aspect_den", profile->sample_aspect_den );
110         mlt_properties_set_double( properties, "display_ratio", mlt_profile_dar( profile )  );
111         mlt_properties_set_int( properties, "display_aspect_num", profile->display_aspect_num );
112         mlt_properties_set_int( properties, "display_aspect_num", profile->display_aspect_num );
113         mlt_event_unblock( this->event_listener );
114 }
115
116 static void mlt_consumer_property_changed( mlt_service owner, mlt_consumer this, char *name )
117 {
118         if ( !strcmp( name, "profile" ) )
119         {
120                 // Get the properies
121                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
122
123                 // Get the current profile
124                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( this ) );
125
126                 // Load the new profile
127                 mlt_profile new_profile = mlt_profile_init( mlt_properties_get( properties, name ) );
128
129                 if ( new_profile )
130                 {
131                         // Copy the profile
132                         if ( profile != NULL )
133                         {
134                                 free( profile->description );
135                                 memcpy( profile, new_profile, sizeof( struct mlt_profile_s ) );
136                                 profile->description = strdup( new_profile->description );
137                                 mlt_profile_close( new_profile );
138                         }
139                         else
140                         {
141                                 profile = new_profile;
142                         }
143
144                         // Apply to properties
145                         apply_profile_properties( this, profile, properties );
146                 }
147         }
148         else if ( !strcmp( name, "frame_rate_num" ) )
149         {
150                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
151                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( this ) );
152                 if ( profile )
153                 {
154                         profile->frame_rate_num = mlt_properties_get_int( properties, "frame_rate_num" );
155                         mlt_properties_set_double( properties, "fps", mlt_profile_fps( profile ) );
156                 }
157         }
158         else if ( !strcmp( name, "frame_rate_den" ) )
159         {
160                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
161                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( this ) );
162                 if ( profile )
163                 {
164                         profile->frame_rate_den = mlt_properties_get_int( properties, "frame_rate_den" );
165                         mlt_properties_set_double( properties, "fps", mlt_profile_fps( profile ) );
166                 }
167         }
168         else if ( !strcmp( name, "width" ) )
169         {
170                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
171                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( this ) );
172                 if ( profile )
173                         profile->width = mlt_properties_get_int( properties, "width" );
174         }
175         else if ( !strcmp( name, "height" ) )
176         {
177                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
178                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( this ) );
179                 if ( profile )
180                         profile->height = mlt_properties_get_int( properties, "height" );
181         }
182         else if ( !strcmp( name, "progressive" ) )
183         {
184                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
185                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( this ) );
186                 if ( profile )
187                         profile->progressive = mlt_properties_get_int( properties, "progressive" );
188         }
189         else if ( !strcmp( name, "sample_aspect_num" ) )
190         {
191                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
192                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( this ) );
193                 profile->sample_aspect_num = mlt_properties_get_int( properties, "sample_aspect_num" );
194                 if ( profile )
195                         mlt_properties_set_double( properties, "aspect_ratio", mlt_profile_sar( profile )  );
196         }
197         else if ( !strcmp( name, "sample_aspect_den" ) )
198         {
199                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
200                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( this ) );
201                 profile->sample_aspect_den = mlt_properties_get_int( properties, "sample_aspect_den" );
202                 if ( profile )
203                         mlt_properties_set_double( properties, "aspect_ratio", mlt_profile_sar( profile )  );
204         }
205         else if ( !strcmp( name, "display_aspect_num" ) )
206         {
207                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
208                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( this ) );
209                 if ( profile )
210                 {
211                         profile->display_aspect_num = mlt_properties_get_int( properties, "display_aspect_num" );
212                         mlt_properties_set_double( properties, "display_ratio", mlt_profile_dar( profile )  );
213                 }
214         }
215         else if ( !strcmp( name, "display_aspect_den" ) )
216         {
217                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
218                 mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( this ) );
219                 if ( profile )
220                 {
221                         profile->display_aspect_den = mlt_properties_get_int( properties, "display_aspect_den" );
222                         mlt_properties_set_double( properties, "display_ratio", mlt_profile_dar( profile )  );
223                 }
224         }
225 }
226
227 static void mlt_consumer_frame_show( mlt_listener listener, mlt_properties owner, mlt_service this, void **args )
228 {
229         if ( listener != NULL )
230                 listener( owner, this, ( mlt_frame )args[ 0 ] );
231 }
232
233 static void mlt_consumer_frame_render( mlt_listener listener, mlt_properties owner, mlt_service this, void **args )
234 {
235         if ( listener != NULL )
236                 listener( owner, this, ( mlt_frame )args[ 0 ] );
237 }
238
239 /** Create a new consumer.
240 */
241
242 mlt_consumer mlt_consumer_new( mlt_profile profile )
243 {
244         // Create the memory for the structure
245         mlt_consumer this = malloc( sizeof( struct mlt_consumer_s ) );
246
247         // Initialise it
248         if ( this != NULL )
249                 mlt_consumer_init( this, NULL, profile );
250
251         // Return it
252         return this;
253 }
254
255 /** Get the parent service object.
256 */
257
258 mlt_service mlt_consumer_service( mlt_consumer this )
259 {
260         return this != NULL ? &this->parent : NULL;
261 }
262
263 /** Get the consumer properties.
264 */
265
266 mlt_properties mlt_consumer_properties( mlt_consumer this )
267 {
268         return this != NULL ? MLT_SERVICE_PROPERTIES( &this->parent ) : NULL;
269 }
270
271 /** Connect the consumer to the producer.
272 */
273
274 int mlt_consumer_connect( mlt_consumer this, mlt_service producer )
275 {
276         return mlt_service_connect_producer( &this->parent, producer, 0 );
277 }
278
279 /** Start the consumer.
280 */
281
282 int mlt_consumer_start( mlt_consumer this )
283 {
284         // Stop listening to the property-changed event
285         mlt_event_block( this->event_listener );
286
287         // Get the properies
288         mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
289
290         // Determine if there's a test card producer
291         char *test_card = mlt_properties_get( properties, "test_card" );
292
293         // Just to make sure nothing is hanging around...
294         mlt_frame_close( this->put );
295         this->put = NULL;
296         this->put_active = 1;
297
298         // Deal with it now.
299         if ( test_card != NULL )
300         {
301                 if ( mlt_properties_get_data( properties, "test_card_producer", NULL ) == NULL )
302                 {
303                         // Create a test card producer
304                         mlt_profile profile = mlt_service_profile( MLT_CONSUMER_SERVICE( this ) );
305                         mlt_producer producer = mlt_factory_producer( profile, NULL, test_card );
306
307                         // Do we have a producer
308                         if ( producer != NULL )
309                         {
310                                 // Test card should loop I guess...
311                                 mlt_properties_set( MLT_PRODUCER_PROPERTIES( producer ), "eof", "loop" );
312                                 //mlt_producer_set_speed( producer, 0 );
313                                 //mlt_producer_set_in_and_out( producer, 0, 0 );
314
315                                 // Set the test card on the consumer
316                                 mlt_properties_set_data( properties, "test_card_producer", producer, 0, ( mlt_destructor )mlt_producer_close, NULL );
317                         }
318                 }
319         }
320         else
321         {
322                 // Allow the hash table to speed things up
323                 mlt_properties_set_data( properties, "test_card_producer", NULL, 0, NULL, NULL );
324         }
325
326         // Set the frame duration in microseconds for the frame-dropping heuristic
327         int frame_duration = 1000000 / mlt_properties_get_int( properties, "frame_rate_num" ) *
328                         mlt_properties_get_int( properties, "frame_rate_den" );
329         mlt_properties_set_int( properties, "frame_duration", frame_duration );
330
331         // Check and run an ante command
332         if ( mlt_properties_get( properties, "ante" ) )
333                 system( mlt_properties_get( properties, "ante" ) );
334
335         // Set the real_time preference
336         this->real_time = mlt_properties_get_int( properties, "real_time" );
337
338         // Start the service
339         if ( this->start != NULL )
340                 return this->start( this );
341
342         return 0;
343 }
344
345 /** An alternative method to feed frames into the consumer - only valid if
346         the consumer itself is not connected.
347 */
348
349 int mlt_consumer_put_frame( mlt_consumer this, mlt_frame frame )
350 {
351         int error = 1;
352
353         // Get the service assoicated to the consumer
354         mlt_service service = MLT_CONSUMER_SERVICE( this );
355
356         if ( mlt_service_producer( service ) == NULL )
357         {
358                 struct timeval now;
359                 struct timespec tm;
360                 pthread_mutex_lock( &this->put_mutex );
361                 while ( this->put_active && this->put != NULL )
362                 {
363                         gettimeofday( &now, NULL );
364                         tm.tv_sec = now.tv_sec + 1;
365                         tm.tv_nsec = now.tv_usec * 1000;
366                         pthread_cond_timedwait( &this->put_cond, &this->put_mutex, &tm );
367                 }
368                 if ( this->put_active && this->put == NULL )
369                         this->put = frame;
370                 else
371                         mlt_frame_close( frame );
372                 pthread_cond_broadcast( &this->put_cond );
373                 pthread_mutex_unlock( &this->put_mutex );
374         }
375         else
376         {
377                 mlt_frame_close( frame );
378         }
379
380         return error;
381 }
382
383 /** Protected method for consumer to get frames from connected service
384 */
385
386 mlt_frame mlt_consumer_get_frame( mlt_consumer this )
387 {
388         // Frame to return
389         mlt_frame frame = NULL;
390
391         // Get the service assoicated to the consumer
392         mlt_service service = MLT_CONSUMER_SERVICE( this );
393
394         // Get the consumer properties
395         mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
396
397         // Get the frame
398         if ( mlt_service_producer( service ) == NULL && mlt_properties_get_int( properties, "put_mode" ) )
399         {
400                 struct timeval now;
401                 struct timespec tm;
402                 pthread_mutex_lock( &this->put_mutex );
403                 while ( this->put_active && this->put == NULL )
404                 {
405                         gettimeofday( &now, NULL );
406                         tm.tv_sec = now.tv_sec + 1;
407                         tm.tv_nsec = now.tv_usec * 1000;
408                         pthread_cond_timedwait( &this->put_cond, &this->put_mutex, &tm );
409                 }
410                 frame = this->put;
411                 this->put = NULL;
412                 pthread_cond_broadcast( &this->put_cond );
413                 pthread_mutex_unlock( &this->put_mutex );
414                 if ( frame != NULL )
415                         mlt_service_apply_filters( service, frame, 0 );
416         }
417         else if ( mlt_service_producer( service ) != NULL )
418         {
419                 mlt_service_get_frame( service, &frame, 0 );
420         }
421         else
422         {
423                 frame = mlt_frame_init( service );
424         }
425
426         if ( frame != NULL )
427         {
428                 // Get the frame properties
429                 mlt_properties frame_properties = MLT_FRAME_PROPERTIES( frame );
430
431                 // Get the test card producer
432                 mlt_producer test_card = mlt_properties_get_data( properties, "test_card_producer", NULL );
433
434                 // Attach the test frame producer to it.
435                 if ( test_card != NULL )
436                         mlt_properties_set_data( frame_properties, "test_card_producer", test_card, 0, NULL, NULL );
437
438                 // Attach the rescale property
439                 mlt_properties_set( frame_properties, "rescale.interp", mlt_properties_get( properties, "rescale" ) );
440
441                 // Aspect ratio and other jiggery pokery
442                 mlt_properties_set_double( frame_properties, "consumer_aspect_ratio", mlt_properties_get_double( properties, "aspect_ratio" ) );
443                 mlt_properties_set_int( frame_properties, "consumer_deinterlace", mlt_properties_get_int( properties, "progressive" ) | mlt_properties_get_int( properties, "deinterlace" ) );
444                 mlt_properties_set( frame_properties, "deinterlace_method", mlt_properties_get( properties, "deinterlace_method" ) );
445         }
446
447         // Return the frame
448         return frame;
449 }
450
/** Measure the time elapsed since a previous time stamp and restart it.
 *
 * \param time1 on entry the previous time stamp; on return the current time
 *        as reported by gettimeofday
 * \return the number of microseconds between the previous and current time
 */

static inline long time_difference( struct timeval *time1 )
{
	struct timeval previous = *time1;
	gettimeofday( time1, NULL );

	// Subtract before scaling: multiplying the absolute number of seconds by
	// 1000000 overflows where long is only 32 bits wide
	return ( long )( time1->tv_sec - previous.tv_sec ) * 1000000L
		+ ( long )( time1->tv_usec - previous.tv_usec );
}
459
460 static void *consumer_read_ahead_thread( void *arg )
461 {
462         // The argument is the consumer
463         mlt_consumer this = arg;
464
465         // Get the properties of the consumer
466         mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
467
468         // Get the width and height
469         int width = mlt_properties_get_int( properties, "width" );
470         int height = mlt_properties_get_int( properties, "height" );
471
472         // See if video is turned off
473         int video_off = mlt_properties_get_int( properties, "video_off" );
474         int preview_off = mlt_properties_get_int( properties, "preview_off" );
475         int preview_format = mlt_properties_get_int( properties, "preview_format" );
476
477         // Get the audio settings
478         mlt_audio_format afmt = mlt_audio_pcm;
479         int counter = 0;
480         double fps = mlt_properties_get_double( properties, "fps" );
481         int channels = mlt_properties_get_int( properties, "channels" );
482         int frequency = mlt_properties_get_int( properties, "frequency" );
483         int samples = 0;
484         int16_t *pcm = NULL;
485
486         // See if audio is turned off
487         int audio_off = mlt_properties_get_int( properties, "audio_off" );
488
489         // Get the maximum size of the buffer
490         int buffer = mlt_properties_get_int( properties, "buffer" ) + 1;
491
492         // General frame variable
493         mlt_frame frame = NULL;
494         uint8_t *image = NULL;
495
496         // Time structures
497         struct timeval ante;
498
499         // Average time for get_frame and get_image
500         int count = 1;
501         int skipped = 0;
502         int64_t time_wait = 0;
503         int64_t time_frame = 0;
504         int64_t time_process = 0;
505         int skip_next = 0;
506         mlt_service lock_object = NULL;
507
508         if ( preview_off && preview_format != 0 )
509                 this->format = preview_format;
510
511         // Get the first frame
512         frame = mlt_consumer_get_frame( this );
513
514         // Get the lock object
515         lock_object = mlt_properties_get_data( MLT_FRAME_PROPERTIES( frame ), "consumer_lock_service", NULL );
516
517         // Lock it
518         if ( lock_object ) mlt_service_lock( lock_object );
519
520         // Get the image of the first frame
521         if ( !video_off )
522         {
523                 mlt_events_fire( MLT_CONSUMER_PROPERTIES( this ), "consumer-frame-render", frame, NULL );
524                 mlt_frame_get_image( frame, &image, &this->format, &width, &height, 0 );
525         }
526
527         if ( !audio_off )
528         {
529                 samples = mlt_sample_calculator( fps, frequency, counter++ );
530                 mlt_frame_get_audio( frame, &pcm, &afmt, &frequency, &channels, &samples );
531         }
532
533         // Unlock the lock object
534         if ( lock_object ) mlt_service_unlock( lock_object );
535
536         // Mark as rendered
537         mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
538
539         // Get the starting time (can ignore the times above)
540         gettimeofday( &ante, NULL );
541
542         // Continue to read ahead
543         while ( this->ahead )
544         {
545                 // Fetch width/height again
546                 width = mlt_properties_get_int( properties, "width" );
547                 height = mlt_properties_get_int( properties, "height" );
548
549                 // Put the current frame into the queue
550                 pthread_mutex_lock( &this->mutex );
551                 while( this->ahead && mlt_deque_count( this->queue ) >= buffer )
552                         pthread_cond_wait( &this->cond, &this->mutex );
553                 mlt_deque_push_back( this->queue, frame );
554                 pthread_cond_broadcast( &this->cond );
555                 pthread_mutex_unlock( &this->mutex );
556
557                 time_wait += time_difference( &ante );
558
559                 // Get the next frame
560                 frame = mlt_consumer_get_frame( this );
561                 time_frame += time_difference( &ante );
562
563                 // If there's no frame, we're probably stopped...
564                 if ( frame == NULL )
565                         continue;
566
567                 // Attempt to fetch the lock object
568                 lock_object = mlt_properties_get_data( MLT_FRAME_PROPERTIES( frame ), "consumer_lock_service", NULL );
569
570                 // Increment the count
571                 count ++;
572
573                 // Lock if there's a lock object
574                 if ( lock_object ) mlt_service_lock( lock_object );
575
576                 // All non normal playback frames should be shown
577                 if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES( frame ), "_speed" ) != 1 )
578                 {
579 #ifdef DEINTERLACE_ON_NOT_NORMAL_SPEED
580                         mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "consumer_deinterlace", 1 );
581 #endif
582                         skipped = 0;
583                         time_frame = 0;
584                         time_process = 0;
585                         time_wait = 0;
586                         count = 1;
587                         skip_next = 0;
588                 }
589
590                 // Get the image
591                 if ( !skip_next || this->real_time == -1 )
592                 {
593                         // Get the image, mark as rendered and time it
594                         if ( !video_off )
595                         {
596                                 mlt_events_fire( MLT_CONSUMER_PROPERTIES( this ), "consumer-frame-render", frame, NULL );
597                                 mlt_frame_get_image( frame, &image, &this->format, &width, &height, 0 );
598                         }
599                         mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
600                 }
601                 else
602                 {
603                         // Increment the number of sequentially skipped frames
604                         skipped ++;
605                         skip_next = 0;
606
607                         // If we've reached an unacceptable level, reset everything
608                         if ( skipped > 5 )
609                         {
610                                 skipped = 0;
611                                 time_frame = 0;
612                                 time_process = 0;
613                                 time_wait = 0;
614                                 count = 1;
615                         }
616                 }
617
618                 // Always process audio
619                 if ( !audio_off )
620                 {
621                         samples = mlt_sample_calculator( fps, frequency, counter++ );
622                         mlt_frame_get_audio( frame, &pcm, &afmt, &frequency, &channels, &samples );
623                 }
624
625                 // Increment the time take for this frame
626                 time_process += time_difference( &ante );
627
628                 // Determine if the next frame should be skipped
629                 if ( mlt_deque_count( this->queue ) <= 5 )
630                 {
631                         int frame_duration = mlt_properties_get_int( properties, "frame_duration" );
632                         if ( ( ( time_wait + time_frame + time_process ) / count ) > frame_duration )
633                                 skip_next = 1;
634                 }
635
636                 // Unlock if there's a lock object
637                 if ( lock_object ) mlt_service_unlock( lock_object );
638         }
639
640         // Remove the last frame
641         mlt_frame_close( frame );
642
643         return NULL;
644 }
645
646 static void consumer_read_ahead_start( mlt_consumer this )
647 {
648         // We're running now
649         this->ahead = 1;
650
651         // Create the frame queue
652         this->queue = mlt_deque_init( );
653
654         // Create the mutex
655         pthread_mutex_init( &this->mutex, NULL );
656
657         // Create the condition
658         pthread_cond_init( &this->cond, NULL );
659
660         // Create the read ahead
661         if ( mlt_properties_get( MLT_CONSUMER_PROPERTIES( this ), "priority" ) )
662         {
663                 struct sched_param priority;
664                 priority.sched_priority = mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( this ), "priority" );
665                 pthread_attr_t thread_attributes;
666                 pthread_attr_init( &thread_attributes );
667                 pthread_attr_setschedpolicy( &thread_attributes, SCHED_OTHER );
668                 pthread_attr_setschedparam( &thread_attributes, &priority );
669                 pthread_attr_setinheritsched( &thread_attributes, PTHREAD_EXPLICIT_SCHED );
670                 pthread_attr_setscope( &thread_attributes, PTHREAD_SCOPE_SYSTEM );
671                 if ( pthread_create( &this->ahead_thread, &thread_attributes, consumer_read_ahead_thread, this ) < 0 )
672                         pthread_create( &this->ahead_thread, NULL, consumer_read_ahead_thread, this );
673                 pthread_attr_destroy( &thread_attributes );
674         }
675         else
676         {
677                 pthread_create( &this->ahead_thread, NULL, consumer_read_ahead_thread, this );
678         }
679 }
680
681 static void consumer_read_ahead_stop( mlt_consumer this )
682 {
683         // Make sure we're running
684         if ( this->ahead )
685         {
686                 // Inform thread to stop
687                 this->ahead = 0;
688
689                 // Broadcast to the condition in case it's waiting
690                 pthread_mutex_lock( &this->mutex );
691                 pthread_cond_broadcast( &this->cond );
692                 pthread_mutex_unlock( &this->mutex );
693
694                 // Broadcast to the put condition in case it's waiting
695                 pthread_mutex_lock( &this->put_mutex );
696                 pthread_cond_broadcast( &this->put_cond );
697                 pthread_mutex_unlock( &this->put_mutex );
698
699                 // Join the thread
700                 pthread_join( this->ahead_thread, NULL );
701
702                 // Destroy the mutex
703                 pthread_mutex_destroy( &this->mutex );
704
705                 // Destroy the condition
706                 pthread_cond_destroy( &this->cond );
707
708                 // Wipe the queue
709                 while ( mlt_deque_count( this->queue ) )
710                         mlt_frame_close( mlt_deque_pop_back( this->queue ) );
711
712                 // Close the queue
713                 mlt_deque_close( this->queue );
714         }
715 }
716
717 void mlt_consumer_purge( mlt_consumer this )
718 {
719         if ( this->ahead )
720         {
721                 pthread_mutex_lock( &this->mutex );
722                 while ( mlt_deque_count( this->queue ) )
723                         mlt_frame_close( mlt_deque_pop_back( this->queue ) );
724                 pthread_cond_broadcast( &this->cond );
725                 pthread_mutex_unlock( &this->mutex );
726         }
727 }
728
729 mlt_frame mlt_consumer_rt_frame( mlt_consumer this )
730 {
731         // Frame to return
732         mlt_frame frame = NULL;
733
734         // Get the properties
735         mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
736
737         // Check if the user has requested real time or not
738         if ( this->real_time )
739         {
740                 int size = 1;
741
742                 // Is the read ahead running?
743                 if ( this->ahead == 0 )
744                 {
745                         int buffer = mlt_properties_get_int( properties, "buffer" );
746                         int prefill = mlt_properties_get_int( properties, "prefill" );
747                         consumer_read_ahead_start( this );
748                         if ( buffer > 1 )
749                                 size = prefill > 0 && prefill < buffer ? prefill : buffer;
750                 }
751
752                 // Get frame from queue
753                 pthread_mutex_lock( &this->mutex );
754                 while( this->ahead && mlt_deque_count( this->queue ) < size )
755                         pthread_cond_wait( &this->cond, &this->mutex );
756                 frame = mlt_deque_pop_front( this->queue );
757                 pthread_cond_broadcast( &this->cond );
758                 pthread_mutex_unlock( &this->mutex );
759         }
760         else
761         {
762                 // Get the frame in non real time
763                 frame = mlt_consumer_get_frame( this );
764
765                 // This isn't true, but from the consumers perspective it is
766                 if ( frame != NULL )
767                         mlt_properties_set_int( MLT_FRAME_PROPERTIES( frame ), "rendered", 1 );
768         }
769
770         return frame;
771 }
772
773 /** Callback for the implementation to indicate a stopped condition.
774 */
775
776 void mlt_consumer_stopped( mlt_consumer this )
777 {
778         mlt_properties_set_int( MLT_CONSUMER_PROPERTIES( this ), "running", 0 );
779         mlt_events_fire( MLT_CONSUMER_PROPERTIES( this ), "consumer-stopped", NULL );
780         mlt_event_unblock( this->event_listener );
781 }
782
783 /** Stop the consumer.
784 */
785
786 int mlt_consumer_stop( mlt_consumer this )
787 {
788         // Get the properies
789         mlt_properties properties = MLT_CONSUMER_PROPERTIES( this );
790         char *debug = mlt_properties_get( MLT_CONSUMER_PROPERTIES( this ), "debug" );
791
792         // Just in case...
793         if ( debug ) fprintf( stderr, "%s: stopping put waiting\n", debug );
794         pthread_mutex_lock( &this->put_mutex );
795         this->put_active = 0;
796         pthread_cond_broadcast( &this->put_cond );
797         pthread_mutex_unlock( &this->put_mutex );
798
799         // Stop the consumer
800         if ( debug ) fprintf( stderr, "%s: stopping consumer\n", debug );
801         if ( this->stop != NULL )
802                 this->stop( this );
803
804         // Check if the user has requested real time or not and stop if necessary
805         if ( debug ) fprintf( stderr, "%s: stopping read_ahead\n", debug );
806         if ( mlt_properties_get_int( properties, "real_time" ) )
807                 consumer_read_ahead_stop( this );
808
809         // Kill the test card
810         mlt_properties_set_data( properties, "test_card_producer", NULL, 0, NULL, NULL );
811
812         // Check and run a post command
813         if ( mlt_properties_get( properties, "post" ) )
814                 system( mlt_properties_get( properties, "post" ) );
815
816         if ( debug ) fprintf( stderr, "%s: stopped\n", debug );
817
818         return 0;
819 }
820
821 /** Determine if the consumer is stopped.
822 */
823
824 int mlt_consumer_is_stopped( mlt_consumer this )
825 {
826         // Check if the consumer is stopped
827         if ( this->is_stopped != NULL )
828                 return this->is_stopped( this );
829
830         return 0;
831 }
832
833 /** Close the consumer.
834 */
835
836 void mlt_consumer_close( mlt_consumer this )
837 {
838         if ( this != NULL && mlt_properties_dec_ref( MLT_CONSUMER_PROPERTIES( this ) ) <= 0 )
839         {
840                 // Get the childs close function
841                 void ( *consumer_close )( ) = this->close;
842
843                 if ( consumer_close )
844                 {
845                         // Just in case...
846                         //mlt_consumer_stop( this );
847
848                         this->close = NULL;
849                         consumer_close( this );
850                 }
851                 else
852                 {
853                         // Make sure it only gets called once
854                         this->parent.close = NULL;
855
856                         // Destroy the push mutex and condition
857                         pthread_mutex_destroy( &this->put_mutex );
858                         pthread_cond_destroy( &this->put_cond );
859
860                         mlt_service_close( &this->parent );
861                 }
862         }
863 }