]> git.sesse.net Git - vlc/blob - src/input/input.c
Impl�mentation rudimentaire de la synchro : les packets
[vlc] / src / input / input.c
1 /*******************************************************************************
2  * input.c: input thread 
3  * (c)1998 VideoLAN
4  *******************************************************************************
5  * Read an MPEG2 stream, demultiplex and parse it before sending it to
6  * decoders.
7  *******************************************************************************/
8
9 /*******************************************************************************
10  * Preamble
11  *******************************************************************************/
12 #include <errno.h>
13 #include <pthread.h>
14 #include <sys/uio.h>                                                 /* iovec */
15 #include <string.h>
16
17 #include <X11/Xlib.h>
18 #include <X11/extensions/XShm.h>
19 #include <sys/soundcard.h>
20
21 #include <stdlib.h>                               /* atoi(), malloc(), free() */
22 #include <stdio.h>
23 #include <sys/ioctl.h>                                             /* ioctl() */
24 #include <net/if.h>                                                  /* ifreq */
25 #include <netinet/in.h>
26
27 #include "common.h"
28 #include "config.h"
29 #include "mtime.h"
30 #include "intf_msg.h"
31 #include "debug.h"
32
33 #include "input.h"
34 #include "input_psi.h"
35 #include "input_pcr.h"
36 #include "input_netlist.h"
37 #include "decoder_fifo.h"
38 #include "input_file.h"
39 #include "input_network.h"
40
41 #include "audio_output.h"
42 #include "audio_decoder.h"
43
44 #include "video.h"
45 #include "video_output.h"
46 #include "video_decoder.h"
47
48 /******************************************************************************
49  * Local prototypes
50  ******************************************************************************/
51 static void input_Thread( input_thread_t *p_input );
52 static __inline__ int input_ReadPacket( input_thread_t *p_input );
53 static __inline__ void input_SortPacket( input_thread_t *p_input,
54                                          ts_packet_t *ts_packet );
55 static __inline__ void input_DemuxTS( input_thread_t *p_input,
56                                       ts_packet_t *ts_packet,
57                                       es_descriptor_t *es_descriptor );
58 static __inline__ void input_DemuxPES( input_thread_t *p_input,
59                                        ts_packet_t *ts_packet,
60                                        es_descriptor_t *p_es_descriptor,
61                                        boolean_t b_unit_start, boolean_t b_packet_lost );
62 static __inline__ void input_DemuxPSI( input_thread_t *p_input,
63                                        ts_packet_t *ts_packet,
64                                        es_descriptor_t *p_es_descriptor,
65                                        boolean_t b_unit_start, boolean_t b_packet_lost );
66
67 /*******************************************************************************
68  * input_CreateThread: initialize and spawn an input thread
69  *******************************************************************************
70  * This function initializes and spawns an input thread. It returns NULL on
71  * failure. If you want a better understanding of the input thread, don't start
72  * by reading this function :-).
73  *******************************************************************************/
74 input_thread_t *input_CreateThread( input_cfg_t *p_cfg )
75 {
76     input_thread_t *    p_input;
77     int i_index;
78     
79     intf_DbgMsg("input debug 1-1: creating thread (cfg : %p)\n", p_cfg );
80
81     /* Allocate input_thread_t structure. */
82     if( !( p_input = (input_thread_t *)malloc(sizeof(input_thread_t)) ) )
83     {
84         intf_ErrMsg("input error: can't allocate input thread structure (%s)\n",
85                     strerror(errno));
86         return( NULL );
87     }
88     /* Init it */
89     bzero( p_input, sizeof(input_thread_t));
90     for( i_index = 0; i_index < INPUT_MAX_ES; i_index++ )
91     {
92         p_input->p_es[i_index].i_id = EMPTY_PID;
93     }
94
95     /* Find out which method we are gonna use and retrieve pointers. */
96     if( !((p_cfg->i_properties) & INPUT_CFG_METHOD) )
97     {
98         /* i_method is not set. */
99         intf_DbgMsg("input debug: using default method (%d)\n",
100                     INPUT_DEFAULT_METHOD);
101         p_cfg->i_method = INPUT_DEFAULT_METHOD;
102         p_cfg->i_properties |= INPUT_CFG_METHOD;
103     }
104     p_input->i_method = p_cfg->i_method;
105     switch( p_cfg->i_method )
106     {
107         /* File methods */
108         case INPUT_METHOD_TS_FILE:
109             p_input->p_open = &input_FileCreateMethod;
110             p_input->p_read = &input_FileRead;
111             p_input->p_clean = &input_FileDestroyMethod;
112             break;
113
114         /* Network methods */
115         case INPUT_METHOD_TS_UCAST:
116         case INPUT_METHOD_TS_MCAST:
117         case INPUT_METHOD_TS_BCAST:
118         case INPUT_METHOD_TS_VLAN_BCAST:
119             p_input->p_open = &input_NetworkCreateMethod;
120             p_input->p_read = &input_NetworkRead;
121             p_input->p_clean = &input_NetworkDestroyMethod;
122             break;
123
124         case INPUT_METHOD_NONE:
125         default:
126 #ifdef DEBUG
127             /* Internal error, which should never happen */
128             intf_DbgMsg("input debug: unknow method type %d\n",
129                             p_cfg->i_method);
130             return( NULL );
131 #endif
132             break;
133     }
134
135     /* Initialize PSI decoder. */
136     intf_DbgMsg("Initializing PSI decoder\n");
137     if( input_PsiInit( p_input ) == -1 )
138     {
139         free( p_input );
140         return( NULL );
141     }
142
143     /* Initialize PCR decoder. */
144     intf_DbgMsg("Initializing PCR decoder\n");
145     if( input_PcrInit( p_input ) == -1 )
146     {
147         input_PsiClean( p_input );
148         free( p_input );
149         return( NULL );
150     }
151
152     /* Initialize netlists. */
153     if( input_NetlistOpen( p_input ) )
154     {
155         input_PsiClean( p_input );
156         input_PcrClean( p_input );
157         free( p_input );
158         return( NULL );
159     }
160
161 #ifdef STATS
162     /* Initialize counters. */
163     p_input->c_bytes = 0;
164     p_input->c_payload_bytes = 0;
165     p_input->c_ts_packets_read = 0;
166     p_input->c_ts_packets_trashed = 0;
167 #ifdef DEBUG
168     p_input->c_loops = 0;
169 #endif
170 #endif
171
172     /* Let the appropriate method open the socket. */
173     if( (*(p_input->p_open))( p_input, p_cfg ) == -1 )
174     {
175         input_NetlistClean( p_input );
176         input_PsiClean( p_input );
177         input_PcrClean( p_input );
178         free( p_input );
179         return( NULL );
180     }
181
182     intf_DbgMsg("input debug: method %d properly initialized the socket\n",
183                 p_input->i_method);
184
185     /* Create thread and set locks. */
186     p_input->b_die = 0;
187     pthread_mutex_init( &p_input->netlist.lock, NULL );
188     pthread_mutex_init( &p_input->programs_lock, NULL );
189     pthread_mutex_init( &p_input->es_lock, NULL );
190 #ifdef NO_THREAD
191     input_Thread( p_input );
192 #else
193     if( pthread_create(&p_input->thread_id, NULL, (void *) input_Thread, 
194                        (void *) p_input) )
195     {
196         intf_ErrMsg("input error: can't spawn input thread (%s)\n", 
197                     strerror(errno) );
198         (*p_input->p_clean)( p_input );
199         input_NetlistClean( p_input );;
200         input_PsiClean( p_input );
201         input_PcrClean( p_input );
202         free( p_input );
203         return( NULL );
204     }
205 #endif
206
207     /* Default setting for new decoders */
208     p_input->p_aout = p_cfg->p_aout;
209
210     return( p_input );
211 }
212
213 /******************************************************************************
214  * input_DestroyThread: mark an input thread as zombie
215  ******************************************************************************
216  * This function should not return until the thread is effectively cancelled.
217  ******************************************************************************/
218 void input_DestroyThread( input_thread_t *p_input )
219 {
220     int i_es_loop;
221
222     intf_DbgMsg("input debug: requesting termination of input thread\n");
223     p_input->b_die = 1;                          /* ask thread to kill itself */
224     pthread_join( p_input->thread_id, NULL );         /* wait until it's done */
225
226     (*p_input->p_clean)( p_input );           /* close input method */
227
228     /* Destroy all decoder threads. */
229     for( i_es_loop = 0; i_es_loop < INPUT_MAX_ES; i_es_loop++ )
230     {
231         if( p_input->pp_selected_es[i_es_loop] )
232         {
233             switch( p_input->pp_selected_es[i_es_loop]->i_type )
234             {
235                 case MPEG1_VIDEO_ES:
236                 case MPEG2_VIDEO_ES:
237                     vdec_DestroyThread( (vdec_thread_t*)(p_input->pp_selected_es[i_es_loop]->p_dec) /*, NULL */ );
238                     break;
239                 case MPEG1_AUDIO_ES:
240                 case MPEG2_AUDIO_ES:
241                     adec_DestroyThread( (adec_thread_t*)(p_input->pp_selected_es[i_es_loop]->p_dec) );
242                     break;
243                 default:
244 #ifdef DEBUG
245                     /* This should never happen. */
246                     intf_DbgMsg("input debug: unknown stream type ! (%d, %d)\n",
247                              p_input->pp_selected_es[i_es_loop]->i_id,
248                              p_input->pp_selected_es[i_es_loop]->i_type);
249 #endif
250                     break;
251             }
252         }
253         else
254         {
255             /* pp_selected_es should not contain any hole. */
256             break;
257         }
258     }
259
260     input_NetlistClean( p_input );                           /* clean netlist */
261     input_PsiClean( p_input );                       /* clean PSI information */
262     input_PcrClean( p_input );                       /* clean PCR information */
263     free( p_input );                           /* free input_thread structure */
264 }
265
266 #if 0
267 /*******************************************************************************
268  * input_OpenAudioStream: open an audio stream
269  *******************************************************************************
270  * This function spawns an audio decoder and plugs it on the audio output
271  * thread.
272  *******************************************************************************/
273 int input_OpenAudioStream( input_thread_t *p_input, int i_id )
274 {
275     /* ?? */
276 }
277
278 /*******************************************************************************
279  * input_CloseAudioStream: close an audio stream
280  *******************************************************************************
281  * This function destroys an audio decoder.
282  *******************************************************************************/
283 void input_CloseAudioStream( input_thread_t *p_input, int i_id )
284 {
285     /* ?? */
286 }
287
288 /*******************************************************************************
289  * input_OpenVideoStream: open a video stream
290  *******************************************************************************
291  * This function spawns a video decoder and plugs it on a video output thread.
292  *******************************************************************************/
293 int input_OpenVideoStream( input_thread_t *p_input, 
294                            struct vout_thread_s *p_vout, struct video_cfg_s * p_cfg )
295 {
296     /* ?? */
297 }
298
299 /*******************************************************************************
300  * input_CloseVideoStream: close a video stream
301  *******************************************************************************
302  * This function destroys an video decoder.
303  *******************************************************************************/
304 void input_CloseVideoStream( input_thread_t *p_input, int i_id )
305 {
306     /* ?? */
307 }
308 #endif
309
310 /* following functions are local */
311
312 /*******************************************************************************
313  * input_Thread: input thread
314  *******************************************************************************
315  * Thread in charge of processing the network packets and demultiplexing.
316  *******************************************************************************/
317 static void input_Thread( input_thread_t *p_input )
318 {
319     intf_DbgMsg("input debug 11-1: thread %p is active\n", p_input);
320     while( !p_input->b_die )
321     {
322         /* Scatter read the UDP packet from the network or the file. */
323         if( (input_ReadPacket( p_input )) == (-1) )
324         {
325             /* ??? Normally, a thread can't kill itself, but we don't have
326              * any method in case of an error condition ... */
327             p_input->b_die = 1;
328         }
329
330 #ifdef STATS
331         p_input->c_loops++;
332 #endif
333     }
334
335     /* Ohoh, we have to die as soon as possible. */
336     intf_DbgMsg("input debug: thread %p destroyed\n", p_input);
337     pthread_exit( 0 );
338 }
339
340 /*******************************************************************************
341  * input_ReadPacket: reads a packet from the network or the file
342  *******************************************************************************/
343 static __inline__ int input_ReadPacket( input_thread_t *p_input )
344 {
345     int                 i_base_index; /* index of the first free iovec */
346     int                 i_current_index;
347     int                 i_packet_size;
348 #ifdef INPUT_LIFO_TS_NETLIST
349     int                 i_meanwhile_released;
350     int                 i_currently_removed;
351 #endif
352     ts_packet_t *       p_ts_packet;
353
354     /* In this function, we only care about the TS netlist. PES netlist
355      * is for the demultiplexer. */
356 #ifdef INPUT_LIFO_TS_NETLIST
357     i_base_index = p_input->netlist.i_ts_index;
358
359     /* Verify that we still have packets in the TS netlist */
360     if( (INPUT_MAX_TS + INPUT_TS_READ_ONCE - 1 - p_input->netlist.i_ts_index) <= INPUT_TS_READ_ONCE )
361     {
362         intf_ErrMsg("input error: TS netlist is empty !\n");
363         return( -1 );
364     }
365
366 #else /* FIFO netlist */
367     i_base_index = p_input->netlist.i_ts_start;
368     if( p_input->netlist.i_ts_start + INPUT_TS_READ_ONCE -1 > INPUT_MAX_TS )
369     {
370         /* The netlist is splitted in 2 parts. We must gather them to consolidate
371            the FIFO (we make the loop easily in having the same iovec at the far
372            end and in the beginning of netlist_free).
373            That's why the netlist is (INPUT_MAX_TS +1) + (INPUT_TS_READ_ONCE -1)
374            large. */
375         memcpy( p_input->netlist.p_ts_free + INPUT_MAX_TS + 1,
376                 p_input->netlist.p_ts_free,
377                 (p_input->netlist.i_ts_start + INPUT_TS_READ_ONCE - 1 - INPUT_MAX_TS)
378                   * sizeof(struct iovec) );
379     }
380
381     /* Verify that we still have packets in the TS netlist */
382     if( ((p_input->netlist.i_ts_end -1 - p_input->netlist.i_ts_start) & INPUT_MAX_TS) <= INPUT_TS_READ_ONCE )
383     {
384         intf_ErrMsg("input error: TS netlist is empty !\n");
385         return( -1 );
386     }
387 #endif /* FIFO netlist */
388
389     /* Scatter read the buffer. */
390     i_packet_size = (*p_input->p_read)( p_input,
391                            &p_input->netlist.p_ts_free[i_base_index],
392                            INPUT_TS_READ_ONCE );
393     if( i_packet_size == (-1) )
394     {
395         intf_DbgMsg("Read packet %d %p %d %d\n", i_base_index,
396                         &p_input->netlist.p_ts_free[i_base_index],
397                         p_input->netlist.i_ts_start,
398                         p_input->netlist.i_ts_end);
399         intf_ErrMsg("input error: readv() failed (%s)\n", strerror(errno));
400         return( -1 );
401     }
402
403     if( i_packet_size == 0 )
404     {
405         /* No packet has been received, so stop here. */
406         return( 0 );
407     }
408      
409     /* Demultiplex the TS packets (1..INPUT_TS_READ_ONCE) received. */
410     for( i_current_index = i_base_index;
411          (i_packet_size -= TS_PACKET_SIZE) >= 0;
412          i_current_index++ )
413     {
414         /* BTW, something REALLY bad could happen if we receive packets with
415            a wrong size. */
416         p_ts_packet = (ts_packet_t*)(p_input->netlist.p_ts_free[i_current_index].iov_base);
417         /* Don't cry :-), we are allowed to do that cast, because initially,
418            our buffer was malloc'ed with sizeof(ts_packet_t) */
419
420         /* Find out if we need this packet and demultiplex. */
421         input_SortPacket( p_input /* for current PIDs and netlist */,
422                           p_ts_packet);
423     }
424
425     if( i_packet_size > 0 )
426     {
427         intf_ErrMsg("input error: wrong size\n");
428         return( -1 );
429     }
430
431     /* Remove the TS packets we have just filled from the netlist */
432 #ifdef INPUT_LIFO_TS_NETLIST
433     /* We need to take a lock here while we're calculating index positions. */
434     pthread_mutex_lock( &p_input->netlist.lock );
435
436     i_meanwhile_released = i_base_index - p_input->netlist.i_ts_index;
437     if( i_meanwhile_released )
438     {
439         /* That's where it becomes funny :-). Since we didn't take locks for
440            efficiency reasons, other threads (including ourselves, with
441            input_DemuxPacket) might have released packets to the netlist.
442            So we have to copy these iovec where they should go.
443            
444            BTW, that explains why the TS netlist is
445            (INPUT_MAX_TS +1) + (TS_READ_ONCE -1) large. */
446
447         i_currently_removed = i_current_index - i_base_index;
448         if( i_meanwhile_released < i_currently_removed )
449         {
450             /* Copy all iovecs in that case */
451             memcpy( &p_input->netlist.p_ts_free[p_input->netlist.i_ts_index]
452                      + i_currently_removed,
453                     &p_input->netlist.p_ts_free[p_input->netlist.i_ts_index],
454                     i_meanwhile_released * sizeof(struct iovec) );
455         }
456         else
457         {
458             /* We have fewer places than items, so we only move
459                i_currently_removed of them. */
460             memcpy( &p_input->netlist.p_ts_free[i_base_index],
461                     &p_input->netlist.p_ts_free[p_input->netlist.i_ts_index],
462                     i_currently_removed * sizeof(struct iovec) );
463         }
464
465         /* Update i_netlist_index with the information gathered above. */
466         p_input->netlist.i_ts_index += i_currently_removed;
467     }
468     else
469     {
470         /* Nothing happened. */
471         p_input->netlist.i_ts_index = i_current_index;
472     }
473
474     pthread_mutex_unlock( &p_input->netlist.lock );
475
476 #else /* FIFO netlist */
477     /* & is modulo ; that's where we make the loop. */
478     p_input->netlist.i_ts_start = i_current_index & INPUT_MAX_TS;
479 #endif
480
481 #ifdef STATS
482     p_input->c_ts_packets_read += i_current_index - i_base_index;
483     p_input->c_bytes += (i_current_index - i_base_index) * TS_PACKET_SIZE;
484 #endif
485         return( 0 );
486 }
487
488 /*******************************************************************************
489  * input_SortPacket: find out whether we need that packet
490  *******************************************************************************/
491 static __inline__ void input_SortPacket( input_thread_t *p_input,
492                                          ts_packet_t *p_ts_packet )
493 {
494     int             i_current_pid;
495     int             i_es_loop;
496
497     /* Verify that sync_byte, error_indicator and scrambling_control are
498        what we expected. */
499     if( !(p_ts_packet->buffer[0] == 0x47) || (p_ts_packet->buffer[1] & 0x80) ||
500         (p_ts_packet->buffer[3] & 0xc0) )
501     {
502         intf_DbgMsg("input debug: invalid TS header (%p)\n", p_ts_packet);
503     }
504     else
505     {
506         /* Get the PID of the packet. Note that ntohs is needed, for endianness
507            purposes (see man page). */
508         i_current_pid = U16_AT(&p_ts_packet->buffer[1]) & 0x1fff;
509
510 //      intf_DbgMsg("input debug: pid %d received (%p)\n",
511 //                    i_current_pid, p_ts_packet);
512
513         /* Lock current ES state. */
514         pthread_mutex_lock( &p_input->es_lock );
515         
516         /* Verify that we actually want this PID. */
517         for( i_es_loop = 0; i_es_loop < INPUT_MAX_SELECTED_ES; i_es_loop++ )
518         {
519             if( p_input->pp_selected_es[i_es_loop] != NULL)
520             {
521                 if( (*p_input->pp_selected_es[i_es_loop]).i_id
522                      == i_current_pid )
523                 {
524                     /* Don't need the lock anymore, since the value pointed
525                        out by p_input->pp_selected_es[i_es_loop] can only be
526                        modified from inside the input_thread (by the PSI
527                        decoder): interface thread is only allowed to modify
528                        the pp_selected_es table */
529                     pthread_mutex_unlock( &p_input->es_lock );
530
531                     /* We're interested. Pass it to the demultiplexer. */
532                     input_DemuxTS( p_input, p_ts_packet,
533                                    p_input->pp_selected_es[i_es_loop] );
534                     return;
535                 }
536             }
537             else
538             {
539                 /* pp_selected_es should not contain any hole. */
540                 break;
541             }
542         }
543         pthread_mutex_unlock( &p_input->es_lock );
544     }
545
546     /* We weren't interested in receiving this packet. Give it back to the
547        netlist. */
548 //    intf_DbgMsg("SortPacket: freeing unwanted TS %p (pid %d)\n", p_ts_packet,
549 //                     U16_AT(&p_ts_packet->buffer[1]) & 0x1fff);
550     input_NetlistFreeTS( p_input, p_ts_packet );
551 #ifdef STATS
552     p_input->c_ts_packets_trashed++;
553 #endif
554 }
555
556 /*******************************************************************************
557  * input_DemuxTS: first step of demultiplexing: the TS header
558  *******************************************************************************
559  * Stream must also only contain PES and PSI, so PID must have been filtered
560  *******************************************************************************/
561 static __inline__ void input_DemuxTS( input_thread_t *p_input,
562                                       ts_packet_t *p_ts_packet,
563                                       es_descriptor_t *p_es_descriptor )
564 {
565     int         i_dummy;
566     boolean_t   b_adaption;                       /* Adaption field is present */
567     boolean_t   b_payload;                           /* Packet carries payload */
568     boolean_t   b_unit_start;            /* A PSI or a PES start in the packet */
569     boolean_t   b_trash = 0;                   /* Must the packet be trashed ? */
570     boolean_t   b_lost = 0;                      /* Was there a packet lost ? */
571
572     ASSERT(p_input);
573     ASSERT(p_ts_packet);
574     ASSERT(p_es_descriptor);
575
576 #define p (p_ts_packet->buffer)
577
578 //    intf_DbgMsg("input debug: TS-demultiplexing packet %p, pid %d, number %d\n",
579 //                p_ts_packet, U16_AT(&p[1]) & 0x1fff, p[3] & 0x0f);
580
581 #ifdef STATS
582     p_es_descriptor->c_packets++;
583     p_es_descriptor->c_bytes += TS_PACKET_SIZE;
584 #endif
585
586     /* Extract flags values from TS common header. */
587     b_unit_start = (p[1] & 0x40);
588     b_adaption = (p[3] & 0x20);
589     b_payload = (p[3] & 0x10);
590     
591     /* Extract adaption field informations if any */
592     if( !b_adaption )
593     {
594         /* We don't have any adaptation_field, so payload start immediately
595          after the 4 byte TS header */
596         p_ts_packet->i_payload_start = 4;
597     }
598     else
599     {
600         /* p[4] is adaptation_field_length minus one */
601         p_ts_packet->i_payload_start = 5 + p[4];
602
603         /* The adaption field can be limited to the adaptation_field_length byte,
604            so that there is nothing to do: skip this possibility */
605         if( p[4] )
606         {
607             /* If the packet has both adaptation_field and payload, adaptation_field
608                cannot be more than 182 bytes long; if there is only an adaptation_field,
609                it must fill the next 183 bytes. */
610             if( b_payload ? (p[4] > 182) : (p[4] != 183) )
611             {
612                 intf_DbgMsg("input debug: invalid TS adaptation field (%p)\n",
613                             p_ts_packet);
614 #ifdef STATS
615                 p_es_descriptor->c_invalid_packets++;
616 #endif
617                 b_trash = 1;
618             }
619
620             /* No we are sure that the byte containing flags is present: read it */
621             else
622             {
623                 /* discontinuity_indicator */
624                 if( p[5] & 0x80 )
625                 {
626                     intf_DbgMsg("discontinuity_indicator encountered by TS demux " \
627                                 "(position read: %d, saved: %d)\n", p[5] & 0x80,
628                                 p_es_descriptor->i_continuity_counter);
629
630                     /* If the PID carries the PCR, there will be a system time-base
631                        discontinuity. We let the PCR decoder handle that. */
632                     p_es_descriptor->b_discontinuity = 1;
633                     
634                     /* There also may be a continuity_counter discontinuity: resynchronise
635                        our counter with the one of the stream */
636                     p_es_descriptor->i_continuity_counter = (p[3] & 0x0f) - 1;
637                 }
638
639                 /* random_access_indicator */
640                 p_es_descriptor->b_random |= p[5] & 0x40;
641
642                 /* If this is a PCR_PID, and this TS packet contains a PCR, we pass it
643                    along to the PCR decoder. */
644                 if( (p_es_descriptor->b_pcr) && (p[5] & 0x10) )
645                 {
646                     /* There should be a PCR field in the packet, check if the adaption
647                        field is long enough to carry it */
648                     if( p[4] >= 7 )
649                     {
650                         /* Call the PCR decoder */
651                         input_PcrDecode( p_input, p_es_descriptor, &p[6] );
652                     }
653                 }
654             }
655         }
656     }
657
658     /* Check the continuity of the stream. */
659     i_dummy = ((p[3] & 0x0f) - p_es_descriptor->i_continuity_counter) & 0x0f;
660     if( i_dummy == 1 )
661     {
662         /* Everything is ok, just increase our counter */
663         p_es_descriptor->i_continuity_counter++;
664     }
665     else
666     {
667         if( !b_payload && i_dummy == 0 )
668         {
669             /* This is a packet without payload, this is allowed by the draft
670                As there is nothing interessant in this packet (except PCR that
671                have already been handled), we can trash the packet. */
672             intf_DbgMsg("Packet without payload received by TS demux\n");
673             b_trash = 1;
674         }
675         else if( i_dummy <= 0 )
676         {
677             /* Duplicate packet: mark it as being to be trashed. */
678             intf_DbgMsg("Duplicate packet received by TS demux\n");
679             b_trash = 1;
680         }
681         else
682         {
683             /* This can indicate that we missed a packet or that the
684                continuity_counter wrapped and we received a dup packet: as we
685                don't know, do as if we missed a packet to be sure to recover
686                from this situation */
687             intf_DbgMsg("Packet lost by TS demux: current %d, packet %d\n",
688                         p_es_descriptor->i_continuity_counter & 0x0f,
689                         p[3] & 0x0f);
690             b_lost = 1;
691             p_es_descriptor->i_continuity_counter = p[3] & 0x0f;
692         }
693     }
694
695     /* Trash the packet if it has no payload or if it is bad */
696     if( b_trash )
697     {
698         input_NetlistFreeTS( p_input, p_ts_packet );
699 #ifdef STATS
700         p_input->c_ts_packets_trashed++;
701 #endif
702     }
703     else
704     {
705         if( p_es_descriptor->b_psi )
706         {
707             /* The payload contains PSI tables */
708             input_DemuxPSI( p_input, p_ts_packet, p_es_descriptor,
709                             b_unit_start, b_lost );
710         }
711         else
712         {
713             /* The payload carries a PES stream */ 
714             input_DemuxPES( p_input, p_ts_packet, p_es_descriptor,
715                             b_unit_start, b_lost );
716         }
717     }
718
719 #undef p
720 }
721
722
723
724
725 /*******************************************************************************
726  * input_DemuxPES: 
727  *******************************************************************************
728  * Gather a PES packet and analyzes its header.
729  *******************************************************************************/
730 static __inline__ void input_DemuxPES( input_thread_t *p_input,
731                                        ts_packet_t *p_ts_packet,
732                                        es_descriptor_t *p_es_descriptor,
733                                        boolean_t b_unit_start,
734                                        boolean_t b_packet_lost )
735 {
736     decoder_fifo_t *            p_fifo;
737     u8                          i_pes_header_size;
738     int                         i_dummy;
739     pes_packet_t*               p_last_pes;
740     ts_packet_t *               p_ts;
741     int                         i_ts_payload_size;
742     
743
744 #define p_pes (p_es_descriptor->p_pes_packet)
745
746     ASSERT(p_input);
747     ASSERT(p_ts_packet);
748     ASSERT(p_es_descriptor);
749
750 //    intf_DbgMsg("PES-demultiplexing %p (%p)\n", p_ts_packet, p_pes);
751
752     /* If we lost data, discard the PES packet we are trying to reassemble
753        if any and wait for the beginning of a new one in order to synchronise
754        again */
755     if( b_packet_lost && p_pes != NULL )
756     {
757         intf_DbgMsg("PES %p trashed because of packet lost\n", p_pes);
758         input_NetlistFreePES( p_input, p_pes );
759         p_pes = NULL;
760     }
761
762     /* If the TS packet contains the begining of a new PES packet, and if we
763        were reassembling a PES packet, then the PES should be complete now,
764        so parse its header and give it to the decoders */
765     if( b_unit_start && p_pes != NULL )
766     {
767 //        intf_DbgMsg("End of PES packet %p\n", p_pes);
768
769         /* Parse the header. The header has a variable length, but in order 
770            to improve the algorithm, we will read the 14 bytes we may be
771            interested in */
772         p_ts = p_pes->p_first_ts;
773         i_ts_payload_size = p_ts->i_payload_end - p_ts->i_payload_start;
774         i_dummy = 0;
775
776         if(i_ts_payload_size >= PES_HEADER_SIZE)
777         {
778             /* This part of the header entirely fits in the payload of   
779                the first TS packet */
780             p_pes->p_pes_header = &(p_ts->buffer[p_ts->i_payload_start]);
781         }
782         else
783         {
784             /* This part of the header does not fit in the current TS packet:
785                copy the part of the header we are interested in to the
786                p_pes_header_save buffer */
787             intf_DbgMsg("Code never tested encourtered, WARNING ! (benny)\n");
788             do
789             {
790                 memcpy(p_pes->p_pes_header_save + i_dummy,
791                        &p_ts->buffer[p_ts->i_payload_start], i_ts_payload_size);
792                 i_dummy += i_ts_payload_size;
793             
794                 p_ts = p_ts->p_next_ts;
795                 if(!p_ts)
796                 {
797                   /* The payload of the PES packet is shorter than the 14 bytes
798                      we would read. This means that high packet lost occured
799                      so the PES won't be usefull for any decoder. Moreover,
800                      this should never happen so we can trash the packet and
801                      exit roughly without regrets */
802                   intf_DbgMsg("PES packet too short: trashed\n");
803                   input_NetlistFreePES( p_input, p_pes );
804                   p_pes = NULL;
805                   /* Stats ?? */
806                   return;
807                 }
808                 
809                 i_ts_payload_size = p_ts->i_payload_end - p_ts->i_payload_start;
810             }
811             while(i_ts_payload_size + i_dummy < PES_HEADER_SIZE);
812
813             /* This last TS packet is partly header, partly payload, so just
814                copy the header part */
815             memcpy(p_pes->p_pes_header_save + i_dummy,
816                    &p_ts->buffer[p_ts->i_payload_start],
817                    PES_HEADER_SIZE - i_dummy);
818
819             /* The header must be read in the buffer not in any TS packet */
820            p_pes->p_pes_header = p_pes->p_pes_header_save;
821         }
822         
823         /* Now we have the part of the PES header we were interested in:
824            parse it */
825
826         /* First read the 6 header bytes common to all PES packets:
827            use them to test the PES validity */
828         if( (p_pes->p_pes_header[0] || p_pes->p_pes_header[1] ||
829             (p_pes->p_pes_header[2] != 1)) ||
830                                      /* packet_start_code_prefix != 0x000001 */
831             ((i_dummy = U16_AT(p_pes->p_pes_header + 4)) &&
832              (i_dummy + 6 != p_pes->i_pes_size)) )
833                    /* PES_packet_length is set and != total received payload */
834         {
835           /* Trash the packet and set p_pes to NULL to be sure the next PES
836              packet will have its b_data_lost flag set */
837           intf_DbgMsg("Corrupted PES packet received: trashed\n");
838           input_NetlistFreePES( p_input, p_pes );
839           p_pes = NULL;
840           /* Stats ?? */
841         }
842         else
843         {
844             /* The PES packet is valid. Check its type to test if it may
845                carry additional informations in a header extension */
846             p_pes->i_stream_id =  p_pes->p_pes_header[3];
847
848             switch( p_pes->i_stream_id )
849             {
850             case 0xBE:  /* Padding */
851             case 0xBC:  /* Program stream map */
852             case 0xBF:  /* Private stream 2 */
853             case 0xB0:  /* ECM */
854             case 0xB1:  /* EMM */
855             case 0xFF:  /* Program stream directory */
856             case 0xF2:  /* DSMCC stream */
857             case 0xF8:  /* ITU-T H.222.1 type E stream */
858                 /* The payload begins immediatly after the 6 bytes header, so
859                    we have finished with the parsing */
860                 i_pes_header_size = 6;
861                 break;
862
863             default:
864                 /* The PES header contains at least 3 more bytes: parse them */
865                 p_pes->b_data_alignment = p_pes->p_pes_header[6] & 0x10;
866                 p_pes->b_has_pts = p_pes->p_pes_header[7] & 0x4;
867                 i_pes_header_size = 9 + p_pes->p_pes_header[8];
868                 
869                 /* Now parse the optional header extensions (in the limit of
870                    the 14 bytes */
871                 if( p_pes->b_has_pts )
872                 {
873                     pcr_descriptor_t *p_pcr;
874                     /* The PTS field is split in 3 bit records. We have to add
875                        them, and thereafter we substract the 2 marker_bits */
876
877                     p_pcr = p_input->p_pcr;
878                     pthread_mutex_lock( &p_pcr->lock );
879                     if( p_pcr->delta_clock == 0 )
880                     {
881                         p_pes->i_pts = 0;
882                     }
883                     else
884                     {
885                         p_pes->i_pts = ( ((s64)p_pes->p_pes_header[9] << 29) +
886                                          ((s64)U16_AT(p_pes->p_pes_header + 10) << 14) +
887                                          ((s64)U16_AT(p_pes->p_pes_header + 12) >> 1) -
888                                          (1 << 14) - (1 << 29) );
889                         p_pes->i_pts *= 300;
890                         p_pes->i_pts /= 27;
891                         p_pes->i_pts += p_pcr->delta_clock;
892                         if( p_pcr->c_pts == 0 )
893                         {
894                             p_pcr->delta_decode = mdate() - p_pes->i_pts + 500000;
895                         }
896                         p_pes->i_pts += p_pcr->delta_decode;
897                     }
898                     p_pcr->c_pts += 1;
899                     pthread_mutex_unlock( &p_pcr->lock );
900                 }
901                 break;
902             }
903
904             /* Now we've parsed the header, we just have to indicate in some
905                specific TS packets where the PES payload begins (renumber
906                i_payload_start), so that the decoders can find the beginning
907                of their data right out of the box. */
908             p_ts = p_pes->p_first_ts;
909             i_ts_payload_size = p_ts->i_payload_end - p_ts->i_payload_start;
910             while( i_pes_header_size > i_ts_payload_size )
911             {
912                 /* These packets are entirely filled by the PES header. */
913                 i_pes_header_size -= i_ts_payload_size;
914                 p_ts->i_payload_start = p_ts->i_payload_end;
915                 /* Go to the next TS packet: here we won't have to test it is
916                    not NULL because we trash the PES packets when packet lost
917                    occurs */
918                 p_ts = p_ts->p_next_ts;
919                 i_ts_payload_size = p_ts->i_payload_end - p_ts->i_payload_start;
920             }
921             /* This last packet is partly header, partly payload. */
922             p_ts->i_payload_start += i_pes_header_size;
923
924             /* Now we can eventually put the PES packet in the decoder's
925                PES fifo */
926             switch( p_es_descriptor->i_type )
927             {
928             case MPEG1_VIDEO_ES:
929             case MPEG2_VIDEO_ES:
930                 p_fifo = &(((vdec_thread_t*)(p_es_descriptor->p_dec))->fifo);
931                 break;
932             case MPEG1_AUDIO_ES:
933             case MPEG2_AUDIO_ES:
934                 p_fifo = &(((adec_thread_t*)(p_es_descriptor->p_dec))->fifo);
935                 break;
936             default:
937                 /* This should never happen. */
938                 intf_DbgMsg("Unknown stream type (%d, %d): PES trashed\n",
939                             p_es_descriptor->i_id, p_es_descriptor->i_type);
940                 p_fifo = NULL;
941                 break;
942             }
943
944             if( p_fifo != NULL )
945             {
946                 pthread_mutex_lock( &p_fifo->data_lock );
947                 if( DECODER_FIFO_ISFULL( *p_fifo ) )
948                 {
949                     /* The FIFO is full !!! This should not happen. */
950 #ifdef STATS
951                     p_input->c_ts_packets_trashed += p_pes->i_ts_packets;
952                     p_es_descriptor->c_invalid_packets += p_pes->i_ts_packets;
953 #endif
954                     input_NetlistFreePES( p_input, p_pes );
955                     intf_DbgMsg("PES trashed - fifo full ! (%d, %d)\n",
956                                p_es_descriptor->i_id, p_es_descriptor->i_type);
957                 }
958                 else
959                 {
960 //                    intf_DbgMsg("Putting %p into fifo %p/%d\n",
961 //                                p_pes, p_fifo, p_fifo->i_end);
962                     p_fifo->buffer[p_fifo->i_end] = p_pes;
963                     DECODER_FIFO_INCEND( *p_fifo );
964
965                     /* Warn the decoder that it's got work to do. */
966                     pthread_cond_signal( &p_fifo->data_wait );
967                 }
968                 pthread_mutex_unlock( &p_fifo->data_lock );
969             }
970             else
971             {
972                 intf_DbgMsg("No fifo to receive PES %p: trash\n", p_pes);
973 #ifdef STATS
974                 p_input->c_ts_packets_trashed += p_pes->i_ts_packets;
975                 p_es_descriptor->c_invalid_packets += p_pes->i_ts_packets;
976 #endif
977                 input_NetlistFreePES( p_input, p_pes );
978             }
979         }
980     }
981
982
983     /* If we are at the beginning of a new PES packet, we must fetch a new
984        PES buffer to begin with the reassembly of this PES packet. This is
985        also here that we can synchronise with the stream if we we lost
986        packets or if the decoder has just started */ 
987     if( b_unit_start )
988     {
989         p_last_pes = p_pes;
990
991         /* Get a new one PES from the PES netlist. */
992         if( (p_pes = input_NetlistGetPES( p_input )) == (NULL) )
993         {
994             /* PES netlist is empty ! */
995             p_input->b_error = 1;
996         }
997         else
998         {
999 //           intf_DbgMsg("New PES packet %p (first TS: %p)\n", p_pes, p_ts_packet);
1000
1001             /* Init the PES fields so that the first TS packet could be correctly
1002                added to the PES packet (see below) */
1003             p_pes->p_first_ts = p_ts_packet;
1004             p_pes->p_last_ts = NULL;
1005
1006             /* If the last pes packet was null, this means that the synchronisation
1007                was lost and so warn the decoder that he will have to find a way to
1008                recover */
1009             if( !p_last_pes )
1010                 p_pes->b_data_loss = 1;
1011
1012             /* Read the b_random_access flag status and then reinit it */     
1013             p_pes->b_random_access = p_es_descriptor->b_random;
1014             p_es_descriptor->b_random = 0;
1015         }
1016     }
1017
1018
1019     /* If we are synchronised with the stream, and so if we are ready to
1020        receive correctly the data, add the TS packet to the current PES
1021        packet */
1022     if( p_pes != NULL )
1023     {
1024 //      intf_DbgMsg("Adding TS %p to PES %p\n", p_ts_packet, p_pes);
1025
1026         /* Size of the payload carried in the TS packet */
1027         i_ts_payload_size = p_ts_packet->i_payload_end -
1028                             p_ts_packet->i_payload_start;
1029
1030         /* Update the relations between the TS packets */
1031         p_ts_packet->p_prev_ts = p_pes->p_last_ts;
1032         p_ts_packet->p_next_ts = NULL;
1033         if( p_pes->i_ts_packets != 0 )
1034         {
1035             /* Regarder si il serait pas plus efficace de ne creer que les liens
1036                precedent->suivant pour le moment, et les liens suivant->precedent
1037                quand le paquet est termine */
1038             /* Otherwise it is the first TS packet. */
1039             p_pes->p_last_ts->p_next_ts = p_ts_packet;
1040         }
1041         /* Now add the TS to the PES packet */
1042         p_pes->p_last_ts = p_ts_packet;
1043         p_pes->i_ts_packets++;
1044         p_pes->i_pes_size += i_ts_payload_size;
1045
1046         /* Stats */
1047 #ifdef STATS
1048         i_dummy = p_ts_packet->i_payload_end - p_ts_packet->i_payload_start;
1049         p_es_descriptor->c_payload_bytes += i_dummy;
1050 #endif
1051     }
1052     else
1053     {
1054         /* Since we don't use the TS packet to build a PES packet, we don't
1055            need it anymore, so give it back to the netlist */
1056 //        intf_DbgMsg("Trashing TS %p: no PES being build\n", p_ts_packet);
1057         input_NetlistFreeTS( p_input, p_ts_packet );     
1058     }
1059     
1060 #undef p_pes
1061 }
1062
1063
1064
1065
1066 /*******************************************************************************
1067  * input_DemuxPSI:
1068  *******************************************************************************
1069  * Notice that current ES state has been locked by input_SortPacket. (No more true,
1070  * changed by benny - See if it'a ok, and definitely change the code ???????? )
1071  *******************************************************************************/
1072 static __inline__ void input_DemuxPSI( input_thread_t *p_input,
1073                                        ts_packet_t *p_ts_packet,
1074                                        es_descriptor_t *p_es_descriptor,
1075                                        boolean_t b_unit_start, boolean_t b_packet_lost )
1076 {
1077     int i_data_offset;      /* Offset of the interesting data in the TS packet */
1078     u16 i_data_length;                                 /* Length of those data */
1079     boolean_t b_first_section; /* Was there another section in the TS packet ? */
1080     
1081     ASSERT(p_input);
1082     ASSERT(p_ts_packet);
1083     ASSERT(p_es_descriptor);
1084
1085 #define p_psi (p_es_descriptor->p_psi_section)
1086
1087 //    intf_DbgMsg( "input debug: PSI demultiplexing %p (%p)\n", p_ts_packet, p_input);
1088
1089 //    intf_DbgMsg( "Packet: %x %x %x %x %x %x %x %x %x %x %x %x %x %x %x %x %x %x %x %x %x (unit start: %d)\n", p_ts_packet->buffer[p_ts_packet->i_payload_start], p_ts_packet->buffer[p_ts_packet->i_payload_start+1], p_ts_packet->buffer[p_ts_packet->i_payload_start+2], p_ts_packet->buffer[p_ts_packet->i_payload_start+3], p_ts_packet->buffer[p_ts_packet->i_payload_start+4], p_ts_packet->buffer[p_ts_packet->i_payload_start+5], p_ts_packet->buffer[p_ts_packet->i_payload_start+6], p_ts_packet->buffer[p_ts_packet->i_payload_start+7], p_ts_packet->buffer[p_ts_packet->i_payload_start+8], p_ts_packet->buffer[p_ts_packet->i_payload_start+9], p_ts_packet->buffer[p_ts_packet->i_payload_start+10], p_ts_packet->buffer[p_ts_packet->i_payload_start+11], p_ts_packet->buffer[p_ts_packet->i_payload_start+12], p_ts_packet->buffer[p_ts_packet->i_payload_start+13], p_ts_packet->buffer[p_ts_packet->i_payload_start+14], p_ts_packet->buffer[p_ts_packet->i_payload_start+15], p_ts_packet->buffer[p_ts_packet->i_payload_start+16], p_ts_packet->buffer[p_ts_packet->i_payload_start+17], p_ts_packet->buffer[p_ts_packet->i_payload_start+18], p_ts_packet->buffer[p_ts_packet->i_payload_start+19], p_ts_packet->buffer[p_ts_packet->i_payload_start+20], b_unit_start);
1090
1091     /* The section we will deal with during the first iteration of the following
1092        loop is the first one contained in the TS packet */
1093     b_first_section = 1;
1094
1095     /* Reassemble the pieces of sections contained in the TS packet and decode
1096        the sections that could have been completed */
1097     do
1098     {
1099         /* Has the reassembly of a section already began in a previous packet ? */
1100         if( p_psi->b_running_section )
1101         {
1102             /* Was data lost since the last TS packet ? */
1103             if( b_packet_lost )
1104             {
1105                 /* Discard the section and wait for the begining of a new one to resynch */
1106                 p_psi->b_running_section = 0;
1107                 intf_DbgMsg( "Section discarded due to packet loss\n" );
1108             }
1109             else
1110             {
1111                 /* The data that complete a previously began section are always at
1112                    the beginning of the TS payload... */
1113                 i_data_offset = p_ts_packet->i_payload_start;
1114                 /* ...Unless there is a pointer field, that we have to bypass */
1115                 if( b_unit_start )
1116                     i_data_offset ++;
1117 //                intf_DbgMsg( "New part of the section received at offset %d\n", i_data_offset );
1118             }
1119         }
1120         /* We are looking for the beginning of a new section */
1121         else
1122         {
1123             if( !b_unit_start )
1124             {
1125                 /* Cannot do anything with those data: trash both PSI section and TS packet */
1126                 p_psi->b_running_section = 0;
1127                 break;
1128             }
1129             else
1130             {
1131                 /* Get the offset at which the data for that section can be found */
1132                 if( b_first_section )
1133                 {
1134                     /* The offset is stored in the pointer_field since we are interested in
1135                        the first section of the TS packet. Note that the +1 is to bypass
1136                        the pointer field */
1137                     i_data_offset = p_ts_packet->i_payload_start +
1138                      p_ts_packet->buffer[p_ts_packet->i_payload_start] + 1;
1139                 }
1140                 else
1141                 {
1142                     /* Since no gap is allowed between 2 sections in a TS packet, the
1143                        offset is given by the end of the previous section. In fact, there
1144                        is nothing to do, i_offset was set to the right value in the
1145                        previous iteration */
1146                 }
1147 //                intf_DbgMsg( "New section beginning at offset %d in TS packet\n", i_data_offset );
1148
1149                 /* Read the length of that section */
1150                 p_psi->i_length = (U16_AT(&p_ts_packet->buffer[i_data_offset+1]) & 0xFFF) + 3;
1151 //                intf_DbgMsg( "Section length %d\n", p_psi->i_length );
1152                 if( p_psi->i_length > PSI_SECTION_SIZE )
1153                 {
1154                   /* The TS packet is corrupted, stop here to avoid possible a seg fault */
1155                   intf_DbgMsg( "Section size is too big, aborting its reception\n" );
1156                   break;
1157                 }
1158
1159                 /* Init the reassembly of that section */
1160                 p_psi->b_running_section = 1;
1161                 p_psi->i_current_position = 0;
1162             }
1163         }
1164
1165         /* Compute the length of data related to the section in this TS packet */
1166         if( p_psi->i_length - p_psi->i_current_position > TS_PACKET_SIZE - i_data_offset)
1167             i_data_length = TS_PACKET_SIZE - i_data_offset;
1168         else
1169           i_data_length = p_psi->i_length - p_psi->i_current_position;
1170
1171         /* Copy those data in the section buffer */
1172         memcpy( &p_psi->buffer[p_psi->i_current_position], &p_ts_packet->buffer[i_data_offset],
1173                 i_data_length );
1174     
1175         /* Interesting data are now after the ones we copied */
1176         i_data_offset += i_data_length;
1177
1178         /* Decode the packet if it is now complete */
1179         if (p_psi->i_length == p_psi->i_current_position + i_data_length)
1180         {
1181             /* Packet is complete, decode it */
1182 //            intf_DbgMsg( "SECTION COMPLETE: starting decoding of its data\n" );
1183             input_PsiDecode( p_input, p_psi );
1184
1185             /* Prepare the buffer to receive a new section */
1186             p_psi->i_current_position = 0;
1187             p_psi->b_running_section = 0;
1188         
1189             /* The new section won't be the first anymore */
1190             b_first_section = 0;
1191         }
1192         else
1193         {
1194             /* Prepare the buffer to receive the next part of the section */
1195           p_psi->i_current_position += i_data_length;
1196 //          intf_DbgMsg( "Section not complete, waiting for the end\n" );
1197         }
1198     
1199 //        intf_DbgMsg( "Must loop ? Next data offset: %d, stuffing: %d\n",
1200 //                     i_data_offset, p_ts_packet->buffer[i_data_offset] );
1201     }
1202     /* Stop if we reached the end of the packet or stuffing bytes */
1203     while( i_data_offset < TS_PACKET_SIZE && p_ts_packet->buffer[i_data_offset] != 0xFF );
1204
1205     /* Relase the TS packet, we don't need it anymore */
1206     input_NetlistFreeTS( p_input, p_ts_packet );
1207
1208 #undef p_psi  
1209 }