]> git.sesse.net Git - mlt/blob - src/modules/core/consumer_multi.c
drain nested consumers of multi consumer
[mlt] / src / modules / core / consumer_multi.c
1 /*
2  * Copyright (C) 2011 Ushodaya Enterprises Limited
3  * Author: Dan Dennedy <dan@dennedy.org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18  */
19
20 #include <framework/mlt.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <sys/time.h>
26
27 // Forward references
28 static int start( mlt_consumer consumer );
29 static int stop( mlt_consumer consumer );
30 static int is_stopped( mlt_consumer consumer );
31 static void *consumer_thread( void *arg );
32 static void consumer_close( mlt_consumer consumer );
33
34 static mlt_properties normalisers = NULL;
35
36 /** Initialise the consumer.
37 */
38
39 mlt_consumer consumer_multi_init( mlt_profile profile, mlt_service_type type, const char *id, char *arg )
40 {
41         mlt_consumer consumer = mlt_consumer_new( profile );
42
43         if ( consumer )
44         {
45                 mlt_properties properties = MLT_CONSUMER_PROPERTIES(consumer);
46
47                 // Set defaults
48                 mlt_properties_set( properties, "resource", arg );
49                 mlt_properties_set_int( properties, "real_time", -1 );
50                 mlt_properties_set_int( properties, "terminate_on_pause", 1 );
51
52                 // Init state
53                 mlt_properties_set_int( properties, "joined", 1 );
54
55                 // Assign callbacks
56                 consumer->close = consumer_close;
57                 consumer->start = start;
58                 consumer->stop = stop;
59                 consumer->is_stopped = is_stopped;
60         }
61
62         return consumer;
63 }
64
65 static mlt_consumer create_consumer( mlt_profile profile, char *id )
66 {
67         char *myid = id ? strdup( id ) : NULL;
68         char *arg = myid ? strchr( myid, ':' ) : NULL;
69         if ( arg != NULL )
70                 *arg ++ = '\0';
71         mlt_consumer consumer = mlt_factory_consumer( profile, myid, arg );
72         if ( myid )
73                 free( myid );
74         return consumer;
75 }
76
77 static void create_filter( mlt_profile profile, mlt_service service, char *effect, int *created )
78 {
79         char *id = strdup( effect );
80         char *arg = strchr( id, ':' );
81         if ( arg != NULL )
82                 *arg ++ = '\0';
83
84         // The swscale and avcolor_space filters require resolution as arg to test compatibility
85         if ( strncmp( effect, "swscale", 7 ) == 0 || strncmp( effect, "avcolo", 6 ) == 0 )
86                 arg = (char*) mlt_properties_get_int( MLT_SERVICE_PROPERTIES( service ), "_real_width" );
87
88         mlt_filter filter = mlt_factory_filter( profile, id, arg );
89         if ( filter != NULL )
90         {
91                 mlt_properties_set_int( MLT_FILTER_PROPERTIES( filter ), "_loader", 1 );
92                 mlt_service_attach( service, filter );
93                 mlt_filter_close( filter );
94                 *created = 1;
95         }
96         free( id );
97 }
98
99 static void attach_normalisers( mlt_profile profile, mlt_service service )
100 {
101         // Loop variable
102         int i;
103
104         // Tokeniser
105         mlt_tokeniser tokeniser = mlt_tokeniser_init( );
106
107         // We only need to load the normalising properties once
108         if ( normalisers == NULL )
109         {
110                 char temp[ 1024 ];
111                 snprintf( temp, sizeof(temp), "%s/core/loader.ini", mlt_environment( "MLT_DATA" ) );
112                 normalisers = mlt_properties_load( temp );
113                 mlt_factory_register_for_clean_up( normalisers, ( mlt_destructor )mlt_properties_close );
114         }
115
116         // Apply normalisers
117         for ( i = 0; i < mlt_properties_count( normalisers ); i ++ )
118         {
119                 int j = 0;
120                 int created = 0;
121                 char *value = mlt_properties_get_value( normalisers, i );
122                 mlt_tokeniser_parse_new( tokeniser, value, "," );
123                 for ( j = 0; !created && j < mlt_tokeniser_count( tokeniser ); j ++ )
124                         create_filter( profile, service, mlt_tokeniser_get_string( tokeniser, j ), &created );
125         }
126
127         // Close the tokeniser
128         mlt_tokeniser_close( tokeniser );
129
130         // Attach the audio and video format converters
131         int created = 0;
132         create_filter( profile, service, "avcolor_space", &created );
133         if ( !created )
134                 create_filter( profile, service, "imageconvert", &created );
135         create_filter( profile, service, "audioconvert", &created );
136 }
137
138 static mlt_consumer generate_consumer( mlt_consumer consumer, mlt_properties props, int index )
139 {
140         mlt_profile profile = NULL;
141         if ( mlt_properties_get( props, "mlt_profile" ) )
142                 profile = mlt_profile_init( mlt_properties_get( props, "mlt_profile" ) );
143         if ( !profile )
144                 profile = mlt_profile_clone( mlt_service_profile( MLT_CONSUMER_SERVICE(consumer) ) );
145         mlt_consumer nested = create_consumer( profile, mlt_properties_get( props, "mlt_service" ) );
146
147         if ( nested )
148         {
149                 mlt_properties properties = MLT_CONSUMER_PROPERTIES(consumer);
150                 mlt_properties nested_props = MLT_CONSUMER_PROPERTIES(nested);
151                 char key[30];
152
153                 snprintf( key, sizeof(key), "%d.consumer", index );
154                 mlt_properties_set_data( properties, key, nested, 0, (mlt_destructor) mlt_consumer_close, NULL );
155                 snprintf( key, sizeof(key), "%d.profile", index );
156                 mlt_properties_set_data( properties, key, profile, 0, (mlt_destructor) mlt_profile_close, NULL );
157
158                 mlt_properties_set_int( nested_props, "put_mode", 1 );
159                 mlt_properties_pass_list( nested_props, properties, "terminate_on_pause" );
160                 mlt_properties_set( props, "consumer", NULL );
161                 // set mlt_profile before other properties to facilitate presets
162                 mlt_properties_pass_list( nested_props, props, "mlt_profile" );
163                 mlt_properties_inherit( nested_props, props );
164
165                 attach_normalisers( profile, MLT_CONSUMER_SERVICE(nested) );
166         }
167         else
168         {
169                 mlt_profile_close( profile );
170         }
171         return nested;
172 }
173
174 static void foreach_consumer_init( mlt_consumer consumer )
175 {
176         const char *resource = mlt_properties_get( MLT_CONSUMER_PROPERTIES(consumer), "resource" );
177         mlt_properties properties = mlt_properties_parse_yaml( resource );
178         char key[20];
179         int index = 0;
180
181         if ( mlt_properties_get_data( MLT_CONSUMER_PROPERTIES(consumer), "0", NULL ) )
182         {
183                 // Properties set directly by application
184                 mlt_properties p;
185
186                 if ( properties )
187                         mlt_properties_close( properties );
188                 properties = MLT_CONSUMER_PROPERTIES(consumer);
189                 do {
190                         snprintf( key, sizeof(key), "%d", index );
191                         if ( ( p = mlt_properties_get_data( properties, key, NULL ) ) )
192                                 generate_consumer( consumer, p, index++ );
193                 } while ( p );
194         }
195         else if ( properties && mlt_properties_get_data( properties, "0", NULL ) )
196         {
197                 // YAML file supplied
198                 mlt_properties p;
199
200                 do {
201                         snprintf( key, sizeof(key), "%d", index );
202                         if ( ( p = mlt_properties_get_data( properties, key, NULL ) ) )
203                                 generate_consumer( consumer, p, index++ );
204                 } while ( p );
205                 mlt_properties_close( properties );
206         }
207         else
208         {
209                 // properties file supplied or properties on this consumer
210                 const char *s;
211
212                 if ( properties )
213                         mlt_properties_close( properties );
214                 if ( resource )
215                         properties = mlt_properties_load( resource );
216                 else
217                         properties = MLT_CONSUMER_PROPERTIES( consumer );
218
219                 do {
220                         snprintf( key, sizeof(key), "%d", index );
221                         if ( ( s = mlt_properties_get( properties, key ) ) )
222                         {
223                                 mlt_properties p = mlt_properties_new();
224                                 int i, count;
225
226                                 if ( !p ) break;
227                                 mlt_properties_set( p, "mlt_service", mlt_properties_get( properties, key ) );
228                                 snprintf( key, sizeof(key), "%d.", index );
229
230                                 count = mlt_properties_count( properties );
231                                 for ( i = 0; i < count; i++ )
232                                 {
233                                         char *name = mlt_properties_get_name( properties, i );
234                                         if ( !strncmp( name, key, strlen(key) ) )
235                                                 mlt_properties_set( p, name + strlen(key),
236                                                         mlt_properties_get_value( properties, i ) );
237                                 }
238                                 generate_consumer( consumer, p, index++ );
239                                 mlt_properties_close( p );
240                         }
241                 } while ( s );
242                 if ( resource )
243                         mlt_properties_close( properties );
244         }
245 }
246
247 static void foreach_consumer_start( mlt_consumer consumer )
248 {
249         mlt_properties properties = MLT_CONSUMER_PROPERTIES( consumer );
250         mlt_consumer nested = NULL;
251         char key[30];
252         int index = 0;
253
254         do {
255                 snprintf( key, sizeof(key), "%d.consumer", index++ );
256                 nested = mlt_properties_get_data( properties, key, NULL );
257                 if ( nested )
258                 {
259                         mlt_properties nested_props = MLT_CONSUMER_PROPERTIES(nested);
260                         mlt_properties_set_position( nested_props, "_multi_position", 0 );
261                         mlt_properties_set_data( nested_props, "_multi_audio", NULL, 0, NULL, NULL );
262                         mlt_properties_set_int( nested_props, "_multi_samples", 0 );
263                         mlt_consumer_start( nested );
264                 }
265         } while ( nested );
266 }
267
268 static void foreach_consumer_refresh( mlt_consumer consumer )
269 {
270         mlt_properties properties = MLT_CONSUMER_PROPERTIES( consumer );
271         mlt_consumer nested = NULL;
272         char key[30];
273         int index = 0;
274
275         do {
276                 snprintf( key, sizeof(key), "%d.consumer", index++ );
277                 nested = mlt_properties_get_data( properties, key, NULL );
278                 if ( nested ) mlt_properties_set_int( MLT_CONSUMER_PROPERTIES(nested), "refresh", 1 );
279         } while ( nested );
280 }
281
282 static void foreach_consumer_put( mlt_consumer consumer, mlt_frame frame )
283 {
284         mlt_properties properties = MLT_CONSUMER_PROPERTIES( consumer );
285         mlt_consumer nested = NULL;
286         char key[30];
287         int index = 0;
288
289         do {
290                 snprintf( key, sizeof(key), "%d.consumer", index++ );
291                 nested = mlt_properties_get_data( properties, key, NULL );
292                 if ( nested )
293                 {
294                         mlt_properties nested_props = MLT_CONSUMER_PROPERTIES(nested);
295                         double self_fps = mlt_properties_get_double( properties, "fps" );
296                         double nested_fps = mlt_properties_get_double( nested_props, "fps" );
297                         mlt_position nested_pos = mlt_properties_get_position( nested_props, "_multi_position" );
298                         mlt_position self_pos = mlt_frame_get_position( frame );
299                         double self_time = self_pos / self_fps;
300                         double nested_time = nested_pos / nested_fps;
301
302                         // get the audio for the current frame
303                         uint8_t *buffer = NULL;
304                         mlt_audio_format format = mlt_audio_s16;
305                         int channels = mlt_properties_get_int( properties, "channels" );
306                         int frequency = mlt_properties_get_int( properties, "frequency" );
307                         int current_samples = mlt_sample_calculator( self_fps, frequency, self_pos );
308                         mlt_frame_get_audio( frame, (void**) &buffer, &format, &frequency, &channels, &current_samples );
309                         int current_size = mlt_audio_format_size( format, current_samples, channels );
310
311                         // get any leftover audio
312                         int prev_size = 0;
313                         uint8_t *prev_buffer = mlt_properties_get_data( nested_props, "_multi_audio", &prev_size );
314                         uint8_t *new_buffer = NULL;
315                         if ( prev_size > 0 )
316                         {
317                                 new_buffer = mlt_pool_alloc( prev_size + current_size );
318                                 memcpy( new_buffer, prev_buffer, prev_size );
319                                 memcpy( new_buffer + prev_size, buffer, current_size );
320                                 buffer = new_buffer;
321                         }
322                         current_size += prev_size;
323                         current_samples += mlt_properties_get_int( nested_props, "_multi_samples" );
324
325                         while ( nested_time <= self_time )
326                         {
327                                 // put ideal number of samples into cloned frame
328                                 mlt_frame clone_frame = mlt_frame_clone( frame, 0 );
329                                 int nested_samples = mlt_sample_calculator( nested_fps, frequency, nested_pos );
330                                 // -10 is an optimization to avoid tiny amounts of leftover samples
331                                 nested_samples = nested_samples > current_samples - 10 ? current_samples : nested_samples;
332                                 int nested_size = mlt_audio_format_size( format, nested_samples, channels );
333                                 if ( nested_size > 0 )
334                                 {
335                                         prev_buffer = mlt_pool_alloc( nested_size );
336                                         memcpy( prev_buffer, buffer, nested_size );
337                                 }
338                                 else
339                                 {
340                                         prev_buffer = NULL;
341                                         nested_size = 0;
342                                 }
343                                 mlt_frame_set_audio( clone_frame, prev_buffer, format, nested_size, mlt_pool_release );
344                                 mlt_properties_set_int( MLT_FRAME_PROPERTIES(clone_frame), "audio_samples", nested_samples );
345                                 mlt_properties_set_int( MLT_FRAME_PROPERTIES(clone_frame), "audio_frequency", frequency );
346                                 mlt_properties_set_int( MLT_FRAME_PROPERTIES(clone_frame), "audio_channels", channels );
347
348                                 // chomp the audio
349                                 current_samples -= nested_samples;
350                                 current_size -= nested_size;
351                                 buffer += nested_size;
352
353                                 // send frame to nested consumer
354                                 mlt_consumer_put_frame( nested, clone_frame );
355                                 mlt_properties_set_position( nested_props, "_multi_position", ++nested_pos );
356                                 nested_time = nested_pos / nested_fps;
357                         }
358
359                         // save any remaining audio
360                         if ( current_size > 0 )
361                         {
362                                 prev_buffer = mlt_pool_alloc( current_size );
363                                 memcpy( prev_buffer, buffer, current_size );
364                         }
365                         else
366                         {
367                                 prev_buffer = NULL;
368                                 current_size = 0;
369                         }
370                         mlt_pool_release( new_buffer );
371                         mlt_properties_set_data( nested_props, "_multi_audio", prev_buffer, current_size, mlt_pool_release, NULL );
372                         mlt_properties_set_int( nested_props, "_multi_samples", current_samples );
373                 }
374         } while ( nested );
375 }
376
377 static void foreach_consumer_stop( mlt_consumer consumer )
378 {
379         mlt_properties properties = MLT_CONSUMER_PROPERTIES( consumer );
380         mlt_consumer nested = NULL;
381         char key[30];
382         int index = 0;
383         struct timespec tm = { 0, 1000 * 1000 };
384
385         do {
386                 snprintf( key, sizeof(key), "%d.consumer", index++ );
387                 nested = mlt_properties_get_data( properties, key, NULL );
388                 if ( nested )
389                 {
390                         // Let consumer with terminate_on_pause stop on their own
391                         if ( mlt_properties_get_int( MLT_CONSUMER_PROPERTIES(nested), "terminate_on_pause" ) )
392                         {
393                                 // Send additional dummy frame to unlatch nested consumer's threads
394                                 mlt_consumer_put_frame( nested, mlt_frame_init( MLT_CONSUMER_SERVICE(consumer) ) );
395                                 // wait for stop
396                                 while ( !mlt_consumer_is_stopped( nested ) )
397                                         nanosleep( &tm, NULL );
398                         }
399                         else
400                         {
401                                 mlt_consumer_stop( nested );
402                         }
403                 }
404         } while ( nested );
405 }
406
407 /** Start the consumer.
408 */
409
410 static int start( mlt_consumer consumer )
411 {
412         // Check that we're not already running
413         if ( is_stopped( consumer ) )
414         {
415                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( consumer );
416                 pthread_t *thread = calloc( 1, sizeof( pthread_t ) );
417
418                 // Assign the thread to properties with automatic dealloc
419                 mlt_properties_set_data( properties, "thread", thread, sizeof( pthread_t ), free, NULL );
420
421                 // Set the running state
422                 mlt_properties_set_int( properties, "running", 1 );
423                 mlt_properties_set_int( properties, "joined", 0 );
424
425                 // Construct and start nested consumers
426                 if ( !mlt_properties_get_data( properties, "0.consumer", NULL ) )
427                         foreach_consumer_init( consumer );
428                 foreach_consumer_start( consumer );
429
430                 // Create the thread
431                 pthread_create( thread, NULL, consumer_thread, consumer );
432         }
433         return 0;
434 }
435
436 /** Stop the consumer.
437 */
438
439 static int stop( mlt_consumer consumer )
440 {
441         // Check that we're running
442         if ( !mlt_properties_get_int( MLT_CONSUMER_PROPERTIES(consumer), "joined" ) )
443         {
444                 mlt_properties properties = MLT_CONSUMER_PROPERTIES( consumer );
445                 pthread_t *thread = mlt_properties_get_data( properties, "thread", NULL );
446
447                 // Stop the thread
448                 mlt_properties_set_int( properties, "running", 0 );
449
450                 // Wait for termination
451                 if ( thread )
452                 {
453                         foreach_consumer_refresh( consumer );
454                         pthread_join( *thread, NULL );
455                 }
456                 mlt_properties_set_int( properties, "joined", 1 );
457
458                 // Stop nested consumers
459                 foreach_consumer_stop( consumer );
460         }
461
462         return 0;
463 }
464
465 /** Determine if the consumer is stopped.
466 */
467
468 static int is_stopped( mlt_consumer consumer )
469 {
470         return !mlt_properties_get_int( MLT_CONSUMER_PROPERTIES( consumer ), "running" );
471 }
472
473 /** The main thread - the argument is simply the consumer.
474 */
475
476 static void *consumer_thread( void *arg )
477 {
478         mlt_consumer consumer = arg;
479         mlt_properties properties = MLT_CONSUMER_PROPERTIES( consumer );
480         mlt_frame frame = NULL;
481
482         // Determine whether to stop at end-of-media
483         int terminate_on_pause = mlt_properties_get_int( properties, "terminate_on_pause" );
484         int terminated = 0;
485
486         // Loop while running
487         while ( !terminated && !is_stopped( consumer ) )
488         {
489                 // Get the next frame
490                 frame = mlt_consumer_rt_frame( consumer );
491
492                 // Check for termination
493                 if ( terminate_on_pause && frame )
494                         terminated = mlt_properties_get_double( MLT_FRAME_PROPERTIES( frame ), "_speed" ) == 0.0;
495
496                 // Check that we have a frame to work with
497                 if ( frame && !terminated && !is_stopped( consumer ) )
498                 {
499                         if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES(frame), "rendered" ) )
500                         {
501                                 if ( mlt_properties_get_int( MLT_FRAME_PROPERTIES(frame), "_speed" ) == 0 )
502                                         foreach_consumer_refresh( consumer );
503                                 foreach_consumer_put( consumer, frame );
504                                 mlt_events_fire( properties, "consumer-frame-show", frame, NULL );
505                         }
506                         else
507                         {
508                                 int dropped = mlt_properties_get_int( properties, "_dropped" );
509                                 mlt_log_info( MLT_CONSUMER_SERVICE(consumer), "dropped frame %d\n", ++dropped );
510                                 mlt_properties_set_int( properties, "_dropped", dropped );
511                         }
512                         mlt_frame_close( frame );
513                 }
514                 else
515                 {
516                         if ( frame && terminated )
517                         {
518                                 // Send this termination frame to nested consumers for their cancellation
519                                 foreach_consumer_put( consumer, frame );
520                                 mlt_events_fire( properties, "consumer-frame-show", frame, NULL );
521                         }
522                         if ( frame )
523                                 mlt_frame_close( frame );
524                         terminated = 1;
525                 }
526         }
527
528         // Indicate that the consumer is stopped
529         mlt_consumer_stopped( consumer );
530
531         return NULL;
532 }
533
534 /** Close the consumer.
535 */
536
537 static void consumer_close( mlt_consumer consumer )
538 {
539         mlt_consumer_stop( consumer );
540
541         // Close the parent
542         mlt_consumer_close( consumer );
543         free( consumer );
544 }