]> git.sesse.net Git - vlc/commitdiff
RTP reordering in module/access/udp.c done by me with help from Marian Durkovic ...
authorJean-Paul Saman <jpsaman@videolan.org>
Sat, 15 Oct 2005 22:07:55 +0000 (22:07 +0000)
committerJean-Paul Saman <jpsaman@videolan.org>
Sat, 15 Oct 2005 22:07:55 +0000 (22:07 +0000)
modules/access/udp.c
src/input/stream.c

index 916dd4749058c14f770bd9916b02d7ea0f57f715..1ae4badaf91a48a4a8b9063d59763999f329dc8c 100644 (file)
@@ -1,7 +1,7 @@
 /*****************************************************************************
  * udp.c: raw UDP & RTP input module
  *****************************************************************************
- * Copyright (C) 2001-2004 the VideoLAN team
+ * Copyright (C) 2001-2005 the VideoLAN team
  * $Id$
  *
  * Authors: Christophe Massiot <massiot@via.ecp.fr>
 #define AUTO_MTU_LONGTEXT N_( \
     "Allows growing the MTU if truncated packets are found" )
 
+#define RTP_LATE_TEXT N_("Reorder timeout in ms for late RTP packets")
+#define RTP_LATE_LONGTEXT N_( \
+    "Allows you to modify the RTP packets reorder and late behaviour. " \
+    "If enabled (value>0) then out-of-order packets will be held for the " \
+    "specified timeout in ms. " \
+    "The default behaviour is not to reorder." )
+
 static int  Open ( vlc_object_t * );
 static void Close( vlc_object_t * );
 
@@ -62,6 +69,8 @@ vlc_module_begin();
     add_bool( "udp-auto-mtu", 1, NULL,
               AUTO_MTU_TEXT, AUTO_MTU_LONGTEXT, VLC_TRUE );
 
+    add_integer( "rtp-late", 0, NULL, RTP_LATE_TEXT, RTP_LATE_LONGTEXT, VLC_TRUE );
+
     set_capability( "access2", 0 );
     add_shortcut( "udp" );
     add_shortcut( "udpstream" );
@@ -77,6 +86,7 @@ vlc_module_end();
  * Local prototypes
  *****************************************************************************/
 #define RTP_HEADER_LEN 12
+#define RTP_SEQ_NUM_SIZE 65536
 
 static block_t *BlockUDP( access_t * );
 static block_t *BlockRTP( access_t * );
@@ -91,7 +101,17 @@ struct access_sys_t
     vlc_bool_t b_auto_mtu;
 
     /* rtp only */
-    int i_sequence_number;
+    uint16_t i_sequence_number;
+    vlc_bool_t b_first_seqno;
+
+    /* reorder rtp packets when out-of-bounds 
+     * the packets hold queue is one level deep
+     */
+    uint32_t i_rtp_late; /* number of ms an RTP packet may be too late*/
+    uint32_t i_last_pcr; /* last known good PCR */
+    block_t *p_list;     /* list of packets to rearrange */
+    block_t *p_end; /* last packet in p_list */
+    block_t *p_next;     /* p_next ?? */
 };
 
 /*****************************************************************************
@@ -224,7 +244,15 @@ static int Open( vlc_object_t *p_this )
     var_Create( p_access, "udp-caching", VLC_VAR_INTEGER | VLC_VAR_DOINHERIT );
 
     /* Keep track of RTP sequence number */
-    p_sys->i_sequence_number = -1;
+    p_sys->i_sequence_number = 0;
+    p_sys->b_first_seqno = VLC_TRUE;
+
+    /* RTP reordering out-of-bound packets */
+    p_sys->i_last_pcr = 0;
+    p_sys->i_rtp_late = var_CreateGetInteger( p_access, "rtp-late" );
+    p_sys->p_list = NULL;
+    p_sys->p_end = NULL;
+    p_sys->p_next = NULL;
 
     return VLC_SUCCESS;
 }
@@ -237,6 +265,7 @@ static void Close( vlc_object_t *p_this )
     access_t     *p_access = (access_t*)p_this;
     access_sys_t *p_sys = p_access->p_sys;
 
