]> git.sesse.net Git - vlc/blob - modules/access_output/udp.c
afb9bf16dc417ca5500eb41436169fabcf8e9b99
[vlc] / modules / access_output / udp.c
1 /*****************************************************************************
2  * udp.c
3  *****************************************************************************
4  * Copyright (C) 2001-2007 the VideoLAN team
5  * $Id$
6  *
7  * Authors: Laurent Aimar <fenrir@via.ecp.fr>
8  *          Eric Petit <titer@videolan.org>
9  *
10  * This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
23  *****************************************************************************/
24
25 /*****************************************************************************
26  * Preamble
27  *****************************************************************************/
28 #ifdef HAVE_CONFIG_H
29 # include "config.h"
30 #endif
31
32 #include <vlc_common.h>
33 #include <vlc_plugin.h>
34
35 #include <sys/types.h>
36 #include <fcntl.h>
37 #include <assert.h>
38
39 #include <vlc_sout.h>
40 #include <vlc_block.h>
41
42 #ifdef HAVE_UNISTD_H
43 #   include <unistd.h>
44 #endif
45
46 #ifdef WIN32
47 #   include <winsock2.h>
48 #   include <ws2tcpip.h>
49 #else
50 #   include <sys/socket.h>
51 #endif
52
53 #include <vlc_network.h>
54
55 #define MAX_EMPTY_BLOCKS 200
56
57 /*****************************************************************************
58  * Module descriptor
59  *****************************************************************************/
60 static int  Open ( vlc_object_t * );
61 static void Close( vlc_object_t * );
62
63 #define SOUT_CFG_PREFIX "sout-udp-"
64
65 #define CACHING_TEXT N_("Caching value (ms)")
66 #define CACHING_LONGTEXT N_( \
67     "Default caching value for outbound UDP streams. This " \
68     "value should be set in milliseconds." )
69
70 #define GROUP_TEXT N_("Group packets")
71 #define GROUP_LONGTEXT N_("Packets can be sent one by one at the right time " \
72                           "or by groups. You can choose the number " \
73                           "of packets that will be sent at a time. It " \
74                           "helps reducing the scheduling load on " \
75                           "heavily-loaded systems." )
76
77 vlc_module_begin ()
78     set_description( N_("UDP stream output") )
79     set_shortname( "UDP" )
80     set_category( CAT_SOUT )
81     set_subcategory( SUBCAT_SOUT_ACO )
82     add_integer( SOUT_CFG_PREFIX "caching", DEFAULT_PTS_DELAY / 1000, NULL, CACHING_TEXT, CACHING_LONGTEXT, true )
83     add_integer( SOUT_CFG_PREFIX "group", 1, NULL, GROUP_TEXT, GROUP_LONGTEXT,
84                                  true )
85     add_obsolete_integer( SOUT_CFG_PREFIX "late" )
86     add_obsolete_bool( SOUT_CFG_PREFIX "raw" )
87
88     set_capability( "sout access", 0 )
89     add_shortcut( "udp" )
90     set_callbacks( Open, Close )
91 vlc_module_end ()
92
93 /*****************************************************************************
94  * Exported prototypes
95  *****************************************************************************/
96
97 static const char *const ppsz_sout_options[] = {
98     "caching",
99     "group",
100     NULL
101 };
102
103 /* Options handled by the libvlc network core */
104 static const char *const ppsz_core_options[] = {
105     "dscp",
106     "ttl",
107     "miface",
108     "miface-addr",
109     NULL
110 };
111
112 static ssize_t Write   ( sout_access_out_t *, block_t * );
113 static int  Seek    ( sout_access_out_t *, off_t  );
114 static int Control( sout_access_out_t *, int, va_list );
115
116 static void* ThreadWrite( void * );
117 static block_t *NewUDPPacket( sout_access_out_t *, mtime_t );
118
119 struct sout_access_out_sys_t
120 {
121     mtime_t       i_caching;
122     int           i_handle;
123     bool          b_mtu_warning;
124     size_t        i_mtu;
125
126     block_fifo_t *p_fifo;
127     block_fifo_t *p_empty_blocks;
128     block_t      *p_buffer;
129
130     vlc_thread_t  thread;
131 };
132
133 #define DEFAULT_PORT 1234
134
135 /*****************************************************************************
136  * Open: open the file
137  *****************************************************************************/
138 static int Open( vlc_object_t *p_this )
139 {
140     sout_access_out_t       *p_access = (sout_access_out_t*)p_this;
141     sout_access_out_sys_t   *p_sys;
142
143     char                *psz_dst_addr = NULL;
144     int                 i_dst_port;
145
146     int                 i_handle;
147
148     config_ChainParse( p_access, SOUT_CFG_PREFIX,
149                        ppsz_sout_options, p_access->p_cfg );
150     config_ChainParse( p_access, "",
151                        ppsz_core_options, p_access->p_cfg );
152
153     if (var_Create (p_access, "dst-port", VLC_VAR_INTEGER)
154      || var_Create (p_access, "src-port", VLC_VAR_INTEGER)
155      || var_Create (p_access, "dst-addr", VLC_VAR_STRING)
156      || var_Create (p_access, "src-addr", VLC_VAR_STRING))
157     {
158         return VLC_ENOMEM;
159     }
160
161     if( !( p_sys = malloc ( sizeof( *p_sys ) ) ) )
162         return VLC_ENOMEM;
163     p_access->p_sys = p_sys;
164
165     i_dst_port = DEFAULT_PORT;
166     char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
167     if( !psz_dst_addr )
168     {
169         free( p_sys );
170         return VLC_ENOMEM;
171     }
172
173     if (psz_parser[0] == '[')
174         psz_parser = strchr (psz_parser, ']');
175
176     psz_parser = strchr (psz_parser ? psz_parser : psz_dst_addr, ':');
177     if (psz_parser != NULL)
178     {
179         *psz_parser++ = '\0';
180         i_dst_port = atoi (psz_parser);
181     }
182
183     i_handle = net_ConnectDgram( p_this, psz_dst_addr, i_dst_port, -1,
184                                  IPPROTO_UDP );
185     free (psz_dst_addr);
186
187     if( i_handle == -1 )
188     {
189          msg_Err( p_access, "failed to create raw UDP socket" );
190          free (p_sys);
191          return VLC_EGENERIC;
192     }
193     else
194     {
195         char addr[NI_MAXNUMERICHOST];
196         int port;
197
198         if (net_GetSockAddress (i_handle, addr, &port) == 0)
199         {
200             msg_Dbg (p_access, "source: %s port %d", addr, port);
201             var_SetString (p_access, "src-addr", addr);
202             var_SetInteger (p_access, "src-port", port);
203         }
204
205         if (net_GetPeerAddress (i_handle, addr, &port) == 0)
206         {
207             msg_Dbg (p_access, "destination: %s port %d", addr, port);
208             var_SetString (p_access, "dst-addr", addr);
209             var_SetInteger (p_access, "dst-port", port);
210         }
211     }
212     shutdown( i_handle, SHUT_RD );
213
214     p_sys->i_caching = UINT64_C(1000)
215                      * var_GetInteger( p_access, SOUT_CFG_PREFIX "caching");
216     p_sys->i_handle = i_handle;
217     p_sys->i_mtu = var_CreateGetInteger( p_this, "mtu" );
218     p_sys->b_mtu_warning = false;
219     p_sys->p_fifo = block_FifoNew();
220     p_sys->p_empty_blocks = block_FifoNew();
221     p_sys->p_buffer = NULL;
222
223     if( vlc_clone( &p_sys->thread, ThreadWrite, p_access,
224                            VLC_THREAD_PRIORITY_HIGHEST ) )
225     {
226         msg_Err( p_access, "cannot spawn sout access thread" );
227         block_FifoRelease( p_sys->p_fifo );
228         block_FifoRelease( p_sys->p_empty_blocks );
229         net_Close (i_handle);
230         free (p_sys);
231         return VLC_EGENERIC;
232     }
233
234     p_access->pf_write = Write;
235     p_access->pf_seek = Seek;
236     p_access->pf_control = Control;
237
238     return VLC_SUCCESS;
239 }
240
241 /*****************************************************************************
242  * Close: close the target
243  *****************************************************************************/
244 static void Close( vlc_object_t * p_this )
245 {
246     sout_access_out_t     *p_access = (sout_access_out_t*)p_this;
247     sout_access_out_sys_t *p_sys = p_access->p_sys;
248
249     vlc_cancel( p_sys->thread );
250     vlc_join( p_sys->thread, NULL );
251     block_FifoRelease( p_sys->p_fifo );
252     block_FifoRelease( p_sys->p_empty_blocks );
253
254     if( p_sys->p_buffer ) block_Release( p_sys->p_buffer );
255
256     net_Close( p_sys->i_handle );
257     free( p_sys );
258 }
259
260 static int Control( sout_access_out_t *p_access, int i_query, va_list args )
261 {
262     (void)p_access;
263
264     switch( i_query )
265     {
266         case ACCESS_OUT_CONTROLS_PACE:
267             *va_arg( args, bool * ) = false;
268             break;
269
270         default:
271             return VLC_EGENERIC;
272     }
273     return VLC_SUCCESS;
274 }
275
276 /*****************************************************************************
277  * Write: standard write on a file descriptor.
278  *****************************************************************************/
279 static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
280 {
281     sout_access_out_sys_t *p_sys = p_access->p_sys;
282     int i_len = 0;
283
284     while( p_buffer )
285     {
286         block_t *p_next;
287         int i_packets = 0;
288         mtime_t now = mdate();
289
290         if( !p_sys->b_mtu_warning && p_buffer->i_buffer > p_sys->i_mtu )
291         {
292             msg_Warn( p_access, "packet size > MTU, you should probably "
293                       "increase the MTU" );
294             p_sys->b_mtu_warning = true;
295         }
296
297         /* Check if there is enough space in the buffer */
298         if( p_sys->p_buffer &&
299             p_sys->p_buffer->i_buffer + p_buffer->i_buffer > p_sys->i_mtu )
300         {
301             if( p_sys->p_buffer->i_dts + p_sys->i_caching < now )
302             {
303                 msg_Dbg( p_access, "late packet for UDP input (%"PRId64 ")",
304                          now - p_sys->p_buffer->i_dts
305                           - p_sys->i_caching );
306             }
307             block_FifoPut( p_sys->p_fifo, p_sys->p_buffer );
308             p_sys->p_buffer = NULL;
309         }
310
311         i_len += p_buffer->i_buffer;
312         while( p_buffer->i_buffer )
313         {
314             size_t i_payload_size = p_sys->i_mtu;
315             size_t i_write = __MIN( p_buffer->i_buffer, i_payload_size );
316
317             i_packets++;
318
319             if( !p_sys->p_buffer )
320             {
321                 p_sys->p_buffer = NewUDPPacket( p_access, p_buffer->i_dts );
322                 if( !p_sys->p_buffer ) break;
323             }
324
325             memcpy( p_sys->p_buffer->p_buffer + p_sys->p_buffer->i_buffer,
326                     p_buffer->p_buffer, i_write );
327
328             p_sys->p_buffer->i_buffer += i_write;
329             p_buffer->p_buffer += i_write;
330             p_buffer->i_buffer -= i_write;
331             if ( p_buffer->i_flags & BLOCK_FLAG_CLOCK )
332             {
333                 if ( p_sys->p_buffer->i_flags & BLOCK_FLAG_CLOCK )
334                     msg_Warn( p_access, "putting two PCRs at once" );
335                 p_sys->p_buffer->i_flags |= BLOCK_FLAG_CLOCK;
336             }
337
338             if( p_sys->p_buffer->i_buffer == p_sys->i_mtu || i_packets > 1 )
339             {
340                 /* Flush */
341                 if( p_sys->p_buffer->i_dts + p_sys->i_caching < now )
342                 {
343                     msg_Dbg( p_access, "late packet for udp input (%"PRId64 ")",
344                              mdate() - p_sys->p_buffer->i_dts
345                               - p_sys->i_caching );
346                 }
347                 block_FifoPut( p_sys->p_fifo, p_sys->p_buffer );
348                 p_sys->p_buffer = NULL;
349             }
350         }
351
352         p_next = p_buffer->p_next;
353         block_Release( p_buffer );
354         p_buffer = p_next;
355     }
356
357     return i_len;
358 }
359
360 /*****************************************************************************
361  * Seek: seek to a specific location in a file
362  *****************************************************************************/
363 static int Seek( sout_access_out_t *p_access, off_t i_pos )
364 {
365     (void) i_pos;
366     msg_Err( p_access, "UDP sout access cannot seek" );
367     return -1;
368 }
369
370 /*****************************************************************************
371  * NewUDPPacket: allocate a new UDP packet of size p_sys->i_mtu
372  *****************************************************************************/
373 static block_t *NewUDPPacket( sout_access_out_t *p_access, mtime_t i_dts)
374 {
375     sout_access_out_sys_t *p_sys = p_access->p_sys;
376     block_t *p_buffer;
377
378     while ( block_FifoCount( p_sys->p_empty_blocks ) > MAX_EMPTY_BLOCKS )
379     {
380         p_buffer = block_FifoGet( p_sys->p_empty_blocks );
381         block_Release( p_buffer );
382     }
383
384     if( block_FifoCount( p_sys->p_empty_blocks ) == 0 )
385     {
386         p_buffer = block_Alloc( p_sys->i_mtu );
387     }
388     else
389     {
390         p_buffer = block_FifoGet(p_sys->p_empty_blocks );
391         p_buffer->i_flags = 0;
392         p_buffer = block_Realloc( p_buffer, 0, p_sys->i_mtu );
393     }
394
395     p_buffer->i_dts = i_dts;
396     p_buffer->i_buffer = 0;
397
398     return p_buffer;
399 }
400
401 /*****************************************************************************
402  * ThreadWrite: Write a packet on the network at the good time.
403  *****************************************************************************/
404 static void* ThreadWrite( void *data )
405 {
406     sout_access_out_t *p_access = data;
407     sout_access_out_sys_t *p_sys = p_access->p_sys;
408     mtime_t i_date_last = -1;
409     const unsigned i_group = var_GetInteger( p_access,
410                                              SOUT_CFG_PREFIX "group" );
411     mtime_t i_to_send = i_group;
412     unsigned i_dropped_packets = 0;
413
414     for (;;)
415     {
416         block_t *p_pk = block_FifoGet( p_sys->p_fifo );
417         mtime_t       i_date, i_sent;
418
419         i_date = p_sys->i_caching + p_pk->i_dts;
420         if( i_date_last > 0 )
421         {
422             if( i_date - i_date_last > 2000000 )
423             {
424                 if( !i_dropped_packets )
425                     msg_Dbg( p_access, "mmh, hole (%"PRId64" > 2s) -> drop",
426                              i_date - i_date_last );
427
428                 block_FifoPut( p_sys->p_empty_blocks, p_pk );
429
430                 i_date_last = i_date;
431                 i_dropped_packets++;
432                 continue;
433             }
434             else if( i_date - i_date_last < -1000 )
435             {
436                 if( !i_dropped_packets )
437                     msg_Dbg( p_access, "mmh, packets in the past (%"PRId64")",
438                              i_date_last - i_date );
439             }
440         }
441
442         block_cleanup_push( p_pk );
443         i_to_send--;
444         if( !i_to_send || (p_pk->i_flags & BLOCK_FLAG_CLOCK) )
445         {
446             mwait( i_date );
447             i_to_send = i_group;
448         }
449         if ( send( p_sys->i_handle, p_pk->p_buffer, p_pk->i_buffer, 0 ) == -1 )
450             msg_Warn( p_access, "send error: %m" );
451         vlc_cleanup_pop();
452
453         if( i_dropped_packets )
454         {
455             msg_Dbg( p_access, "dropped %i packets", i_dropped_packets );
456             i_dropped_packets = 0;
457         }
458
459 #if 1
460         i_sent = mdate();
461         if ( i_sent > i_date + 20000 )
462         {
463             msg_Dbg( p_access, "packet has been sent too late (%"PRId64 ")",
464                      i_sent - i_date );
465         }
466 #endif
467
468         block_FifoPut( p_sys->p_empty_blocks, p_pk );
469
470         i_date_last = i_date;
471     }
472     return NULL;
473 }