From e63f18a0cac245aa779c3e76fc46ee9bb5bfd336 Mon Sep 17 00:00:00 2001 From: =?utf8?q?R=C3=A9mi=20Denis-Courmont?= Date: Sun, 9 Nov 2008 19:36:30 +0200 Subject: [PATCH] RTP: try to improve packet re-ordering --- modules/access/rtp/Modules.am | 5 +- modules/access/rtp/input.c | 208 +++++++++++++++++++++++++++ modules/access/rtp/rtp.c | 259 +++++++++++----------------------- modules/access/rtp/rtp.h | 22 ++- modules/access/rtp/session.c | 88 ++++++++++-- 5 files changed, 389 insertions(+), 193 deletions(-) create mode 100644 modules/access/rtp/input.c diff --git a/modules/access/rtp/Modules.am b/modules/access/rtp/Modules.am index e8150b7023..855ca56b46 100644 --- a/modules/access/rtp/Modules.am +++ b/modules/access/rtp/Modules.am @@ -3,7 +3,10 @@ if HAVE_LIBGCRYPT libvlc_LTLIBRARIES += \ librtp_plugin.la librtp_plugin_la_SOURCES = \ - rtp.c rtp.h session.c + rtp.c \ + rtp.h \ + input.c \ + session.c librtp_plugin_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/libs/srtp librtp_plugin_la_LIBADD = $(AM_LIBADD) \ $(top_builddir)/libs/srtp/libvlc_srtp.la diff --git a/modules/access/rtp/input.c b/modules/access/rtp/input.c new file mode 100644 index 0000000000..8992cd49f1 --- /dev/null +++ b/modules/access/rtp/input.c @@ -0,0 +1,208 @@ +/** + * @file input.c + * @brief RTP packet input + */ +/***************************************************************************** + * Copyright © 2008 Rémi Denis-Courmont + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2.0 + * of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include +#include +#include +#include + +#include +#ifdef HAVE_POLL +# include +#endif + +#include "rtp.h" +#include + +/** + * Gets a datagram from the network. + * @param fd datagram file descriptor + * @return a block or NULL on fatal error (socket dead) + */ +static block_t *rtp_dgram_recv (int fd) +{ + block_t *block = block_Alloc (0xffff); + ssize_t len; + + do + { + struct pollfd ufd = { .fd = fd, .events = POLLIN, }; + + block_cleanup_push (block); + poll (&ufd, 1, -1); + len = read (fd, block->p_buffer, block->i_buffer); + vlc_cleanup_pop (); + + if ((len <= 0) && (ufd.revents & POLLHUP)) + { /* POLLHUP -> permanent (DCCP) socket error */ + block_Release (block); + return NULL; + } + } + while (len == -1); + + return block_Realloc (block, 0, len); +} + + +/** + * Gets a framed RTP packet. + * @param fd stream file descriptor + * @return a block or NULL in case of fatal error + */ +static block_t *rtp_stream_recv (int fd) +{ + ssize_t len = 0; + struct pollfd ufd = { .fd = fd, .events = POLLIN, }; + uint8_t hdr[2]; /* frame header */ + + /* Receives the RTP frame header */ + do + { + ssize_t val; + + poll (&ufd, 1, -1); + val = read (fd, hdr + len, 2 - len); + if (val <= 0) + return NULL; + len += val; + } + while (len < 2); + + block_t *block = block_Alloc (GetWBE (hdr)); + + /* Receives the RTP packet */ + for (ssize_t i = 0; i < len;) + { + ssize_t val; + + block_cleanup_push (block); + poll (&ufd, 1, -1); + val = read (fd, block->p_buffer + i, block->i_buffer - i); + vlc_cleanup_pop (); + + if (val <= 0) + { + block_Release (block); + return NULL; + } + i += val; + } + + return block; +} + + +static block_t *rtp_recv (demux_t *demux) +{ + demux_sys_t *p_sys = demux->p_sys; + + for (block_t *block;; block_Release (block)) + { + block = p_sys->framed_rtp + ? rtp_stream_recv (p_sys->fd) + : rtp_dgram_recv (p_sys->fd); + if (block == NULL) + { + msg_Err (demux, "RTP flow stopped"); + break; /* fatal error */ + } + + if (block->i_buffer < 2) + continue; + + /* FIXME */ + const uint8_t ptype = rtp_ptype (block); + if (ptype >= 72 && ptype <= 76) + continue; /* Muxed RTCP, ignore for now */ + + if (p_sys->srtp) + { + size_t len = block->i_buffer; + int canc, err; + + canc = vlc_savecancel (); + err = srtp_recv (p_sys->srtp, block->p_buffer, &len); + vlc_restorecancel (canc); + if (err) + { + msg_Dbg (demux, "SRTP authentication/decryption failed"); + continue; + } + block->i_buffer = len; + } + return block; /* success! */ + } + return NULL; +} + + +void *rtp_thread (void *data) +{ + demux_t *demux = data; + demux_sys_t *p_sys = demux->p_sys; + + for (;;) + { + block_t *block = rtp_recv (demux); + if (block == NULL) + break; /* fatal error: abort */ + + vlc_mutex_lock (&p_sys->lock); + + /* Autodetect payload type, _before_ rtp_queue() */ + if (p_sys->autodetect) + { + if (rtp_autodetect (demux, p_sys->session, block)) + { + block_Release (block); + continue; + } + p_sys->autodetect = false; + } + + rtp_queue (demux, p_sys->session, block); + vlc_cond_signal (&p_sys->wait); + vlc_mutex_unlock (&p_sys->lock); + } + /* TODO: return 0 from Demux */ + return NULL; +} + + +void rtp_process (demux_t *demux) +{ + demux_sys_t *p_sys = demux->p_sys; + mtime_t deadline; + + vlc_mutex_lock (&p_sys->lock); + if (rtp_dequeue (demux, p_sys->session, &deadline)) + /* Pace the demux thread */ + vlc_cond_timedwait (&p_sys->wait, &p_sys->lock, deadline); + else + vlc_cond_wait (&p_sys->wait, &p_sys->lock); + vlc_mutex_unlock (&p_sys->lock); +} diff --git a/modules/access/rtp/rtp.c b/modules/access/rtp/rtp.c index 86d3d96dac..5028aa101f 100644 --- a/modules/access/rtp/rtp.c +++ b/modules/access/rtp/rtp.c @@ -31,9 +31,6 @@ #include #include #include -#ifdef HAVE_POLL -# include -#endif #include #include @@ -225,6 +222,8 @@ static int Open (vlc_object_t *obj) return VLC_EGENERIC; } + vlc_mutex_init (&p_sys->lock); + vlc_cond_init (&p_sys->wait); p_sys->srtp = NULL; p_sys->fd = fd; p_sys->rtcp_fd = rtcp_fd; @@ -266,6 +265,10 @@ static int Open (vlc_object_t *obj) } } + if (vlc_clone (&p_sys->thread, rtp_thread, demux, + VLC_THREAD_PRIORITY_INPUT)) + goto error; + p_sys->thread_ready = true; return VLC_SUCCESS; error: @@ -282,6 +285,14 @@ static void Close (vlc_object_t *obj) demux_t *demux = (demux_t *)obj; demux_sys_t *p_sys = demux->p_sys; + if (p_sys->thread_ready) + { + vlc_cancel (p_sys->thread); + vlc_join (p_sys->thread, NULL); + } + vlc_cond_destroy (&p_sys->wait); + vlc_mutex_destroy (&p_sys->lock); + if (p_sys->srtp) srtp_destroy (p_sys->srtp); if (p_sys->session) @@ -366,81 +377,6 @@ static int Control (demux_t *demux, int i_query, va_list args) } -/** - * Checks if a file descriptor is hung up. - */ -static bool fd_dead (int fd) -{ - struct pollfd ufd = { .fd = fd, }; - - return (poll (&ufd, 1, 0) == 1) && (ufd.revents & POLLHUP); -} - - -/** - * Gets a datagram from the network, or NULL in case of fatal error. - */ -static block_t *rtp_dgram_recv (demux_t *demux, int fd) -{ - block_t *block = block_Alloc (0xffff); - ssize_t len; - - do - { - len = net_Read (VLC_OBJECT (demux), fd, NULL, - block->p_buffer, block->i_buffer, false); - if (((len <= 0) && fd_dead (fd)) - || !vlc_object_alive (demux)) - { - block_Release (block); - return NULL; - } - } - while (len == -1); - - return block_Realloc (block, 0, len); -} - -/** - * Gets a framed RTP packet, or NULL in case of fatal error. - */ -static block_t *rtp_stream_recv (demux_t *demux, int fd) -{ - ssize_t len = 0; - uint8_t hdr[2]; /* frame header */ - - /* Receives the RTP frame header */ - do - { - ssize_t val = net_Read (VLC_OBJECT (demux), fd, NULL, - hdr + len, 2 - len, false); - if (val <= 0) - return NULL; - len += val; - } - while (len < 2); - - block_t *block = block_Alloc (GetWBE (hdr)); - - /* Receives the RTP packet */ - for (ssize_t i = 0; i < len;) - { - ssize_t val; - - val = net_Read (VLC_OBJECT (demux), fd, NULL, - block->p_buffer + i, block->i_buffer - i, false); - if (val <= 0) - { - block_Release (block); - return NULL; - } - i += val; - } - - return block; -} - - /* * Generic packet handlers */ @@ -613,6 +549,75 @@ static void *ts_init (demux_t *demux) } +/* Not using SDP, we need to guess the payload format used */ +/* see http://www.iana.org/assignments/rtp-parameters */ +int rtp_autodetect (demux_t *demux, rtp_session_t *session, + const block_t *block) +{ + uint8_t ptype = rtp_ptype (block); + rtp_pt_t pt = { + .init = NULL, + .destroy = codec_destroy, + .decode = codec_decode, + .frequency = 0, + .number = ptype, + }; + + switch (ptype) + { + case 0: + msg_Dbg (demux, "detected G.711 mu-law"); + pt.init = pcmu_init; + pt.frequency = 8000; + break; + + case 8: + msg_Dbg (demux, "detected G.711 A-law"); + pt.init = pcma_init; + pt.frequency = 8000; + break; + + case 10: + msg_Dbg (demux, "detected stereo PCM"); + pt.init = l16s_init; + pt.frequency = 44100; + break; + + case 11: + msg_Dbg (demux, "detected mono PCM"); + pt.init = l16m_init; + pt.frequency = 44100; + break; + + case 14: + msg_Dbg (demux, "detected MPEG Audio"); + pt.init = mpa_init; + pt.decode = mpa_decode; + pt.frequency = 90000; + break; + + case 32: + msg_Dbg (demux, "detected MPEG Video"); + pt.init = mpv_init; + pt.decode = mpv_decode; + pt.frequency = 90000; + break; + + case 33: + msg_Dbg (demux, "detected MPEG2 TS"); + pt.init = ts_init; + pt.destroy = stream_destroy; + pt.decode = stream_decode; + pt.frequency = 90000; + break; + + default: + return -1; + } + rtp_add_type (demux, session, &pt); + return 0; +} + /* * Dynamic payload type handlers * Hmm, none implemented yet. @@ -623,102 +628,6 @@ static void *ts_init (demux_t *demux) */ static int Demux (demux_t *demux) { - demux_sys_t *p_sys = demux->p_sys; - block_t *block; - - block = p_sys->framed_rtp - ? rtp_stream_recv (demux, p_sys->fd) - : rtp_dgram_recv (demux, p_sys->fd); - if (!block) - return 0; - - if (block->i_buffer < 2) - goto drop; - - const uint8_t ptype = block->p_buffer[1] & 0x7F; - if (ptype >= 72 && ptype <= 76) - goto drop; /* Muxed RTCP, ignore for now */ - - if (p_sys->srtp) - { - size_t len = block->i_buffer; - if (srtp_recv (p_sys->srtp, block->p_buffer, &len)) - { - msg_Dbg (demux, "SRTP authentication/decryption failed"); - goto drop; - } - block->i_buffer = len; - } - - /* Not using SDP, we need to guess the payload format used */ - /* see http://www.iana.org/assignments/rtp-parameters */ - if (p_sys->autodetect) - { - rtp_pt_t pt = { - .init = NULL, - .destroy = codec_destroy, - .decode = codec_decode, - .frequency = 0, - .number = ptype, - }; - switch (ptype) - { - case 0: - msg_Dbg (demux, "detected G.711 mu-law"); - pt.init = pcmu_init; - pt.frequency = 8000; - break; - - case 8: - msg_Dbg (demux, "detected G.711 A-law"); - pt.init = pcma_init; - pt.frequency = 8000; - break; - - case 10: - msg_Dbg (demux, "detected stereo PCM"); - pt.init = l16s_init; - pt.frequency = 44100; - break; - - case 11: - msg_Dbg (demux, "detected mono PCM"); - pt.init = l16m_init; - pt.frequency = 44100; - break; - - case 14: - msg_Dbg (demux, "detected MPEG Audio"); - pt.init = mpa_init; - pt.decode = mpa_decode; - pt.frequency = 90000; - break; - - case 32: - msg_Dbg (demux, "detected MPEG Video"); - pt.init = mpv_init; - pt.decode = mpv_decode; - pt.frequency = 90000; - break; - - case 33: - msg_Dbg (demux, "detected MPEG2 TS"); - pt.init = ts_init; - pt.destroy = stream_destroy; - pt.decode = stream_decode; - pt.frequency = 90000; - break; - - default: - goto drop; - } - rtp_add_type (demux, p_sys->session, &pt); - p_sys->autodetect = false; - } - rtp_receive (demux, p_sys->session, block); - - return 1; -drop: - block_Release (block); + rtp_process (demux); return 1; } diff --git a/modules/access/rtp/rtp.h b/modules/access/rtp/rtp.h index c8daa66a07..c27c2895ab 100644 --- a/modules/access/rtp/rtp.h +++ b/modules/access/rtp/rtp.h @@ -20,8 +20,10 @@ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA ****************************************************************************/ -/* RTP payload format */ typedef struct rtp_pt_t rtp_pt_t; +typedef struct rtp_session_t rtp_session_t; + +/** @section RTP payload format */ struct rtp_pt_t { void *(*init) (demux_t *); @@ -30,14 +32,22 @@ struct rtp_pt_t uint32_t frequency; /* RTP clock rate (Hz) */ uint8_t number; }; +int rtp_autodetect (demux_t *, rtp_session_t *, const block_t *); -/* RTP session */ -typedef struct rtp_session_t rtp_session_t; +static inline uint8_t rtp_ptype (const block_t *block) +{ + return block->p_buffer[1] & 0x7F; +} + +/** @section RTP session */ rtp_session_t *rtp_session_create (demux_t *); void rtp_session_destroy (demux_t *, rtp_session_t *); -void rtp_receive (demux_t *, rtp_session_t *, block_t *); +void rtp_queue (demux_t *, rtp_session_t *, block_t *); +bool rtp_dequeue (demux_t *, const rtp_session_t *, mtime_t *); int rtp_add_type (demux_t *demux, rtp_session_t *ses, const rtp_pt_t *pt); +void rtp_process (demux_t *demux); +void *rtp_thread (void *data); /* Global data */ struct demux_sys_t @@ -46,6 +56,10 @@ struct demux_sys_t struct srtp_session_t *srtp; int fd; int rtcp_fd; + vlc_thread_t thread; + vlc_mutex_t lock; + vlc_cond_t wait; + bool thread_ready; unsigned caching; unsigned timeout; diff --git a/modules/access/rtp/session.c b/modules/access/rtp/session.c index dd8eaff45e..9756ba84e8 100644 --- a/modules/access/rtp/session.c +++ b/modules/access/rtp/session.c @@ -192,11 +192,6 @@ rtp_source_destroy (demux_t *demux, const rtp_session_t *session, free (source); } -static inline uint8_t rtp_ptype (const block_t *block) -{ - return block->p_buffer[1] & 0x7F; -} - static inline uint16_t rtp_seq (const block_t *block) { assert (block->i_buffer >= 4); @@ -234,7 +229,7 @@ rtp_find_ptype (const rtp_session_t *session, rtp_source_t *source, * @param block RTP packet including the RTP header */ void -rtp_receive (demux_t *demux, rtp_session_t *session, block_t *block) +rtp_queue (demux_t *demux, rtp_session_t *session, block_t *block) { demux_sys_t *p_sys = demux->p_sys; @@ -368,7 +363,7 @@ rtp_receive (demux_t *demux, rtp_session_t *session, block_t *block) block->p_next = *pp; *pp = block; - rtp_decode (demux, session, src); + /*rtp_decode (demux, session, src);*/ return; drop: @@ -381,16 +376,22 @@ rtp_decode (demux_t *demux, const rtp_session_t *session, rtp_source_t *src) { block_t *block = src->blocks; - /* Buffer underflow? */ - if (!block || !block->p_next || !block->p_next->p_next) - return; - /* TODO: use time rather than packet counts for buffer measurement */ + assert (block); src->blocks = block->p_next; block->p_next = NULL; /* Discontinuity detection */ - if (((src->last_seq + 1) & 0xffff) != rtp_seq (block)) + uint16_t delta_seq = rtp_seq (block) - (src->last_seq + 1); + if (delta_seq != 0) + { + if (delta_seq >= 0x8000) + { /* Unrecoverable if later packets have already been dequeued */ + msg_Warn (demux, "ignoring late packet (sequence: %u)", + rtp_seq (block)); + goto drop; + } block->i_flags |= BLOCK_FLAG_DISCONTINUITY; + } src->last_seq = rtp_seq (block); /* Match the payload type */ @@ -408,7 +409,7 @@ rtp_decode (demux_t *demux, const rtp_session_t *session, rtp_source_t *src) * format, a single source MUST only use payloads of a chosen frequency. * Otherwise it would be impossible to compute consistent timestamps. */ /* FIXME: handle timestamp wrap properly */ - /* TODO: sync multiple sources sanely... */ + /* TODO: inter-medias/sessions sync (using RTCP-SR) */ const uint32_t timestamp = rtp_timestamp (block); block->i_pts = UINT64_C(1) * CLOCK_FREQ * timestamp / pt->frequency; @@ -437,3 +438,64 @@ rtp_decode (demux_t *demux, const rtp_session_t *session, rtp_source_t *src) drop: block_Release (block); } + + +bool rtp_dequeue (demux_t *demux, const rtp_session_t *session, + mtime_t *restrict deadlinep) +{ + mtime_t now = mdate (); + bool pending = false; + + for (unsigned i = 0, max = session->srcc; i < max; i++) + { + rtp_source_t *src = session->srcv[i]; + block_t *block; + + /* Because of IP packet delay variation (IPDV), we need to guesstimate + * how long to wait for a missing packet in the RTP sequence + * (see RFC3393 for background on IPDV). + * + * This situation occurs if a packet got lost, or if the network has + * re-ordered packets. Unfortunately, the MSL is 2 minutes, orders of + * magnitude too long for multimedia. We need a tradeoff. + * If we underestimated IPDV, we may have to discard valid but late + * packets. If we overestimate it, we will either cause too much + * delay, or worse, underflow our downstream buffers, as we wait for + * definitely a lost packets. + * + * The rest of the "de-jitter buffer" work is done by the interval + * LibVLC E/S-out clock synchronization. Here, we need to bother about + * re-ordering packets, as decoders can't cope with mis-ordered data. + */ + while (((block = src->blocks)) != NULL) + { +#if 0 + if (rtp_seq (block) == ((src->last_seq + 1) & 0xffff)) + { /* Next block ready, no need to wait */ + rtp_decode (demux, session, src); + continue; + } +#endif + /* Wait for 3 times the inter-arrival delay variance (about 99.7% + * match for random gaussian jitter). Additionnaly, we implicitly + * wait for misordering times the packetization time. + */ + mtime_t deadline = src->last_rx; + const rtp_pt_t *pt = rtp_find_ptype (session, src, block, NULL); + if (pt) + deadline += UINT64_C(3) * CLOCK_FREQ * src->jitter + / pt->frequency; + + if (now >= deadline) + { + rtp_decode (demux, session, src); + continue; + } + if (*deadlinep > deadline) + *deadlinep = deadline; + pending = true; /* packet pending in buffer */ + break; + } + } + return pending; +} -- 2.39.2