+    block_ChainRelease( p_sys->p_list );
     net_Close( p_sys->fd );
     free( p_sys );
 }
@@ -318,91 +347,363 @@ static block_t *BlockUDP( access_t *p_access )
     return p_block;
 }
 
+/*
+ * rtp_ChainInsert - insert a p_block in the chain and
+ * look at the sequence numbers.
+ */
+static inline void rtp_ChainInsert( access_t *p_access, block_t **pp_list, block_t **pp_end, block_t *p_block )
+{
+    block_t *p_tmp = NULL;
+    block_t *p = NULL;
+    uint16_t i_new = 0;
+    uint16_t i_cur = 0;
+    uint16_t i_expected = 0;
+    uint32_t i_pcr_new = 0;
+
+    if( *pp_list == NULL )
+    {
+        *pp_list = p_block;
+        *pp_end  = p_block;
+        return;
+    }
+    /* Appending packets at the end of the chain is the normal case */
+    i_pcr_new = ( (p_block->p_buffer[4] << 24) +
+                  (p_block->p_buffer[5] << 16) +
+                  (p_block->p_buffer[6] << 8) +
+                   p_block->p_buffer[7] );
+    i_new = ( (p_block->p_buffer[2] << 8 ) + p_block->p_buffer[3] );
+
+    p = *pp_end;
+    i_cur = ( (p->p_buffer[2] << 8 ) + p->p_buffer[3] );
+    i_expected = ((i_cur+1) % RTP_SEQ_NUM_SIZE);
+
+    if( i_new == i_expected ) /* Append at the end? */
+    {
+        msg_Dbg( p_access, ">> append %p(%u)==%p(%u)\n", p_block, i_cur, p, i_new );
+        p->p_next = *pp_end = p_block;
+        return;
+    }
+    /* Add to the fron fo the chain? */
+    p = *pp_list;
+    i_new = ( (p_block->p_buffer[2] << 8 ) + p_block->p_buffer[3] );
+    i_cur = ( (p->p_buffer[2] << 8 ) + p->p_buffer[3] );
+    i_expected = ((i_cur+1) % RTP_SEQ_NUM_SIZE);
+    if( i_expected > i_new )
+    {
+        msg_Dbg( p_access, ">> prepend %p(%u)==%p(%u)\n", p_block, i_cur, p, i_new );
+        p_block->p_next = p;
+        *pp_list = p_block;
+        return;
+    }
+    /* The packet can't be added to the front or the end of the chain,
+     * thus walk the chain from the start.
+     */
+    while( p->p_next )
+    {
+        i_cur = (p->p_buffer[2] << 8 ) + p->p_buffer[3];
+        i_expected = (i_cur+1) % RTP_SEQ_NUM_SIZE;
+
+        msg_Dbg( p_access,  "i_cur: %u, i_new: %u", i_cur, i_new);
+        if( i_cur == i_new )
+        {
+            uint32_t i_pcr_cur = ( (p->p_buffer[4] << 24) +
+                                   (p->p_buffer[5] << 16) +
+                                   (p->p_buffer[6] << 8) +
+                                    p->p_buffer[7] );
+            /* This packet might be a duplicate, so check PCR's */
+            if( i_pcr_cur >= i_pcr_new )
+            {
+                /* packet way too late drop it. */
+                block_Release( p_block );
+                return;
+            }
+            /* Add it to list later on
+             * else if( i_pcr_cur < i_pcr_new ) */
+            break;
+        }
+        else if( i_expected == i_new ) /* insert in chain */
+        {
+            p_tmp = p->p_next;
+            msg_Dbg( p_access, ">> insert between %p(%u)==%p(%u)", p, i_cur, p_tmp, i_new );
+            p->p_next = p_block;
+            p_block->p_next = p_tmp;
+            return;
+        }
+        if( !p->p_next ) break;
+        p = p->p_next;
+    }
+}
+
+/*
+ * rtp_ChainSend - look which packets are ready for sending.
+ */
+static inline block_t *rtp_ChainSend( access_t *p_access, block_t **pp_list, uint16_t i_seq )
+{
+    access_sys_t *p_sys = (access_sys_t *) p_access->p_sys;
+    uint16_t i_cur = 0;
+
+    if( *pp_list )
+    {
+        /* Parse RTP header */
+        int i_skip = 0;
+        int i_extension_bit = 0;
+        int i_extension_length = 0;
+        int i_CSRC_count = 0;
+        /* Data pointers */
+        block_t *p_prev = NULL;
+        block_t *p_send = *pp_list;
+        block_t *p = *pp_list;
+
+        while( p->p_next )
+        {
+            i_cur = ( (p->p_buffer[2] << 8 ) + p->p_buffer[3] );
+            msg_Dbg( p_access, "rtp_ChainSend: i_cur %u, i_seq %u", i_cur, i_seq );
+            if( i_cur == i_seq )
+            {
+                /* sent all packets that are received in order */
+                i_seq++;
+
+                i_CSRC_count = p->p_buffer[0] & 0x0F;
+                i_extension_bit  = ( p->p_buffer[0] & 0x10 ) >> 4;
+                if ( i_extension_bit == 1)
+                    i_extension_length = ( (p->p_buffer[14] << 8 ) +
+                                            p->p_buffer[15] );
+
+                /* Skip header + CSRC extension field n*(32 bits) + extention */
+                i_skip = RTP_HEADER_LEN + 4*i_CSRC_count + i_extension_length;
+
+                /* Return the packet without the RTP header. */
+                p->i_buffer -= i_skip;
+                p->p_buffer += i_skip;
+            }
+            else if( i_cur > i_seq )
+            {
+                if( p_prev )
+                {
+                    *pp_list = p;
+                    p_prev->p_next = NULL;
+                    p_sys->i_last_pcr = ( (p_prev->p_buffer[4] << 24) +
+                                          (p_prev->p_buffer[5] << 16) +
+                                          (p_prev->p_buffer[6] << 8) +
+                                           p_prev->p_buffer[7] );
+                    p_sys->i_sequence_number = ( (p_prev->p_buffer[2] << 8 ) +
+                                                  p_prev->p_buffer[3] );
+                    return p_send;
+                }
+                /* FiXME: or should we return NULL here? */
+                return NULL;
+            }
+            p_prev = p;
+            if (!p->p_next) break;
+            p = p->p_next;
+        }
+        /* We have walked through the complete chain and all packets are
+         * in sequence - so send the whole chain
+         */
+        i_CSRC_count = p->p_buffer[0] & 0x0F;
+        i_extension_bit  = ( p->p_buffer[0] & 0x10 ) >> 4;
+        if( i_extension_bit == 1)
+            i_extension_length = ( (p->p_buffer[14] << 8 ) + p->p_buffer[15] );
+
+        /* Skip header + CSRC extension field n*(32 bits) + extention */
+        i_skip = RTP_HEADER_LEN + 4*i_CSRC_count + i_extension_length;
+
+        /* Update the list pointers */
+        *pp_list = NULL;
+        p_sys->p_next = NULL;
+        p_sys->p_end = NULL;
+        p_sys->i_sequence_number = ( (p->p_buffer[2] << 8 ) +
+                                      p->p_buffer[3] );
+        p_sys->i_last_pcr = ( (p->p_buffer[4] << 24) +
+                              (p->p_buffer[5] << 16) +
+                              (p->p_buffer[6] << 8) +
+                               p->p_buffer[7] );
+        /* Return the packet without the RTP header. */
+        p->i_buffer -= i_skip;
+        p->p_buffer += i_skip;
+        return p_send;
+    }
+    return NULL;
+}
+
 /*****************************************************************************
  * BlockParseRTP/BlockRTP:
  *****************************************************************************/
 static block_t *BlockParseRTP( access_t *p_access, block_t *p_block )
 {
-    int     i_rtp_version;
-    int     i_CSRC_count;
-    int     i_payload_type;
-    int     i_skip = 0;
-    int     i_sequence_number = 0;
-    int     i_extention_flag;
-    int     i_extention_length = 0;
-    int     i_sequence_expected;
+    access_sys_t *p_sys = (access_sys_t *) p_access->p_sys;
+    int      i_rtp_version;
+    int      i_CSRC_count;
+    int      i_payload_type;
+    int      i_skip = 0;
+    uint16_t i_sequence_number = 0;
+    uint16_t i_sequence_expected = 0;
+    int      i_extension_bit = 0;
+    int      i_extension_length = 0;
+    uint32_t i_pcr = 0;
 
     if( p_block == NULL )
         return NULL;
 
     if( p_block->i_buffer < RTP_HEADER_LEN )
-        goto trash;
-
+    {
+        msg_Warn( p_access, "received a too short packet for RTP" );
+        block_Release( p_block );
+        return NULL;
+    }
     /* Parse the header and make some verifications.
-     * See RFC 1889 & RFC 2250. */
+     * See RFC 3550. */
     i_rtp_version     = ( p_block->p_buffer[0] & 0xC0 ) >> 6;
     i_CSRC_count      = p_block->p_buffer[0] & 0x0F;
-    i_extention_flag  = p_block->p_buffer[0] & 0x10;
     i_payload_type    = p_block->p_buffer[1] & 0x7F;
-    i_sequence_number = (p_block->p_buffer[2] << 8 ) + p_block->p_buffer[3];
+    i_sequence_number = (p_block->p_buffer[2] << 8) +
+                         p_block->p_buffer[3];
+    i_pcr = ( (p_block->p_buffer[4] << 24) +
+              (p_block->p_buffer[5] << 16) +
+              (p_block->p_buffer[6] << 8) +
+               p_block->p_buffer[7] );
+    i_extension_bit  = ( p_block->p_buffer[0] & 0x10 ) >> 4;
 
-    if ( i_rtp_version != 2 )
+    if( i_rtp_version != 2 )
         msg_Dbg( p_access, "RTP version is %u, should be 2", i_rtp_version );
 
     if( i_payload_type == 14 )
         i_skip = 4;
     else if( i_payload_type !=  33 && i_payload_type != 32 )
         msg_Dbg( p_access, "unsupported RTP payload type (%u)", i_payload_type );
-    if( i_extention_flag )
-        i_extention_length = 4 +
+
+    if( i_extension_bit == 1)
+        i_extension_length = 4 +
             4 * ( (p_block->p_buffer[14] << 8) + p_block->p_buffer[15] );
 
     /* Skip header + CSRC extension field n*(32 bits) + extention */
-    i_skip += RTP_HEADER_LEN + 4*i_CSRC_count + i_extention_length;
-
+    i_skip += RTP_HEADER_LEN + 4*i_CSRC_count + i_extension_length;
     if( i_skip >= p_block->i_buffer )
-        goto trash;
-
-    /* Return the packet without the RTP header. */
-    p_block->i_buffer -= i_skip;
-    p_block->p_buffer += i_skip;
-
-#define RTP_SEQ_NUM_SIZE 65536
-#define RTP_SEQ_MAX_NO_DELTA 50
-    /* Detect RTP packet loss through tracking sequence numbers.
-     * See RFC 1889. */
-    if( p_access->p_sys->i_sequence_number == -1 )
-        p_access->p_sys->i_sequence_number =
-                (i_sequence_number - 1 + RTP_SEQ_NUM_SIZE ) % RTP_SEQ_NUM_SIZE;
-
-    i_sequence_expected = (p_access->p_sys->i_sequence_number + 1) % RTP_SEQ_NUM_SIZE;
+    {
+        msg_Warn( p_access, "received a too short packet for RTP" );
+        block_Release( p_block );
+        return NULL;
+    }
 
+    /* Detect RTP packet loss through tracking sequence numbers,
+     * and take RTP PCR into account.
+     * See RFC 3550.
+     */
+    if( p_sys->b_first_seqno )
+    {
+        p_sys->i_sequence_number = i_sequence_number;
+        p_sys->i_last_pcr = i_pcr;
+        p_sys->b_first_seqno = VLC_FALSE;
+    }
+#if 0
+    /* Emulate packet loss */
+    if ( (i_sequence_number % 4000) == 0)
+    {
+        msg_Warn( p_access, "Emulating packet drop" );
+        block_Release( p_block );
+        return NULL;
+    }
+#endif
+    i_sequence_expected = ((p_sys->i_sequence_number + 1) % RTP_SEQ_NUM_SIZE);
     if( i_sequence_expected != i_sequence_number )
     {
-        if( ((p_access->p_sys->i_sequence_number - i_sequence_number + RTP_SEQ_NUM_SIZE) % RTP_SEQ_NUM_SIZE) < RTP_SEQ_MAX_NO_DELTA )
+        /* Handle out of order packets */
+        if( p_sys->i_rtp_late > 0 )
         {
-            msg_Warn( p_access, "Trashing reordered/duplicate RTP packet, expected sequence number %d got %d",
-                      i_sequence_expected, i_sequence_number );
-            block_Release( p_block );
-            return NULL;
-        }
+            if( i_sequence_expected < i_sequence_number )
+            {
+                msg_Warn( p_access,
+                    "RTP packet out of order (too early) expected %u, current %u",
+                    i_sequence_expected, i_sequence_number );
+                if( (i_pcr - p_sys->i_last_pcr) > (p_sys->i_rtp_late*90) )
+                {
+                    /* Gap too big, we have been holding this data for too long,
+                     * send what we have.
+                     */
+                    msg_Warn( p_access,
+                        "Gap too big resyncing: delta %u, held for %d ms",
+                        (i_pcr - p_sys->i_last_pcr), p_sys->i_rtp_late );
+                    rtp_ChainInsert( p_access, &p_sys->p_list, &p_sys->p_end, p_block );
+                    return rtp_ChainSend( p_access, &p_sys->p_list, i_sequence_expected );
+                }
+                /* hold packets that arrive too early. */
+                rtp_ChainInsert( p_access, &p_sys->p_list, &p_sys->p_end, p_block );
+                return rtp_ChainSend( p_access, &p_sys->p_list, p_sys->i_sequence_number );
+            }
+            else if( /* (i_sequence_expected > i_sequence_number ) && */
+                     (i_pcr <= p_sys->i_last_pcr) )
+            {
+                msg_Warn( p_access,
+                    "RTP packet out of order (duplicate or too late) expected %u, current %u .. trashing it",
+                    i_sequence_expected, i_sequence_number );
+                block_Release( p_block );
+                p_sys->i_sequence_number = i_sequence_number;
+                p_sys->i_last_pcr = i_pcr;
+                return NULL;
+            }
 
+            if( p_sys->p_list )
+            {
+                block_t *p = NULL;
+                block_t **p_send = &p_sys->p_list;
+
+                msg_Warn( p_access,
+                    "RTP packet (unexpected condition) expected %u, current %u",
+                    i_sequence_expected, i_sequence_number );
+
+                /* Append block to the end of chain and send whole chain */
+                block_ChainLastAppend( &p_send, p_block );
+                p_sys->p_list = p_sys->p_end = NULL;
+                p_sys->i_sequence_number = i_sequence_number;
+                p_sys->i_last_pcr = i_pcr;
+
+                /* Return the packet without the RTP header. */
+                p = *p_send;
+                while( p->p_next )
+                {
+                    p->i_buffer -= i_skip;
+                    p->p_buffer += i_skip;
+                    p = p->p_next;
+                }
+                return *p_send;
+            }
+            /* This code should never be reached !! */
+            msg_Err( p_access,
+                "Bug in algorithme: (unexpected condition) expected %u (pcr=%u), current %u (pcr=%u)",
+                i_sequence_expected, i_sequence_number, p_sys->i_last_pcr, i_pcr );
+        }
         msg_Warn( p_access,
                   "RTP packet(s) lost, expected sequence number %d got %d",
                   i_sequence_expected, i_sequence_number );
+
         /* Mark transport error in the first TS packet in the RTP stream. */
-        if( i_payload_type == 33 && p_block->p_buffer[0] == 0x47 )
+        if( (i_payload_type == 33) && (p_block->p_buffer[0] == 0x47) )
             p_block->p_buffer[1] |= 0x80;
     }
