From 5df2e973d079c0a43572cb9b248d226911b4cfbc Mon Sep 17 00:00:00 2001 From: Pierre d'Herbemont Date: Fri, 23 May 2008 13:28:36 +0200 Subject: [PATCH] stream: Add a new method for buffering access: A*Immediate method. It is much more efficient regarding latency as it doesn't bufferize more than needed, and let the module access take care of that eventually. Enable with --use-stream-immediate. We may want to default it. Note: --use-stream-immediate will be only effective on access that don't provide pf_block() for now. This is because I didn't benchmark against the Block method. Here the gain that I did measure is about 200ms (less latency) when using the http access on a loopback. --- src/input/stream.c | 226 +++++++++++++++++++++++++++++++++++++++++--- src/libvlc-module.c | 9 ++ 2 files changed, 224 insertions(+), 11 deletions(-) diff --git a/src/input/stream.c b/src/input/stream.c index 31caeb8239..0c60db9e77 100644 --- a/src/input/stream.c +++ b/src/input/stream.c @@ -27,15 +27,18 @@ #include +#include + #include "input_internal.h" #undef STREAM_DEBUG /* TODO: - * - tune the 2 methods + * - tune the 2 methods (block/stream) * - compute cost for seek * - improve stream mode seeking with closest segments * - ... + * - Maybe remove (block/stream) in favour of immediate */ /* Two methods: @@ -43,6 +46,9 @@ * One linked list of data read * - using pf_read * More complex scheme using mutliple track to avoid seeking + * - using directly the access (only indirection for peeking). + * This method is known to introduce much less latency. + * It should probably defaulted (instead of the stream method (2)). */ /* How many tracks we have, currently only used for stream mode */ @@ -101,11 +107,18 @@ typedef struct } access_entry_t; +typedef enum stream_read_method_t +{ + Immediate, + Block, + Stream +} stream_read_method_t; + struct stream_sys_t { access_t *p_access; - bool b_block; /* Block method (1) or stream */ + stream_read_method_t method; /* method to use */ int64_t i_pos; /* Current reading offset */ @@ -138,6 +151,13 @@ struct stream_sys_t } stream; + /* Method 3: for pf_read */ + struct + { + int64_t i_end; + uint8_t *p_buffer; + } immediate; + /* Peek temporary buffer */ int i_peek; uint8_t *p_peek; @@ -182,12 +202,47 @@ static int AStreamSeekStream( stream_t *s, int64_t i_pos ); static void AStreamPrebufferStream( stream_t *s ); static int AReadStream( stream_t *s, void *p_read, int i_read ); +/* Method 3 */ +static int AStreamReadImmediate( stream_t *s, void *p_read, int i_read ); +static int AStreamPeekImmediate( stream_t *s, const uint8_t **pp_peek, int i_read ); +static int AStreamSeekImmediate( stream_t *s, int64_t i_pos ); + /* Common */ static int AStreamControl( stream_t *s, int i_query, va_list ); static void AStreamDestroy( stream_t *s ); static void UStreamDestroy( stream_t *s ); static int ASeek( stream_t *s, int64_t i_pos ); +/**************************************************************************** + * Method 3 helpers: + ****************************************************************************/ + +static inline int64_t stream_buffered_size( stream_t *s ) +{ + return s->p_sys->immediate.i_end; +} + +static inline void stream_buffer_empty( stream_t *s, int length ) +{ + length = __MAX( stream_buffered_size( s ), length ); + if( length ) + { + memmove( s->p_sys->immediate.p_buffer, + s->p_sys->immediate.p_buffer + length, + stream_buffered_size( s ) - length ); + } + s->p_sys->immediate.i_end -= length; +} + +static inline void stream_buffer_fill( stream_t *s, int length ) +{ + s->p_sys->immediate.i_end += length; +} + +static inline uint8_t * stream_buffer( stream_t *s ) +{ + return s->p_sys->immediate.p_buffer; +} /**************************************************************************** * stream_UrlNew: create a stream from a access @@ -254,7 +309,13 @@ stream_t *stream_AccessNew( access_t *p_access, bool b_quick ) /* Common field */ p_sys->p_access = p_access; - p_sys->b_block = p_access->pf_block ? true : false; + if( p_access->pf_block ) + p_sys->method = Block; + else if (var_CreateGetBool( s, "use-stream-immediate")) + p_sys->method = Immediate; + else + p_sys->method = Stream; + p_sys->i_pos = p_access->info.i_pos; /* Stats */ @@ -340,8 +401,9 @@ stream_t *stream_AccessNew( access_t *p_access, bool b_quick ) p_sys->i_peek = 0; p_sys->p_peek = NULL; - if( p_sys->b_block ) + if( p_sys->method == Block ) { + msg_Dbg( s, "Using AStream*Block" ); s->pf_read = AStreamReadBlock; s->pf_peek = AStreamPeekBlock; @@ -362,10 +424,33 @@ stream_t *stream_AccessNew( access_t *p_access, bool b_quick ) goto error; } } - else + else if (p_sys->method == Immediate) + { + msg_Dbg( s, "Using AStream*Immediate" ); + + s->pf_read = AStreamReadImmediate; + s->pf_peek = AStreamPeekImmediate; + + /* Allocate/Setup our tracks (useful to peek)*/ + p_sys->immediate.i_end = 0; + p_sys->immediate.p_buffer = malloc( STREAM_CACHE_SIZE ); + + msg_Dbg( s, "p_buffer %p-%p", p_sys->immediate.p_buffer, + p_sys->immediate.p_buffer + STREAM_CACHE_SIZE ); + + if( p_sys->immediate.p_buffer == NULL ) + { + msg_Err( s, "Out of memory when allocating stream cache (%d bytes)", + STREAM_CACHE_SIZE ); + goto error; + } + } + else /* ( p_sys->method == Stream ) */ { int i; + msg_Dbg( s, "Using AStream*Stream" ); + s->pf_read = AStreamReadStream; s->pf_peek = AStreamPeekStream; @@ -409,7 +494,7 @@ stream_t *stream_AccessNew( access_t *p_access, bool b_quick ) return s; error: - if( p_sys->b_block ) + if( p_sys->method == Block ) { /* Nothing yet */ } @@ -436,7 +521,7 @@ static void AStreamDestroy( stream_t *s ) vlc_object_detach( s ); - if( p_sys->b_block ) block_ChainRelease( p_sys->block.p_first ); + if( p_sys->method == Block ) block_ChainRelease( p_sys->block.p_first ); else free( p_sys->stream.p_buffer ); free( p_sys->p_peek ); @@ -473,7 +558,7 @@ void stream_AccessReset( stream_t *s ) p_sys->i_pos = p_sys->p_access->info.i_pos; - if( p_sys->b_block ) + if( p_sys->method == Block ) { block_ChainRelease( p_sys->block.p_first ); @@ -488,7 +573,11 @@ void stream_AccessReset( stream_t *s ) /* Do the prebuffering */ AStreamPrebufferBlock( s ); } - else + else if( p_sys->method == Immediate ) + { + stream_buffer_empty( s, stream_buffered_size( s ) ); + } + else /* ( p_sys->method == Stream ) */ { int i; @@ -572,9 +661,11 @@ static int AStreamControl( stream_t *s, int i_query, va_list args ) case STREAM_SET_POSITION: i_64 = (int64_t)va_arg( args, int64_t ); - if( p_sys->b_block ) + if( p_sys->method == Block ) return AStreamSeekBlock( s, i_64 ); - else + else if( p_sys->method == Immediate ) + return AStreamSeekImmediate( s, i_64 ); + else /* ( p_sys->method == Stream ) */ return AStreamSeekStream( s, i_64 ); case STREAM_GET_MTU: @@ -1420,6 +1511,119 @@ static void AStreamPrebufferStream( stream_t *s ) } } +/**************************************************************************** + * Method 3: + ****************************************************************************/ + +static int AStreamReadImmediate( stream_t *s, void *p_read, int i_read ) +{ + stream_sys_t *p_sys = s->p_sys; + +#ifdef STREAM_DEBUG + msg_Dbg( s, "AStreamReadImmediate p_read=%p i_read=%d", + p_read, i_read ); +#endif + + if( p_read == NULL ) + { + /* seek within this stream if possible, else use plain old read and discard */ + stream_sys_t *p_sys = s->p_sys; + access_t *p_access = p_sys->p_access; + bool b_aseek; + access_Control( p_access, ACCESS_CAN_SEEK, &b_aseek ); + if( b_aseek ) + return AStreamSeekStream( s, p_sys->i_pos + i_read ) ? 0 : i_read; + } + + /* First, check if we already have some data in the buffer, + * that we could copy directly */ + int i_copy = __MIN( stream_buffered_size( s ), i_read ); + if( i_copy ) + { +#ifdef STREAM_DEBUG + msg_Dbg( s, "AStreamReadImmediate: copy %d from %p", i_copy, stream_buffer( s ) ); +#endif + + assert( i_copy <= STREAM_CACHE_SIZE ); + + if( p_read ) + { + memcpy( p_read, stream_buffer( s ), i_copy ); + p_read = (uint8_t *)p_read + i_copy; + } + } + + /* Now that we've read our buffer we don't need its i_copy bytes */ + stream_buffer_empty( s, i_copy ); + + /* Now check if we have still to really read some data */ + int i_to_read = i_read - i_copy; + if( i_to_read ) + { + i_to_read = AReadStream( s, p_read, i_to_read ); + } + + p_sys->i_pos += i_to_read; + + return i_to_read + i_copy; +} + +static int AStreamPeekImmediate( stream_t *s, const uint8_t **pp_peek, int i_read ) +{ +#ifdef STREAM_DEBUG + msg_Dbg( s, "AStreamPeekImmediate: %d size=%"PRId64, + i_read, size_buffered_size( s ) ); +#endif + + /* Avoid problem, but that shouldn't happen */ + if( i_read > STREAM_CACHE_SIZE / 2 ) + i_read = STREAM_CACHE_SIZE / 2; + + int i_to_read = i_read - stream_buffered_size( s ); + if( i_to_read > 0 ) + { +#ifdef STREAM_DEBUG + msg_Dbg( s, "AStreamPeekImmediate: Reading %d", + i_to_read ); +#endif + i_to_read = AReadStream( s, stream_buffer( s ) + stream_buffered_size( s ), + i_to_read ); + + if( i_to_read > 0 ) + stream_buffer_fill( s, i_to_read ); + } + + *pp_peek = stream_buffer( s ); + + return __MIN(stream_buffered_size( s ), i_read); +} + +static int AStreamSeekImmediate( stream_t *s, int64_t i_pos ) +{ + stream_sys_t *p_sys = s->p_sys; + access_t *p_access = p_sys->p_access; + bool b_aseek; + +#ifdef STREAM_DEBUG + msg_Dbg( s, "AStreamSeekImmediate to %"PRId64" pos=%"PRId64 + i_pos, p_sys->i_pos ); +#endif + + access_Control( p_access, ACCESS_CAN_SEEK, &b_aseek ); + if( !b_aseek ) + { + /* We can't do nothing */ + msg_Dbg( s, "AStreamSeekImmediate: can't seek" ); + return VLC_EGENERIC; + } + + /* Just reset our buffer */ + stream_buffer_empty( s, stream_buffered_size( s ) ); + + if( ASeek( s, i_pos ) ) return VLC_EGENERIC; + + return VLC_SUCCESS; +} /**************************************************************************** * stream_ReadLine: diff --git a/src/libvlc-module.c b/src/libvlc-module.c index 9b6d3fd239..2373f6980a 100644 --- a/src/libvlc-module.c +++ b/src/libvlc-module.c @@ -993,6 +993,12 @@ static const char *const ppsz_clock_descriptions[] = #define MINIMIZE_THREADS_LONGTEXT N_( \ "This option minimizes the number of threads needed to run VLC.") +#define USE_STREAM_IMMEDIATE N_("(Experimental) Use the StreamImmediate " \ + "method and minimize the caching done at the access level.") +#define USE_STREAM_IMMEDIATE_LONGTEXT N_( \ + "This option is useful if you want to lower the latency when " \ + "reading a stream") + #define PLUGIN_PATH_TEXT N_("Modules search path") #define PLUGIN_PATH_LONGTEXT N_( \ "Additional path for VLC to look for its modules. You can add " \ @@ -1814,6 +1820,9 @@ vlc_module_begin(); MINIMIZE_THREADS_LONGTEXT, true ); change_need_restart(); + add_bool( "use-stream-immediate", false, NULL, + USE_STREAM_IMMEDIATE, USE_STREAM_IMMEDIATE_LONGTEXT, false ); + #if !defined(__APPLE__) && !defined(SYS_BEOS) && defined(LIBVLC_USE_PTHREAD) add_bool( "rt-priority", false, NULL, RT_PRIORITY_TEXT, RT_PRIORITY_LONGTEXT, true ); -- 2.39.2