]> git.sesse.net Git - vlc/commitdiff
RTP: try to improve packet re-ordering
authorRémi Denis-Courmont <rdenis@simphalempin.com>
Sun, 9 Nov 2008 17:36:30 +0000 (19:36 +0200)
committerRémi Denis-Courmont <rdenis@simphalempin.com>
Sun, 9 Nov 2008 17:36:30 +0000 (19:36 +0200)
modules/access/rtp/Modules.am
modules/access/rtp/input.c [new file with mode: 0644]
modules/access/rtp/rtp.c
modules/access/rtp/rtp.h
modules/access/rtp/session.c

index e8150b70238da48a2daf6d3b163a8baa3f0a2f16..855ca56b46b1cb0e255bd73a58727ee79131c6cf 100644 (file)
@@ -3,7 +3,10 @@ if HAVE_LIBGCRYPT
 libvlc_LTLIBRARIES += \
        librtp_plugin.la
 librtp_plugin_la_SOURCES = \
-       rtp.c rtp.h session.c
+       rtp.c \
+       rtp.h \
+       input.c \
+       session.c
 librtp_plugin_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/libs/srtp
 librtp_plugin_la_LIBADD = $(AM_LIBADD) \
        $(top_builddir)/libs/srtp/libvlc_srtp.la