-    p_access->p_sys->i_sequence_number = i_sequence_number;
-#undef RTP_SEQ_MAX_NO_DELTA
-#undef RTP_SEQ_NUM_SIZE
-    return p_block;
+    else if( (p_sys->i_rtp_late > 0) && p_sys->p_list )
+    {
+        if( i_pcr <= p_sys->i_last_pcr )
+        {
+            msg_Warn( p_access,
+                "RTP packet out of order (duplicate) expected %u, current %u .. trashing it",
+                i_sequence_expected, i_sequence_number );
+            block_Release( p_block );
+            p_sys->i_sequence_number = i_sequence_number;
+            p_sys->i_last_pcr = i_pcr;
+            return NULL;
+        }
+        rtp_ChainInsert( p_access, &p_sys->p_list, &p_sys->p_end, p_block );
+        return rtp_ChainSend( p_access, &p_sys->p_list, i_sequence_expected );
+    }
 
-trash:
-    msg_Warn( p_access, "received a too short packet for RTP" );
-    block_Release( p_block );
-    return NULL;
+    /* This is the normal case when no packet reordering is effective */
+    p_sys->i_sequence_number = i_sequence_number;
+    p_sys->i_last_pcr = i_pcr;
+
+    /* Return the packet without the RTP header. */
+    p_block->i_buffer -= i_skip;
+    p_block->p_buffer += i_skip;
+    return p_block;
 }
 
 static block_t *BlockRTP( access_t *p_access )
