1 /*****************************************************************************
3 *****************************************************************************
4 * Copyright (C) 2001-2005 the VideoLAN team
7 * Authors: Laurent Aimar <fenrir@via.ecp.fr>
8 * Eric Petit <titer@videolan.org>
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
20 * You should have received a copy of the GNU General Public License
21 * along with this program; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
23 *****************************************************************************/
25 /*****************************************************************************
27 *****************************************************************************/
29 #include <sys/types.h>
37 #include <vlc_block.h>
44 # include <winsock2.h>
45 # include <ws2tcpip.h>
47 # define IN_MULTICAST(a) IN_CLASSD(a)
50 # include <sys/socket.h>
53 #include <vlc_network.h>
55 #define MAX_EMPTY_BLOCKS 200
57 #if defined(WIN32) || defined(UNDER_CE)
58 # define WINSOCK_STRERROR_SIZE 20
/* winsock_strerror: format a "Winsock error <n>" message into buf.
 * buf must provide at least WINSOCK_STRERROR_SIZE bytes, since the last
 * byte of that range is written unconditionally below.
 * NOTE(review): the value passed for %d and the return statement are not
 * visible in this listing. */
59 static const char *winsock_strerror( char *buf )
61 snprintf( buf, WINSOCK_STRERROR_SIZE, "Winsock error %d",
63 buf[WINSOCK_STRERROR_SIZE - 1] = '\0'; /* force termination of the message buffer */
68 /*****************************************************************************
70 *****************************************************************************/
71 static int Open ( vlc_object_t * );
72 static void Close( vlc_object_t * );
74 #define SOUT_CFG_PREFIX "sout-udp-"
76 #define CACHING_TEXT N_("Caching value (ms)")
77 #define CACHING_LONGTEXT N_( \
78 "Default caching value for outbound UDP streams. This " \
79 "value should be set in milliseconds." )
81 #define GROUP_TEXT N_("Group packets")
82 #define GROUP_LONGTEXT N_("Packets can be sent one by one at the right time " \
83 "or by groups. You can choose the number " \
84 "of packets that will be sent at a time. It " \
85 "helps reducing the scheduling load on " \
86 "heavily-loaded systems." )
87 #define RAW_TEXT N_("Raw write")
88 #define RAW_LONGTEXT N_("Packets will be sent " \
89 "directly, without trying to fill the MTU (ie, " \
90 "without trying to make the biggest possible packets " \
91 "in order to improve streaming)." )
/* Module descriptor entries for the UDP stream output.
 * Declares a "sout access" capability with score 100, reachable through the
 * "udp" and "rtp" shortcuts, with Open/Close as callbacks.
 * Options registered under SOUT_CFG_PREFIX ("sout-udp-"):
 *   caching - integer, default DEFAULT_PTS_DELAY / 1000
 *   group   - integer, default 1
 *   late    - registered through add_suppressed_integer
 *   raw     - boolean, default 0
 * NOTE(review): the enclosing begin/end macros and the continuation lines of
 * the "group" and "raw" entries are not visible in this listing. */