diff --git a/modules/access/rtp/input.c b/modules/access/rtp/input.c
new file mode 100644 (file)
index 0000000..8992cd4
--- /dev/null
@@ -0,0 +1,208 @@
+/**
+ * @file input.c
+ * @brief RTP packet input
+ */
+/*****************************************************************************
+ * Copyright © 2008 Rémi Denis-Courmont
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2.0
+ * of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ ****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#include <vlc_common.h>
+#include <vlc_demux.h>
+#include <vlc_block.h>
+#include <vlc_network.h>
+
+#include <unistd.h>
+#ifdef HAVE_POLL
+# include <poll.h>
+#endif
+
+#include "rtp.h"
+#include <srtp.h>
+
+/**
+ * Gets a datagram from the network.
+ * @param fd datagram file descriptor
+ * @return a block or NULL on fatal error (socket dead)
+ */
+static block_t *rtp_dgram_recv (int fd)
+{
+    block_t *block = block_Alloc (0xffff);
+    ssize_t len;
+
+    do
+    {
+        struct pollfd ufd = { .fd = fd, .events = POLLIN, };
+
+        block_cleanup_push (block);
+        poll (&ufd, 1, -1);
+        len = read (fd, block->p_buffer, block->i_buffer);
+        vlc_cleanup_pop ();
+
+        if ((len <= 0) && (ufd.revents & POLLHUP))
+        {   /* POLLHUP -> permanent (DCCP) socket error */
+            block_Release (block);
+            return NULL;
+        }
+    }
+    while (len == -1);
+
+    return block_Realloc (block, 0, len);
+}
+
+
+/**
+ * Gets a framed RTP packet.
+ * @param fd stream file descriptor
+ * @return a block or NULL in case of fatal error
+ */
+static block_t *rtp_stream_recv (int fd)
+{
+    ssize_t len = 0;
+    struct pollfd ufd = { .fd = fd, .events = POLLIN, };
+    uint8_t hdr[2]; /* frame header */
+
+    /* Receives the RTP frame header */
+    do
+    {
+        ssize_t val;
+
+        poll (&ufd, 1, -1);
+        val = read (fd, hdr + len, 2 - len);
+        if (val <= 0)
+            return NULL;
+        len += val;
+    }
+    while (len < 2);
+
+    block_t *block = block_Alloc (GetWBE (hdr));
+
+    /* Receives the RTP packet */
+    for (ssize_t i = 0; i < len;)
+    {
+        ssize_t val;
+
+        block_cleanup_push (block);
+        poll (&ufd, 1, -1);
+        val = read (fd, block->p_buffer + i, block->i_buffer - i);
+        vlc_cleanup_pop ();
+
+        if (val <= 0)
+        {
+            block_Release (block);
+            return NULL;
+        }
+        i += val;
+    }
+
+    return block;
+}
+
+
+static block_t *rtp_recv (demux_t *demux)
+{
+    demux_sys_t *p_sys = demux->p_sys;
+
+    for (block_t *block;; block_Release (block))
+    {
+        block = p_sys->framed_rtp
+                ? rtp_stream_recv (p_sys->fd)
+                : rtp_dgram_recv (p_sys->fd);
+        if (block == NULL)
+        {
+            msg_Err (demux, "RTP flow stopped");
+            break; /* fatal error */
+        }
+
+        if (block->i_buffer < 2)
+            continue;
+
+        /* FIXME */
+        const uint8_t ptype = rtp_ptype (block);
+        if (ptype >= 72 && ptype <= 76)
+            continue; /* Muxed RTCP, ignore for now */
+
+        if (p_sys->srtp)
+        {
+            size_t len = block->i_buffer;
+            int canc, err;
+
+            canc = vlc_savecancel ();
+            err = srtp_recv (p_sys->srtp, block->p_buffer, &len);
+            vlc_restorecancel (canc);
+            if (err)
+            {
+                msg_Dbg (demux, "SRTP authentication/decryption failed");
+                continue;
+            }
+            block->i_buffer = len;
+        }
+        return block; /* success! */
+    }
+    return NULL;
+}
+
+
+void *rtp_thread (void *data)
+{
+    demux_t *demux = data;
+    demux_sys_t *p_sys = demux->p_sys;
+
+    for (;;)
+    {
+        block_t *block = rtp_recv (demux);
+        if (block == NULL)
+            break; /* fatal error: abort */
+
+        vlc_mutex_lock (&p_sys->lock);
+
+        /* Autodetect payload type, _before_ rtp_queue() */
+        if (p_sys->autodetect)
+        {
+            if (rtp_autodetect (demux, p_sys->session, block))
+            {
+                block_Release (block);
+                continue;
+            }
+            p_sys->autodetect = false;
+        }
+
+        rtp_queue (demux, p_sys->session, block);
+        vlc_cond_signal (&p_sys->wait);
+        vlc_mutex_unlock (&p_sys->lock);
+    }
+    /* TODO: return 0 from Demux */
+    return NULL;
+}
+
+
+void rtp_process (demux_t *demux)
+{
+    demux_sys_t *p_sys = demux->p_sys;
+    mtime_t deadline;
+
+    vlc_mutex_lock (&p_sys->lock);
+    if (rtp_dequeue (demux, p_sys->session, &deadline))
+        /* Pace the demux thread */
+        vlc_cond_timedwait (&p_sys->wait, &p_sys->lock, deadline);
+    else
+        vlc_cond_wait (&p_sys->wait, &p_sys->lock);
+    vlc_mutex_unlock (&p_sys->lock);
+}
index 86d3d96dac38cdc50ccdc1224c8466825698c83b..5028aa101f38f37cb8309316e06cf89374099ec5 100644 (file)
@@ -31,9 +31,6 @@
 #include <vlc_demux.h>
 #include <vlc_aout.h>
 #include <vlc_network.h>
-#ifdef HAVE_POLL
-# include <poll.h>
-#endif
 #include <vlc_plugin.h>
 
 #include <vlc_codecs.h>
@@ -225,6 +222,8 @@ static int Open (vlc_object_t *obj)
         return VLC_EGENERIC;
     }
 
+    vlc_mutex_init (&p_sys->lock);
+    vlc_cond_init (&p_sys->wait);
     p_sys->srtp         = NULL;
     p_sys->fd           = fd;
     p_sys->rtcp_fd      = rtcp_fd;
@@ -266,6 +265,10 @@ static int Open (vlc_object_t *obj)
         }
     }
 