@@ -439,7 +740,7 @@ static block_t *BlockChoose( access_t *p_access )
         return p_block;
 
     /* Parse the header and make some verifications.
-     * See RFC 1889 & RFC 2250. */
+     * See RFC 3550. */
 
     i_rtp_version  = ( p_block->p_buffer[0] & 0xC0 ) >> 6;
     i_CSRC_count   = ( p_block->p_buffer[0] & 0x0F );
index f25d714b8b327d072ac0e7ba4037ae0792a4e256..cb9bfbc5bbf595b8941db68e72554a7d2b494f62 100644 (file)
@@ -198,7 +198,7 @@ stream_t *__stream_UrlNew( vlc_object_t *p_parent, const char *psz_url )
 
     psz_dup = strdup( psz_url );
     MRLSplit( p_parent, psz_dup, &psz_access, &psz_demux, &psz_path );
-    
+
     /* Now try a real access */
     p_access = access2_New( p_parent, psz_access, psz_demux, psz_path, 0 );
     free( psz_dup );
@@ -612,12 +612,16 @@ static void AStreamPrebufferBlock( stream_t *s )
             msg_Dbg( s, "received first data for our buffer");
         }
 
-        /* Append the block */
-        p_sys->block.i_size += b->i_buffer;
-        *p_sys->block.pp_last = b;
-        p_sys->block.pp_last = &b->p_next;
+        while( b )
+        {
+            /* Append the block */
+            p_sys->block.i_size += b->i_buffer;
+            *p_sys->block.pp_last = b;
+            p_sys->block.pp_last = &b->p_next;
 
-        p_sys->stat.i_read_count++;
+            p_sys->stat.i_read_count++;
+            b = b->p_next;
+        }
     }
 
     p_sys->block.p_current = p_sys->block.p_first;