94 set_description( _("UDP stream output") );
95 set_shortname( "UDP" );
96 set_category( CAT_SOUT );
97 set_subcategory( SUBCAT_SOUT_ACO );
98 add_integer( SOUT_CFG_PREFIX "caching", DEFAULT_PTS_DELAY / 1000, NULL, CACHING_TEXT, CACHING_LONGTEXT, VLC_TRUE );
99 add_integer( SOUT_CFG_PREFIX "group", 1, NULL, GROUP_TEXT, GROUP_LONGTEXT,
101 add_suppressed_integer( SOUT_CFG_PREFIX "late" );
102 add_bool( SOUT_CFG_PREFIX "raw", 0, NULL, RAW_TEXT, RAW_LONGTEXT,
105 set_capability( "sout access", 100 );
106 add_shortcut( "udp" );
107 add_shortcut( "rtp" ); // Will work only with ts muxer
108 set_callbacks( Open, Close );
111 /*****************************************************************************
112 * Local prototypes and data structures
113 *****************************************************************************/
115 static const char *ppsz_sout_options[] = {
122 /* Options handled by the libvlc network core */
123 static const char *ppsz_core_options[] = {
131 static int Write ( sout_access_out_t *, block_t * );
132 static int WriteRaw( sout_access_out_t *, block_t * );
133 static int Seek ( sout_access_out_t *, off_t );
135 static void ThreadWrite( vlc_object_t * );
136 static block_t *NewUDPPacket( sout_access_out_t *, mtime_t );
/* sout_access_thread_t: object handed to the writer thread (ThreadWrite).
 * It is created with vlc_object_create() in Open() and destroyed in Close().
 * NOTE(review): other members used elsewhere in this file (i_handle,
 * i_caching, i_group, b_die, b_error) are not declared in the lines visible
 * here. */
138 typedef struct sout_access_thread_t
142 sout_instance_t *p_sout; /* copied from p_access->p_sout in Open() */
144 block_fifo_t *p_fifo; /* packets queued by Write()/WriteRaw(), fetched by ThreadWrite() */
151 block_fifo_t *p_empty_blocks; /* blocks put back by ThreadWrite(), reused by NewUDPPacket() */
153 } sout_access_thread_t;
/* sout_access_out_sys_t: private state of one UDP access output instance.
 * Allocated and zeroed in Open().
 * NOTE(review): members used elsewhere in this file (i_mtu, i_ssrc, p_buffer)
 * are not declared in the lines visible here. */
155 struct sout_access_out_sys_t
157 int b_rtpts; // 1 if add rtp/ts header
158 uint16_t i_sequence_number; /* written high byte first into header bytes 2-3 by NewUDPPacket(), then incremented */
165 sout_access_thread_t *p_thread; /* writer thread object created in Open() */
167 vlc_bool_t b_mtu_warning; /* set to VLC_TRUE once Write() has issued its "packet size > MTU" warning */
170 #define DEFAULT_PORT 1234
171 #define RTP_HEADER_LENGTH 12
173 /*****************************************************************************
174 * Open: connect the UDP socket and start the writer thread
175 *****************************************************************************/
/* Open: initialise the UDP access output for p_this.
 *
 * Steps visible in this listing:
 *  - parse the option chain twice: once with the "sout-udp-" prefix and
 *    ppsz_sout_options, once with an empty prefix and ppsz_core_options;
 *  - allocate and zero the private state and store it in p_access->p_sys;
 *  - copy p_access->psz_name and scan it for an optional "[...]" part and a
 *    ':' separator, then read the port with atoi(); a port <= 0 is replaced
 *    by DEFAULT_PORT;
 *  - create the writer thread object, attach it, and create its two FIFOs;
 *  - connect the socket with net_ConnectUDP() and store the handle;
 *  - read the "caching", "group" and "mtu" settings;
 *  - spawn ThreadWrite at VLC_THREAD_PRIORITY_HIGHEST;
 *  - seed rand() from mdate() and draw the initial sequence number and SSRC;
 *  - select WriteRaw or Write as pf_write according to the "raw" option;
 *  - increment p_sout->i_out_pace_nocontrol.
 *
 * NOTE(review): braces, local declarations, return statements and several
 * other lines are missing from this listing, so the error paths and the
 * return values cannot be described from here. */
176 static int Open( vlc_object_t *p_this )
178 sout_access_out_t *p_access = (sout_access_out_t*)p_this;
179 sout_access_out_sys_t *p_sys;
189 config_ChainParse( p_access, SOUT_CFG_PREFIX,
190 ppsz_sout_options, p_access->p_cfg );
191 config_ChainParse( p_access, "",
192 ppsz_core_options, p_access->p_cfg );
194 if( !( p_sys = malloc( sizeof( sout_access_out_sys_t ) ) ) )
196 msg_Err( p_access, "not enough memory" );
199 memset( p_sys, 0, sizeof(sout_access_out_sys_t) );
200 p_access->p_sys = p_sys;
/* The statement guarded by this "rtp" test is not visible here; presumably
 * it enables b_rtpts - confirm against the full file. */
202 if( p_access->psz_access != NULL &&
203 !strcmp( p_access->psz_access, "rtp" ) )
/* NOTE(review): no NULL check of the strdup() result is visible before it is
 * dereferenced below - confirm against the full file. */
212 psz_parser = strdup( p_access->psz_name );
214 psz_dst_addr = psz_parser;
/* A leading '[' is scanned up to the matching ']' before the ':' search;
 * presumably this is for bracketed IPv6 literals - confirm. */
217 if ( *psz_parser == '[' )
219 while( *psz_parser && *psz_parser != ']' )
224 while( *psz_parser && *psz_parser != ':' )
228 if( *psz_parser == ':' )
232 i_dst_port = atoi( psz_parser );
234 if( i_dst_port <= 0 )
236 i_dst_port = DEFAULT_PORT;
240 vlc_object_create( p_access, sizeof( sout_access_thread_t ) );
241 if( !p_sys->p_thread )
243 msg_Err( p_access, "out of memory" );
247 vlc_object_attach( p_sys->p_thread, p_access );
248 p_sys->p_thread->p_sout = p_access->p_sout;
249 p_sys->p_thread->b_die = 0;
250 p_sys->p_thread->b_error= 0;
251 p_sys->p_thread->p_fifo = block_FifoNew( p_access );
252 p_sys->p_thread->p_empty_blocks = block_FifoNew( p_access );
254 i_handle = net_ConnectUDP( p_this, psz_dst_addr, i_dst_port, -1 );
257 msg_Err( p_access, "failed to create UDP socket" );
261 p_sys->p_thread->i_handle = i_handle;
/* Presumably disables reception because the socket is only written to -
 * confirm against the net_StopRecv documentation. */
262 net_StopRecv( i_handle );
264 var_Get( p_access, SOUT_CFG_PREFIX "caching", &val );
/* The option text gives the value in milliseconds; it is scaled by 1000 here
 * (presumably to microseconds, to be comparable with mdate() - confirm). */
265 p_sys->p_thread->i_caching = (int64_t)val.i_int * 1000;
267 var_Get( p_access, SOUT_CFG_PREFIX "group", &val );
268 p_sys->p_thread->i_group = val.i_int;
270 p_sys->i_mtu = var_CreateGetInteger( p_this, "mtu" );
/* NOTE(review): on failure only the thread object's destruction is visible;
 * whether the socket, FIFOs, psz_dst_addr and p_sys are released on this
 * path cannot be seen here - confirm. */
272 if( vlc_thread_create( p_sys->p_thread, "sout write thread", ThreadWrite,
273 VLC_THREAD_PRIORITY_HIGHEST, VLC_FALSE ) )
275 msg_Err( p_access->p_sout, "cannot spawn sout access thread" );
276 vlc_object_destroy( p_sys->p_thread );
/* Initial sequence number and SSRC come from rand(), seeded with the
 * current date truncated to 32 bits. */
280 srand( (uint32_t)mdate());
281 p_sys->p_buffer = NULL;
282 p_sys->i_sequence_number = rand()&0xffff;
283 p_sys->i_ssrc = rand()&0xffffffff;
285 var_Get( p_access, SOUT_CFG_PREFIX "raw", &val );
286 if( val.b_bool ) p_access->pf_write = WriteRaw;
287 else p_access->pf_write = Write;
289 p_access->pf_seek = Seek;
291 msg_Dbg( p_access, "udp access output opened(%s:%d)",
292 psz_dst_addr, i_dst_port );
/* psz_dst_addr was assigned the strdup() copy made above. */
294 free( psz_dst_addr );
296 /* update p_sout->i_out_pace_nocontrol */
297 p_access->p_sout->i_out_pace_nocontrol++;
302 /*****************************************************************************
303 * Close: close the target
304 *****************************************************************************/
/* Close: stop the writer thread and release what Open() set up.
 *
 * Steps visible in this listing: raise b_die on the thread object, queue
 * zero-filled dummy blocks of i_mtu bytes on the output FIFO in a loop of
 * ten iterations, join the thread, release both FIFOs and any partially
 * filled packet, close the socket, detach and destroy the thread object,
 * and decrement p_sout->i_out_pace_nocontrol.
 *
 * NOTE(review): no release of p_sys itself is visible in this listing -
 * confirm against the full file. */
305 static void Close( vlc_object_t * p_this )
307 sout_access_out_t *p_access = (sout_access_out_t*)p_this;
308 sout_access_out_sys_t *p_sys = p_access->p_sys;
311 p_sys->p_thread->b_die = 1;
/* ThreadWrite fetches packets with block_FifoGet(); these dummy blocks are
 * presumably queued so that a waiting fetch returns and the thread can see
 * b_die - confirm. */
312 for( i = 0; i < 10; i++ )
314 block_t *p_dummy = block_New( p_access, p_sys->i_mtu );
317 p_dummy->i_length = 0;
318 memset( p_dummy->p_buffer, 0, p_dummy->i_buffer );
319 block_FifoPut( p_sys->p_thread->p_fifo, p_dummy );
321 vlc_thread_join( p_sys->p_thread );
323 block_FifoRelease( p_sys->p_thread->p_fifo );
324 block_FifoRelease( p_sys->p_thread->p_empty_blocks );
/* Packet that Write() was still filling and had not queued yet. */
326 if( p_sys->p_buffer ) block_Release( p_sys->p_buffer );
328 net_Close( p_sys->p_thread->i_handle );
330 vlc_object_detach( p_sys->p_thread );
331 vlc_object_destroy( p_sys->p_thread );
332 /* update p_sout->i_out_pace_nocontrol */
333 p_access->p_sout->i_out_pace_nocontrol--;
335 msg_Dbg( p_access, "udp access output closed" );
339 /*****************************************************************************
341 * Write: repack blocks into MTU-sized packets and queue them for sending
341 *****************************************************************************/
/* Write: copy the payload of p_buffer into packets obtained from
 * NewUDPPacket() and queue completed packets on p_thread->p_fifo for the
 * writer thread.
 *
 * A packet being filled is kept in p_sys->p_buffer between calls. It is
 * queued early when the incoming block would not fit in it, and queued from
 * inside the copy loop when it reaches exactly i_mtu bytes or when
 * i_packets > 1. The incoming block is released after its payload has been
 * copied.
 *
 * Returns -1 when the writer thread has set b_error, 0 otherwise.
 *
 * NOTE(review): braces, the declarations of i_packets and p_next, and the
 * loop that presumably walks the p_next chain are not visible in this
 * listing. */
342 static int Write( sout_access_out_t *p_access, block_t *p_buffer )
344 sout_access_out_sys_t *p_sys = p_access->p_sys;
/* Warn only once per instance about blocks larger than the MTU. */
351 if( !p_sys->b_mtu_warning && p_buffer->i_buffer > p_sys->i_mtu )
353 msg_Warn( p_access, "packet size > MTU, you should probably "
354 "increase the MTU" );
355 p_sys->b_mtu_warning = VLC_TRUE;
358 /* Check if there is enough space in the buffer */
359 if( p_sys->p_buffer &&
360 p_sys->p_buffer->i_buffer + p_buffer->i_buffer > p_sys->i_mtu )
/* The packet's dts plus the caching delay is already in the past: only a
 * debug message is emitted, the packet is still queued below. */
362 if( p_sys->p_buffer->i_dts + p_sys->p_thread->i_caching < mdate() )
364 msg_Dbg( p_access, "late packet for udp input (" I64Fd ")",
365 mdate() - p_sys->p_buffer->i_dts
366 - p_sys->p_thread->i_caching );
368 block_FifoPut( p_sys->p_thread->p_fifo, p_sys->p_buffer );
369 p_sys->p_buffer = NULL;
372 while( p_buffer->i_buffer )
374 int i_payload_size = p_sys->i_mtu;
/* Presumably applied only when b_rtpts is set (the guarding line is not
 * visible here) - confirm. */
376 i_payload_size -= RTP_HEADER_LENGTH;
378 int i_write = __MIN( p_buffer->i_buffer, i_payload_size );
382 if( !p_sys->p_buffer )
384 p_sys->p_buffer = NewUDPPacket( p_access, p_buffer->i_dts );
385 if( !p_sys->p_buffer ) break;
/* Append i_write bytes to the packet and advance past them in the input
 * block. */
388 memcpy( p_sys->p_buffer->p_buffer + p_sys->p_buffer->i_buffer,
389 p_buffer->p_buffer, i_write );
391 p_sys->p_buffer->i_buffer += i_write;
392 p_buffer->p_buffer += i_write;
393 p_buffer->i_buffer -= i_write;
/* Propagate the clock flag from the input block to the outgoing packet. */
394 if ( p_buffer->i_flags & BLOCK_FLAG_CLOCK )
396 if ( p_sys->p_buffer->i_flags & BLOCK_FLAG_CLOCK )
397 msg_Warn( p_access, "putting two PCRs at once" );
398 p_sys->p_buffer->i_flags |= BLOCK_FLAG_CLOCK;
401 if( p_sys->p_buffer->i_buffer == p_sys->i_mtu || i_packets > 1 )
404 if( p_sys->p_buffer->i_dts + p_sys->p_thread->i_caching
407 msg_Dbg( p_access, "late packet for udp input (" I64Fd ")",
408 mdate() - p_sys->p_buffer->i_dts
409 - p_sys->p_thread->i_caching );
411 block_FifoPut( p_sys->p_thread->p_fifo, p_sys->p_buffer );
412 p_sys->p_buffer = NULL;
/* Save the successor before the current block is released. */
416 p_next = p_buffer->p_next;
417 block_Release( p_buffer );
421 return( p_sys->p_thread->b_error ? -1 : 0 );
424 /*****************************************************************************
425 * WriteRaw: write p_buffer without trying to fill mtu
426 *****************************************************************************/
/* WriteRaw: queue p_buffer on the writer thread's FIFO as it is, without
 * copying it into an MTU-sized packet.
 *
 * Before queuing, blocks are removed from p_empty_blocks and released for
 * as long as its depth is at least MAX_EMPTY_BLOCKS.
 *
 * Returns -1 when the writer thread has set b_error, 0 otherwise.
 *
 * NOTE(review): the threshold test here is ">=" while NewUDPPacket() uses
 * ">" for the same limit - confirm whether the difference is intended. */
427 static int WriteRaw( sout_access_out_t *p_access, block_t *p_buffer )
429 sout_access_out_sys_t *p_sys = p_access->p_sys;
432 while ( p_sys->p_thread->p_empty_blocks->i_depth >= MAX_EMPTY_BLOCKS )
434 p_buf = block_FifoGet(p_sys->p_thread->p_empty_blocks);
435 block_Release( p_buf );
438 block_FifoPut( p_sys->p_thread->p_fifo, p_buffer );
440 return( p_sys->p_thread->b_error ? -1 : 0 );
443 /*****************************************************************************
444 * Seek: not supported on a UDP output; only logs an error
445 *****************************************************************************/
/* Seek: installed as pf_seek by Open(). The only visible action is logging
 * an error; i_pos is not used in the visible lines.
 * NOTE(review): the return statement is not visible in this listing. */
446 static int Seek( sout_access_out_t *p_access, off_t i_pos )
448 msg_Err( p_access, "UDP sout access cannot seek" );
452 /*****************************************************************************
453 * NewUDPPacket: allocate a new UDP packet of size p_sys->i_mtu
454 *****************************************************************************/
/* NewUDPPacket: provide an empty packet block of p_sys->i_mtu bytes stamped
 * with i_dts.
 *
 * Surplus blocks are first released from p_empty_blocks while its depth
 * exceeds MAX_EMPTY_BLOCKS. If that FIFO is then empty a block is allocated
 * with block_New(); the other visible path takes a block from the FIFO,
 * clears its flags and passes it through block_Realloc() with i_mtu.
 *
 * The visible header code fills the first RTP_HEADER_LENGTH (12) bytes and
 * sets i_buffer to that length; it is presumably executed only when
 * p_sys->b_rtpts is set (the guarding line is not visible) - confirm.
 *
 * NOTE(review): braces, the declaration of p_buffer, the return statement
 * and any NULL check after block_New()/block_Realloc() are not visible in
 * this listing - confirm against the full file. */
455 static block_t *NewUDPPacket( sout_access_out_t *p_access, mtime_t i_dts)
457 sout_access_out_sys_t *p_sys = p_access->p_sys;
460 while ( p_sys->p_thread->p_empty_blocks->i_depth > MAX_EMPTY_BLOCKS )
462 p_buffer = block_FifoGet( p_sys->p_thread->p_empty_blocks );
463 block_Release( p_buffer );
466 if( p_sys->p_thread->p_empty_blocks->i_depth == 0 )
468 p_buffer = block_New( p_access->p_sout, p_sys->i_mtu );
472 p_buffer = block_FifoGet(p_sys->p_thread->p_empty_blocks );
473 p_buffer->i_flags = 0;
474 p_buffer = block_Realloc( p_buffer, 0, p_sys->i_mtu );
477 p_buffer->i_dts = i_dts;
478 p_buffer->i_buffer = 0;
/* Scale the dts by 9/100; presumably this converts microseconds to the
 * 90 kHz RTP clock - confirm the unit of i_dts. */
482 mtime_t i_timestamp = p_buffer->i_dts * 9 / 100;
484 /* add rtp/ts header */
485 p_buffer->p_buffer[0] = 0x80; /* first header byte; 0x80 has only the top bit set (RTP version 2 per RFC 3550) */
486 p_buffer->p_buffer[1] = 0x21; // mpeg2-ts
/* Bytes 2-3: sequence number, high byte first, then advanced for the next
 * packet. */
488 p_buffer->p_buffer[2] = ( p_sys->i_sequence_number >> 8 )&0xff;
489 p_buffer->p_buffer[3] = p_sys->i_sequence_number&0xff;
490 p_sys->i_sequence_number++;
/* Bytes 4-7: low 32 bits of the timestamp, most significant byte first. */
492 p_buffer->p_buffer[4] = ( i_timestamp >> 24 )&0xff;
493 p_buffer->p_buffer[5] = ( i_timestamp >> 16 )&0xff;
494 p_buffer->p_buffer[6] = ( i_timestamp >> 8 )&0xff;
495 p_buffer->p_buffer[7] = i_timestamp&0xff;
/* Bytes 8-11: SSRC drawn in Open(), most significant byte first. */
497 p_buffer->p_buffer[ 8] = ( p_sys->i_ssrc >> 24 )&0xff;
498 p_buffer->p_buffer[ 9] = ( p_sys->i_ssrc >> 16 )&0xff;
499 p_buffer->p_buffer[10] = ( p_sys->i_ssrc >> 8 )&0xff;
500 p_buffer->p_buffer[11] = p_sys->i_ssrc&0xff;
/* Payload written by Write() starts right after the header. */
502 p_buffer->i_buffer = RTP_HEADER_LENGTH;
508 /*****************************************************************************
509 * ThreadWrite: write each queued packet to the network at the right time.
510 *****************************************************************************/
511 static void ThreadWrite( vlc_object_t *p_this )
513 sout_access_thread_t *p_thread = (sout_access_thread_t*)p_this;
514 mtime_t i_date_last = -1;
515 mtime_t i_to_send = p_thread->i_group;
516 int i_dropped_packets = 0;
517 #if defined(WIN32) || defined(UNDER_CE)
518 char strerror_buf[WINSOCK_STRERROR_SIZE];
519 # define strerror( x ) winsock_strerror( strerror_buf )
522 while( !p_thread->b_die )
525 mtime_t i_date, i_sent;
527 if( (i++ % 1000)==0 ) {
530 block_t *p_tmp = p_thread->p_empty_blocks->p_first;
531 while( p_tmp ) { p_tmp = p_tmp->p_next; i++;}
532 p_tmp = p_thread->p_fifo->p_first;
533 while( p_tmp ) { p_tmp = p_tmp->p_next; j++;}
534 msg_Err( p_thread, "fifo depth: %d/%d, empty blocks: %d/%d",
535 p_thread->p_fifo->i_depth, j,p_thread->p_empty_blocks->i_depth,i );
538 p_pk = block_FifoGet( p_thread->p_fifo );
540 i_date = p_thread->i_caching + p_pk->i_dts;
541 if( i_date_last > 0 )
543 if( i_date - i_date_last > 2000000 )
545 if( !i_dropped_packets )
546 msg_Dbg( p_thread, "mmh, hole ("I64Fd" > 2s) -> drop",
547 i_date - i_date_last );
549 block_FifoPut( p_thread->p_empty_blocks, p_pk );
551 i_date_last = i_date;
555 else if( i_date - i_date_last < -1000 )
557 if( !i_dropped_packets )
558 msg_Dbg( p_thread, "mmh, packets in the past ("I64Fd")",
559 i_date_last - i_date );
564 if( !i_to_send || (p_pk->i_flags & BLOCK_FLAG_CLOCK) )
567 i_to_send = p_thread->i_group;
569 if( send( p_thread->i_handle, p_pk->p_buffer, p_pk->i_buffer, 0 )
572 msg_Warn( p_thread, "send error: %s", strerror(errno) );
575 if( i_dropped_packets )
577 msg_Dbg( p_thread, "dropped %i packets", i_dropped_packets );
578 i_dropped_packets = 0;
583 if ( i_sent > i_date + 20000 )
585 msg_Dbg( p_thread, "packet has been sent too late (" I64Fd ")",
590 block_FifoPut( p_thread->p_empty_blocks, p_pk );
592 i_date_last = i_date;