+    if (vlc_clone (&p_sys->thread, rtp_thread, demux,
+                   VLC_THREAD_PRIORITY_INPUT))
+        goto error;
+    p_sys->thread_ready = true;
     return VLC_SUCCESS;
 
 error:
@@ -282,6 +285,14 @@ static void Close (vlc_object_t *obj)
     demux_t *demux = (demux_t *)obj;
     demux_sys_t *p_sys = demux->p_sys;
 
+    if (p_sys->thread_ready)
+    {
+        vlc_cancel (p_sys->thread);
+        vlc_join (p_sys->thread, NULL);
+    }
+    vlc_cond_destroy (&p_sys->wait);
+    vlc_mutex_destroy (&p_sys->lock);
+
     if (p_sys->srtp)
         srtp_destroy (p_sys->srtp);
     if (p_sys->session)
@@ -366,81 +377,6 @@ static int Control (demux_t *demux, int i_query, va_list args)
 }
 
 
-/**
- * Checks if a file descriptor is hung up.
- */
-static bool fd_dead (int fd)
-{
-    struct pollfd ufd = { .fd = fd, };
-
-    return (poll (&ufd, 1, 0) == 1) && (ufd.revents & POLLHUP);
-}
-
-
-/**
- * Gets a datagram from the network, or NULL in case of fatal error.
- */
-static block_t *rtp_dgram_recv (demux_t *demux, int fd)
-{
-    block_t *block = block_Alloc (0xffff);
-    ssize_t len;
-
-    do
-    {
-        len = net_Read (VLC_OBJECT (demux), fd, NULL,
-                                block->p_buffer, block->i_buffer, false);
-        if (((len <= 0) && fd_dead (fd))
-         || !vlc_object_alive (demux))
-        {
-            block_Release (block);
-            return NULL;
-        }
-    }
-    while (len == -1);
-
-    return block_Realloc (block, 0, len);
-}
-
-/**
- * Gets a framed RTP packet, or NULL in case of fatal error.
- */
-static block_t *rtp_stream_recv (demux_t *demux, int fd)
-{
-    ssize_t len = 0;
-    uint8_t hdr[2]; /* frame header */
-
-    /* Receives the RTP frame header */
-    do
-    {
-        ssize_t val = net_Read (VLC_OBJECT (demux), fd, NULL,
-                                hdr + len, 2 - len, false);
-        if (val <= 0)
-            return NULL;
-        len += val;
-    }
-    while (len < 2);
-
-    block_t *block = block_Alloc (GetWBE (hdr));
-
-    /* Receives the RTP packet */
-    for (ssize_t i = 0; i < len;)
-    {
-        ssize_t val;
-
-        val = net_Read (VLC_OBJECT (demux), fd, NULL,
-                        block->p_buffer + i, block->i_buffer - i, false);
-        if (val <= 0)
-        {
-            block_Release (block);
-            return NULL;
-        }
-        i += val;
-    }
-
-    return block;
-}
-
-
 /*
  * Generic packet handlers
  */
@@ -613,6 +549,75 @@ static void *ts_init (demux_t *demux)
 }
 
 