@@ -926,22 +930,28 @@ static int AStreamRefillBlock( stream_t *s )
 
         msleep( STREAM_DATA_WAIT );
     }
-    i_stop = mdate();
 
-    /* Append the block */
-    p_sys->block.i_size += b->i_buffer;
-    *p_sys->block.pp_last = b;
-    p_sys->block.pp_last = &b->p_next;
+    while( b )
+    {
+        i_stop = mdate();
 
-    /* Fix p_current */
-    if( p_sys->block.p_current == NULL )
-        p_sys->block.p_current = b;
+        /* Append the block */
+        p_sys->block.i_size += b->i_buffer;
+        *p_sys->block.pp_last = b;
+        p_sys->block.pp_last = &b->p_next;
 
-    /* Update stat */
-    p_sys->stat.i_bytes += b->i_buffer;
-    p_sys->stat.i_read_time += i_stop - i_start;
-    p_sys->stat.i_read_count++;
+        /* Fix p_current */
+        if( p_sys->block.p_current == NULL )
+            p_sys->block.p_current = b;
 
+        /* Update stat */
+        p_sys->stat.i_bytes += b->i_buffer;
+        p_sys->stat.i_read_time += i_stop - i_start;
+        p_sys->stat.i_read_count++;
+
+        b = b->p_next;
+        i_start = mdate();
+    }
     return VLC_SUCCESS;
 }