+/* Not using SDP, we need to guess the payload format used */
+/* see http://www.iana.org/assignments/rtp-parameters */
+int rtp_autodetect (demux_t *demux, rtp_session_t *session,
+                    const block_t *block)
+{
+    uint8_t ptype = rtp_ptype (block);
+    rtp_pt_t pt = {
+        .init = NULL,
+        .destroy = codec_destroy,
+        .decode = codec_decode,
+        .frequency = 0,
+        .number = ptype,
+    };
+
+    switch (ptype)
+    {
+      case 0:
+        msg_Dbg (demux, "detected G.711 mu-law");
+        pt.init = pcmu_init;
+        pt.frequency = 8000;
+        break;
+
+      case 8:
+        msg_Dbg (demux, "detected G.711 A-law");
+        pt.init = pcma_init;
+        pt.frequency = 8000;
+        break;
+
+      case 10:
+        msg_Dbg (demux, "detected stereo PCM");
+        pt.init = l16s_init;
+        pt.frequency = 44100;
+        break;
+
+      case 11:
+        msg_Dbg (demux, "detected mono PCM");
+        pt.init = l16m_init;
+        pt.frequency = 44100;
+        break;
+
+      case 14:
+        msg_Dbg (demux, "detected MPEG Audio");
+        pt.init = mpa_init;
+        pt.decode = mpa_decode;
+        pt.frequency = 90000;
+        break;
+
+      case 32:
+        msg_Dbg (demux, "detected MPEG Video");
+        pt.init = mpv_init;
+        pt.decode = mpv_decode;
+        pt.frequency = 90000;
+        break;
+
+      case 33:
+        msg_Dbg (demux, "detected MPEG2 TS");
+        pt.init = ts_init;
+        pt.destroy = stream_destroy;
+        pt.decode = stream_decode;
+        pt.frequency = 90000;
+        break;
+
+      default:
+        return -1;
+    }
+    rtp_add_type (demux, session, &pt);
+    return 0;
+}
+
 /*
  * Dynamic payload type handlers
  * Hmm, none implemented yet.
@@ -623,102 +628,6 @@ static void *ts_init (demux_t *demux)
  */
 static int Demux (demux_t *demux)
 {
-    demux_sys_t *p_sys = demux->p_sys;
-    block_t     *block;
-
-    block = p_sys->framed_rtp
-        ? rtp_stream_recv (demux, p_sys->fd)
-        : rtp_dgram_recv (demux, p_sys->fd);
-    if (!block)
-        return 0;
-
-    if (block->i_buffer < 2)
-        goto drop;
-
-    const uint8_t ptype = block->p_buffer[1] & 0x7F;
-    if (ptype >= 72 && ptype <= 76)
-        goto drop; /* Muxed RTCP, ignore for now */
-
-    if (p_sys->srtp)
-    {
-        size_t len = block->i_buffer;
-        if (srtp_recv (p_sys->srtp, block->p_buffer, &len))
-        {
-            msg_Dbg (demux, "SRTP authentication/decryption failed");
-            goto drop;
-        }
-        block->i_buffer = len;
-    }
-
-    /* Not using SDP, we need to guess the payload format used */
-    /* see http://www.iana.org/assignments/rtp-parameters */
-    if (p_sys->autodetect)
-    {
-        rtp_pt_t pt = {
-            .init = NULL,
-            .destroy = codec_destroy,
-            .decode = codec_decode,
-            .frequency = 0,
-            .number = ptype,
-        };
-        switch (ptype)
-        {
-          case 0:
-            msg_Dbg (demux, "detected G.711 mu-law");
-            pt.init = pcmu_init;
-            pt.frequency = 8000;
-            break;
-
-          case 8:
-            msg_Dbg (demux, "detected G.711 A-law");
-            pt.init = pcma_init;
-            pt.frequency = 8000;
-            break;
-
-          case 10:
-            msg_Dbg (demux, "detected stereo PCM");
-            pt.init = l16s_init;
-            pt.frequency = 44100;
-            break;
-
-          case 11:
-            msg_Dbg (demux, "detected mono PCM");
-            pt.init = l16m_init;
-            pt.frequency = 44100;
-            break;
-
-          case 14:
-            msg_Dbg (demux, "detected MPEG Audio");
-            pt.init = mpa_init;
-            pt.decode = mpa_decode;
-            pt.frequency = 90000;
-            break;
-
-          case 32:
-            msg_Dbg (demux, "detected MPEG Video");
-            pt.init = mpv_init;
-            pt.decode = mpv_decode;
-            pt.frequency = 90000;
-            break;
-
-          case 33:
-            msg_Dbg (demux, "detected MPEG2 TS");
-            pt.init = ts_init;
-            pt.destroy = stream_destroy;
-            pt.decode = stream_decode;
-            pt.frequency = 90000;
-            break;
-
-          default:
-            goto drop;
-        }
-        rtp_add_type (demux, p_sys->session, &pt);
-        p_sys->autodetect = false;
-    }
-    rtp_receive (demux, p_sys->session, block);
-
-    return 1;
-drop:
-    block_Release (block);
+    rtp_process (demux);
     return 1;
 }
index c8daa66a07352f932136a81b54a113797efbd8d0..c27c2895ab7bd5f2fdb1cfeed4876b7444510fac 100644 (file)
  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  ****************************************************************************/
 
-/* RTP payload format */
 typedef struct rtp_pt_t rtp_pt_t;
+typedef struct rtp_session_t rtp_session_t;
+
+/** @section RTP payload format */
 struct rtp_pt_t
 {
     void   *(*init) (demux_t *);
@@ -30,14 +32,22 @@ struct rtp_pt_t
     uint32_t  frequency; /* RTP clock rate (Hz) */
     uint8_t   number;
 };
+int rtp_autodetect (demux_t *, rtp_session_t *, const block_t *);
 
-/* RTP session */
-typedef struct rtp_session_t rtp_session_t;
+static inline uint8_t rtp_ptype (const block_t *block)
+{
+    return block->p_buffer[1] & 0x7F;
+}
+
+/** @section RTP session */
 rtp_session_t *rtp_session_create (demux_t *);
 void rtp_session_destroy (demux_t *, rtp_session_t *);
-void rtp_receive (demux_t *, rtp_session_t *, block_t *);
+void rtp_queue (demux_t *, rtp_session_t *, block_t *);
+bool rtp_dequeue (demux_t *, const rtp_session_t *, mtime_t *);
 int rtp_add_type (demux_t *demux, rtp_session_t *ses, const rtp_pt_t *pt);
 
+void rtp_process (demux_t *demux);
+void *rtp_thread (void *data);
 
 /* Global data */
 struct demux_sys_t
@@ -46,6 +56,10 @@ struct demux_sys_t
     struct srtp_session_t *srtp;
     int           fd;
     int           rtcp_fd;
+    vlc_thread_t  thread;
+    vlc_mutex_t   lock;
+    vlc_cond_t    wait;
+    bool thread_ready;
 
     unsigned      caching;
     unsigned      timeout;
index dd8eaff45eee82fbe0c96bc404404994e3148b4e..9756ba84e8305b12bd295bd2d078d78c1c3c3961 100644 (file)
@@ -192,11 +192,6 @@ rtp_source_destroy (demux_t *demux, const rtp_session_t *session,
     free (source);
 }
 
-static inline uint8_t rtp_ptype (const block_t *block)
-{
-    return block->p_buffer[1] & 0x7F;
-}
-
 static inline uint16_t rtp_seq (const block_t *block)
 {
     assert (block->i_buffer >= 4);
@@ -234,7 +229,7 @@ rtp_find_ptype (const rtp_session_t *session, rtp_source_t *source,
  * @param block RTP packet including the RTP header
  */
 void
-rtp_receive (demux_t *demux, rtp_session_t *session, block_t *block)
+rtp_queue (demux_t *demux, rtp_session_t *session, block_t *block)
 {
     demux_sys_t *p_sys = demux->p_sys;
 
@@ -368,7 +363,7 @@ rtp_receive (demux_t *demux, rtp_session_t *session, block_t *block)
     block->p_next = *pp;
     *pp = block;
 
-    rtp_decode (demux, session, src);
+    /*rtp_decode (demux, session, src);*/
     return;
 
 drop:
@@ -381,16 +376,22 @@ rtp_decode (demux_t *demux, const rtp_session_t *session, rtp_source_t *src)
 {
     block_t *block = src->blocks;
 
-    /* Buffer underflow? */
-    if (!block || !block->p_next || !block->p_next->p_next)
-        return;
-    /* TODO: use time rather than packet counts for buffer measurement */
+    assert (block);
     src->blocks = block->p_next;
     block->p_next = NULL;
 
     /* Discontinuity detection */
-    if (((src->last_seq + 1) & 0xffff) != rtp_seq (block))
+    uint16_t delta_seq = rtp_seq (block) - (src->last_seq + 1);
+    if (delta_seq != 0)
+    {
+        if (delta_seq >= 0x8000)
+        {   /* Unrecoverable if later packets have already been dequeued */
+            msg_Warn (demux, "ignoring late packet (sequence: %u)",
+                      rtp_seq (block));
+            goto drop;
+        }
         block->i_flags |= BLOCK_FLAG_DISCONTINUITY;
+    }
     src->last_seq = rtp_seq (block);
 
     /* Match the payload type */
@@ -408,7 +409,7 @@ rtp_decode (demux_t *demux, const rtp_session_t *session, rtp_source_t *src)
      * format, a single source MUST only use payloads of a chosen frequency.
      * Otherwise it would be impossible to compute consistent timestamps. */
     /* FIXME: handle timestamp wrap properly */
-    /* TODO: sync multiple sources sanely... */
+    /* TODO: inter-medias/sessions sync (using RTCP-SR) */
     const uint32_t timestamp = rtp_timestamp (block);
     block->i_pts = UINT64_C(1) * CLOCK_FREQ * timestamp / pt->frequency;
 
@@ -437,3 +438,64 @@ rtp_decode (demux_t *demux, const rtp_session_t *session, rtp_source_t *src)
 drop:
     block_Release (block);
 }
+
+
+bool rtp_dequeue (demux_t *demux, const rtp_session_t *session,
+                  mtime_t *restrict deadlinep)
+{
+    mtime_t now = mdate ();
+    bool pending = false;
+
+    for (unsigned i = 0, max = session->srcc; i < max; i++)
+    {
+        rtp_source_t *src = session->srcv[i];
+        block_t *block;
+
+        /* Because of IP packet delay variation (IPDV), we need to guesstimate
+         * how long to wait for a missing packet in the RTP sequence
+         * (see RFC3393 for background on IPDV).
+         *
+         * This situation occurs if a packet got lost, or if the network has
+         * re-ordered packets. Unfortunately, the MSL is 2 minutes, orders of
+         * magnitude too long for multimedia. We need a tradeoff.
+         * If we underestimated IPDV, we may have to discard valid but late
+         * packets. If we overestimate it, we will either cause too much
+         * delay, or worse, underflow our downstream buffers, as we wait for
+         * definitely a lost packets.
+         *
+         * The rest of the "de-jitter buffer" work is done by the interval
+         * LibVLC E/S-out clock synchronization. Here, we need to bother about
+         * re-ordering packets, as decoders can't cope with mis-ordered data.
+         */
+        while (((block = src->blocks)) != NULL)
+        {
+#if 0
+            if (rtp_seq (block) == ((src->last_seq + 1) & 0xffff))
+            {   /* Next block ready, no need to wait */
+                rtp_decode (demux, session, src);
+                continue;
+            }
+#endif
+            /* Wait for 3 times the inter-arrival delay variance (about 99.7%
+             * match for random gaussian jitter). Additionnaly, we implicitly
+             * wait for misordering times the packetization time.
+             */
+            mtime_t deadline = src->last_rx;
+            const rtp_pt_t *pt = rtp_find_ptype (session, src, block, NULL);
+            if (pt)
+                deadline += UINT64_C(3) * CLOCK_FREQ * src->jitter
+                            / pt->frequency;
+
+            if (now >= deadline)
+            {
+                rtp_decode (demux, session, src);
+                continue;
+            }
+            if (*deadlinep > deadline)
+                *deadlinep = deadline;
+            pending = true; /* packet pending in buffer */
+            break;
+        }
+    }
+    return pending;
+}