]> git.sesse.net Git - vlc/blob - modules/access_output/udp.c
Force the sending threads to wakeup at exit - closes #1292
[vlc] / modules / access_output / udp.c
1 /*****************************************************************************
2  * udp.c
3  *****************************************************************************
4  * Copyright (C) 2001-2007 the VideoLAN team
5  * $Id$
6  *
7  * Authors: Laurent Aimar <fenrir@via.ecp.fr>
8  *          Eric Petit <titer@videolan.org>
9  *
10  * This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
23  *****************************************************************************/
24
25 /*****************************************************************************
26  * Preamble
27  *****************************************************************************/
28 #include <vlc/vlc.h>
29
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <assert.h>
35
36 #include <vlc_sout.h>
37 #include <vlc_block.h>
38
39 #ifdef HAVE_UNISTD_H
40 #   include <unistd.h>
41 #endif
42
43 #ifdef WIN32
44 #   include <winsock2.h>
45 #   include <ws2tcpip.h>
46 #else
47 #   include <sys/socket.h>
48 #endif
49
50 #include <vlc_network.h>
51
52 #define MAX_EMPTY_BLOCKS 200
53
54 #if defined(WIN32) || defined(UNDER_CE)
55 # define WINSOCK_STRERROR_SIZE 20
56 static const char *winsock_strerror( char *buf )
57 {
58     snprintf( buf, WINSOCK_STRERROR_SIZE, "Winsock error %d",
59               WSAGetLastError( ) );
60     buf[WINSOCK_STRERROR_SIZE - 1] = '\0';
61     return buf;
62 }
63 #endif
64
65 /*****************************************************************************
66  * Module descriptor
67  *****************************************************************************/
68 static int  Open ( vlc_object_t * );
69 static void Close( vlc_object_t * );
70
71 #define SOUT_CFG_PREFIX "sout-udp-"
72
73 #define CACHING_TEXT N_("Caching value (ms)")
74 #define CACHING_LONGTEXT N_( \
75     "Default caching value for outbound UDP streams. This " \
76     "value should be set in milliseconds." )
77
78 #define GROUP_TEXT N_("Group packets")
79 #define GROUP_LONGTEXT N_("Packets can be sent one by one at the right time " \
80                           "or by groups. You can choose the number " \
81                           "of packets that will be sent at a time. It " \
82                           "helps reducing the scheduling load on " \
83                           "heavily-loaded systems." )
84 #define AUTO_MCAST_TEXT N_("Automatic multicast streaming")
85 #define AUTO_MCAST_LONGTEXT N_("Allocates an outbound multicast address " \
86                                "automatically.")
87
88 vlc_module_begin();
89     set_description( _("UDP stream output") );
90     set_shortname( "UDP" );
91     set_category( CAT_SOUT );
92     set_subcategory( SUBCAT_SOUT_ACO );
93     add_integer( SOUT_CFG_PREFIX "caching", DEFAULT_PTS_DELAY / 1000, NULL, CACHING_TEXT, CACHING_LONGTEXT, VLC_TRUE );
94     add_integer( SOUT_CFG_PREFIX "group", 1, NULL, GROUP_TEXT, GROUP_LONGTEXT,
95                                  VLC_TRUE );
96     add_obsolete_integer( SOUT_CFG_PREFIX "late" );
97     add_obsolete_bool( SOUT_CFG_PREFIX "raw" );
98     add_bool( SOUT_CFG_PREFIX "auto-mcast", VLC_FALSE, NULL, AUTO_MCAST_TEXT,
99               AUTO_MCAST_LONGTEXT, VLC_TRUE );
100
101     set_capability( "sout access", 100 );
102     add_shortcut( "udp" );
103     set_callbacks( Open, Close );
104 vlc_module_end();
105
106 /*****************************************************************************
107  * Exported prototypes
108  *****************************************************************************/
109
110 static const char *const ppsz_sout_options[] = {
111     "auto-mcast",
112     "caching",
113     "group",
114     NULL
115 };
116
117 /* Options handled by the libvlc network core */
118 static const char *const ppsz_core_options[] = {
119     "dscp",
120     "ttl",
121     "miface",
122     "miface-addr",
123     NULL
124 };
125
126 static int  Write   ( sout_access_out_t *, block_t * );
127 static int  Seek    ( sout_access_out_t *, off_t  );
128
129 static void ThreadWrite( vlc_object_t * );
130 static block_t *NewUDPPacket( sout_access_out_t *, mtime_t );
131 static const char *MakeRandMulticast (int family, char *buf, size_t buflen);
132
133 typedef struct sout_access_thread_t
134 {
135     VLC_COMMON_MEMBERS
136
137     sout_instance_t *p_sout;
138
139     block_fifo_t *p_fifo;
140
141     int         i_handle;
142
143     int64_t     i_caching;
144     int         i_group;
145
146     block_fifo_t *p_empty_blocks;
147 } sout_access_thread_t;
148
149 struct sout_access_out_sys_t
150 {
151     int                 i_mtu;
152     vlc_bool_t          b_mtu_warning;
153
154     block_t             *p_buffer;
155
156     sout_access_thread_t *p_thread;
157
158 };
159
160 #define DEFAULT_PORT 1234
161 #define RTP_HEADER_LENGTH 12
162
163 /*****************************************************************************
164  * Open: open the file
165  *****************************************************************************/
166 static int Open( vlc_object_t *p_this )
167 {
168     sout_access_out_t       *p_access = (sout_access_out_t*)p_this;
169     sout_access_out_sys_t   *p_sys;
170
171     char                *psz_dst_addr = NULL;
172     int                 i_dst_port;
173
174     int                 i_handle;
175
176     config_ChainParse( p_access, SOUT_CFG_PREFIX,
177                        ppsz_sout_options, p_access->p_cfg );
178     config_ChainParse( p_access, "",
179                        ppsz_core_options, p_access->p_cfg );
180
181     if (var_Create (p_access, "dst-port", VLC_VAR_INTEGER)
182      || var_Create (p_access, "src-port", VLC_VAR_INTEGER)
183      || var_Create (p_access, "dst-addr", VLC_VAR_STRING)
184      || var_Create (p_access, "src-addr", VLC_VAR_STRING))
185     {
186         return VLC_ENOMEM;
187     }
188
189     if( !( p_sys = calloc ( 1, sizeof( sout_access_out_sys_t ) ) ) )
190     {
191         msg_Err( p_access, "not enough memory" );
192         return VLC_ENOMEM;
193     }
194     p_access->p_sys = p_sys;
195
196     i_dst_port = DEFAULT_PORT;
197     if (var_GetBool (p_access, SOUT_CFG_PREFIX"auto-mcast"))
198     {
199         char buf[INET6_ADDRSTRLEN];
200         if (MakeRandMulticast (AF_INET, buf, sizeof (buf)) != NULL)
201             psz_dst_addr = strdup (buf);
202     }
203     else
204     {
205         char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
206
207         if (psz_parser[0] == '[')
208             psz_parser = strchr (psz_parser, ']');
209
210         psz_parser = strchr (psz_parser ?: psz_dst_addr, ':');
211         if (psz_parser != NULL)
212         {
213             *psz_parser++ = '\0';
214             i_dst_port = atoi (psz_parser);
215         }
216     }
217
218     p_sys->p_thread =
219         vlc_object_create( p_access, sizeof( sout_access_thread_t ) );
220     if( !p_sys->p_thread )
221     {
222         msg_Err( p_access, "out of memory" );
223         free (p_sys);
224         free (psz_dst_addr);
225         return VLC_ENOMEM;
226     }
227
228     vlc_object_attach( p_sys->p_thread, p_access );
229     p_sys->p_thread->p_sout = p_access->p_sout;
230     p_sys->p_thread->b_die  = 0;
231     p_sys->p_thread->b_error= 0;
232     p_sys->p_thread->p_fifo = block_FifoNew( p_access );
233     p_sys->p_thread->p_empty_blocks = block_FifoNew( p_access );
234
235     i_handle = net_ConnectDgram( p_this, psz_dst_addr, i_dst_port, -1,
236                                  IPPROTO_UDP );
237     free (psz_dst_addr);
238
239     if( i_handle == -1 )
240     {
241          msg_Err( p_access, "failed to create raw UDP socket" );
242          vlc_object_destroy (p_sys->p_thread);
243          free (p_sys);
244          return VLC_EGENERIC;
245     }
246     else
247     {
248         char addr[NI_MAXNUMERICHOST];
249         int port;
250
251         if (net_GetSockAddress (i_handle, addr, &port) == 0)
252         {
253             msg_Dbg (p_access, "source: %s port %d", addr, port);
254             var_SetString (p_access, "src-addr", addr);
255             var_SetInteger (p_access, "src-port", port);
256         }
257
258         if (net_GetPeerAddress (i_handle, addr, &port) == 0)
259         {
260             msg_Dbg (p_access, "destination: %s port %d", addr, port);
261             var_SetString (p_access, "dst-addr", addr);
262             var_SetInteger (p_access, "dst-port", port);
263         }
264     }
265     p_sys->p_thread->i_handle = i_handle;
266     shutdown( i_handle, SHUT_RD );
267
268     p_sys->p_thread->i_caching =
269         (int64_t)1000 * var_GetInteger( p_access, SOUT_CFG_PREFIX "caching");
270     p_sys->p_thread->i_group =
271         var_GetInteger( p_access, SOUT_CFG_PREFIX "group" );
272
273     p_sys->i_mtu = var_CreateGetInteger( p_this, "mtu" );
274     p_sys->p_buffer = NULL;
275
276     if( vlc_thread_create( p_sys->p_thread, "sout write thread", ThreadWrite,
277                            VLC_THREAD_PRIORITY_HIGHEST, VLC_FALSE ) )
278     {
279         msg_Err( p_access->p_sout, "cannot spawn sout access thread" );
280         net_Close (i_handle);
281         vlc_object_destroy( p_sys->p_thread );
282         free (p_sys);
283         return VLC_EGENERIC;
284     }
285
286     p_access->pf_write = Write;
287     p_access->pf_seek = Seek;
288
289     /* update p_sout->i_out_pace_nocontrol */
290     p_access->p_sout->i_out_pace_nocontrol++;
291
292     return VLC_SUCCESS;
293 }
294
295 /*****************************************************************************
296  * Close: close the target
297  *****************************************************************************/
298 static void Close( vlc_object_t * p_this )
299 {
300     sout_access_out_t     *p_access = (sout_access_out_t*)p_this;
301     sout_access_out_sys_t *p_sys = p_access->p_sys;
302     int i;
303
304     vlc_object_kill( p_sys->p_thread );
305     block_FifoWake( p_sys->p_thread->p_fifo );
306
307     for( i = 0; i < 10; i++ )
308     {
309         block_t *p_dummy = block_New( p_access, p_sys->i_mtu );
310         p_dummy->i_dts = 0;
311         p_dummy->i_pts = 0;
312         p_dummy->i_length = 0;
313         memset( p_dummy->p_buffer, 0, p_dummy->i_buffer );
314         block_FifoPut( p_sys->p_thread->p_fifo, p_dummy );
315     }
316     vlc_thread_join( p_sys->p_thread );
317
318     block_FifoRelease( p_sys->p_thread->p_fifo );
319     block_FifoRelease( p_sys->p_thread->p_empty_blocks );
320
321     if( p_sys->p_buffer ) block_Release( p_sys->p_buffer );
322
323     net_Close( p_sys->p_thread->i_handle );
324
325     vlc_object_detach( p_sys->p_thread );
326     vlc_object_destroy( p_sys->p_thread );
327     /* update p_sout->i_out_pace_nocontrol */
328     p_access->p_sout->i_out_pace_nocontrol--;
329
330     msg_Dbg( p_access, "UDP access output closed" );
331     free( p_sys );
332 }
333
334 /*****************************************************************************
335  * Write: standard write on a file descriptor.
336  *****************************************************************************/
337 static int Write( sout_access_out_t *p_access, block_t *p_buffer )
338 {
339     sout_access_out_sys_t *p_sys = p_access->p_sys;
340     int i_len = 0;
341
342     while( p_buffer )
343     {
344         block_t *p_next;
345         int i_packets = 0;
346         mtime_t now = mdate();
347
348         if( !p_sys->b_mtu_warning && p_buffer->i_buffer > p_sys->i_mtu )
349         {
350             msg_Warn( p_access, "packet size > MTU, you should probably "
351                       "increase the MTU" );
352             p_sys->b_mtu_warning = VLC_TRUE;
353         }
354
355         /* Check if there is enough space in the buffer */
356         if( p_sys->p_buffer &&
357             p_sys->p_buffer->i_buffer + p_buffer->i_buffer > p_sys->i_mtu )
358         {
359             if( p_sys->p_buffer->i_dts + p_sys->p_thread->i_caching < now )
360             {
361                 msg_Dbg( p_access, "late packet for UDP input (" I64Fd ")",
362                          now - p_sys->p_buffer->i_dts
363                           - p_sys->p_thread->i_caching );
364             }
365             block_FifoPut( p_sys->p_thread->p_fifo, p_sys->p_buffer );
366             p_sys->p_buffer = NULL;
367         }
368
369         i_len += p_buffer->i_buffer;
370         while( p_buffer->i_buffer )
371         {
372             int i_payload_size = p_sys->i_mtu;
373
374             int i_write = __MIN( p_buffer->i_buffer, i_payload_size );
375
376             i_packets++;
377
378             if( !p_sys->p_buffer )
379             {
380                 p_sys->p_buffer = NewUDPPacket( p_access, p_buffer->i_dts );
381                 if( !p_sys->p_buffer ) break;
382             }
383
384             memcpy( p_sys->p_buffer->p_buffer + p_sys->p_buffer->i_buffer,
385                     p_buffer->p_buffer, i_write );
386
387             p_sys->p_buffer->i_buffer += i_write;
388             p_buffer->p_buffer += i_write;
389             p_buffer->i_buffer -= i_write;
390             if ( p_buffer->i_flags & BLOCK_FLAG_CLOCK )
391             {
392                 if ( p_sys->p_buffer->i_flags & BLOCK_FLAG_CLOCK )
393                     msg_Warn( p_access, "putting two PCRs at once" );
394                 p_sys->p_buffer->i_flags |= BLOCK_FLAG_CLOCK;
395             }
396
397             if( p_sys->p_buffer->i_buffer == p_sys->i_mtu || i_packets > 1 )
398             {
399                 /* Flush */
400                 if( p_sys->p_buffer->i_dts + p_sys->p_thread->i_caching < now )
401                 {
402                     msg_Dbg( p_access, "late packet for udp input (" I64Fd ")",
403                              mdate() - p_sys->p_buffer->i_dts
404                               - p_sys->p_thread->i_caching );
405                 }
406                 block_FifoPut( p_sys->p_thread->p_fifo, p_sys->p_buffer );
407                 p_sys->p_buffer = NULL;
408             }
409         }
410
411         p_next = p_buffer->p_next;
412         block_Release( p_buffer );
413         p_buffer = p_next;
414     }
415
416     return( p_sys->p_thread->b_error ? -1 : i_len );
417 }
418
419 /*****************************************************************************
420  * Seek: seek to a specific location in a file
421  *****************************************************************************/
422 static int Seek( sout_access_out_t *p_access, off_t i_pos )
423 {
424     msg_Err( p_access, "UDP sout access cannot seek" );
425     return -1;
426 }
427
428 /*****************************************************************************
429  * NewUDPPacket: allocate a new UDP packet of size p_sys->i_mtu
430  *****************************************************************************/
431 static block_t *NewUDPPacket( sout_access_out_t *p_access, mtime_t i_dts)
432 {
433     sout_access_out_sys_t *p_sys = p_access->p_sys;
434     block_t *p_buffer;
435
436     while ( block_FifoCount( p_sys->p_thread->p_empty_blocks ) > MAX_EMPTY_BLOCKS )
437     {
438         p_buffer = block_FifoGet( p_sys->p_thread->p_empty_blocks );
439         block_Release( p_buffer );
440     }
441
442     if( block_FifoCount( p_sys->p_thread->p_empty_blocks ) == 0 )
443     {
444         p_buffer = block_New( p_access->p_sout, p_sys->i_mtu );
445     }
446     else
447     {
448         p_buffer = block_FifoGet(p_sys->p_thread->p_empty_blocks );
449         p_buffer->i_flags = 0;
450         p_buffer = block_Realloc( p_buffer, 0, p_sys->i_mtu );
451     }
452
453     p_buffer->i_dts = i_dts;
454     p_buffer->i_buffer = 0;
455
456     return p_buffer;
457 }
458
459 /*****************************************************************************
460  * ThreadWrite: Write a packet on the network at the good time.
461  *****************************************************************************/
462 static void ThreadWrite( vlc_object_t *p_this )
463 {
464     sout_access_thread_t *p_thread = (sout_access_thread_t*)p_this;
465     mtime_t              i_date_last = -1;
466     mtime_t              i_to_send = p_thread->i_group;
467     int                  i_dropped_packets = 0;
468 #if defined(WIN32) || defined(UNDER_CE)
469     char strerror_buf[WINSOCK_STRERROR_SIZE];
470 # define strerror( x ) winsock_strerror( strerror_buf )
471 #endif
472
473     while( !p_thread->b_die )
474     {
475         block_t *p_pk;
476         mtime_t       i_date, i_sent;
477 #if 0
478         if( (i++ % 1000)==0 ) {
479           int i = 0;
480           int j = 0;
481           block_t *p_tmp = p_thread->p_empty_blocks->p_first;
482           while( p_tmp ) { p_tmp = p_tmp->p_next; i++;}
483           p_tmp = p_thread->p_fifo->p_first;
484           while( p_tmp ) { p_tmp = p_tmp->p_next; j++;}
485           msg_Dbg( p_thread, "fifo depth: %d/%d, empty blocks: %d/%d",
486                    p_thread->p_fifo->i_depth, j,p_thread->p_empty_blocks->i_depth,i );
487         }
488 #endif
489         p_pk = block_FifoGet( p_thread->p_fifo );
490         if( p_pk == NULL )
491             continue; /* forced wake-up */
492
493         i_date = p_thread->i_caching + p_pk->i_dts;
494         if( i_date_last > 0 )
495         {
496             if( i_date - i_date_last > 2000000 )
497             {
498                 if( !i_dropped_packets )
499                     msg_Dbg( p_thread, "mmh, hole ("I64Fd" > 2s) -> drop",
500                              i_date - i_date_last );
501
502                 block_FifoPut( p_thread->p_empty_blocks, p_pk );
503
504                 i_date_last = i_date;
505                 i_dropped_packets++;
506                 continue;
507             }
508             else if( i_date - i_date_last < -1000 )
509             {
510                 if( !i_dropped_packets )
511                     msg_Dbg( p_thread, "mmh, packets in the past ("I64Fd")",
512                              i_date_last - i_date );
513             }
514         }
515
516         i_to_send--;
517         if( !i_to_send || (p_pk->i_flags & BLOCK_FLAG_CLOCK) )
518         {
519             mwait( i_date );
520             i_to_send = p_thread->i_group;
521         }
522         ssize_t val = send( p_thread->i_handle, p_pk->p_buffer,
523                             p_pk->i_buffer, 0 );
524         if (val == -1)
525         {
526             msg_Warn( p_thread, "send error: %s", strerror(errno) );
527         }
528
529         if( i_dropped_packets )
530         {
531             msg_Dbg( p_thread, "dropped %i packets", i_dropped_packets );
532             i_dropped_packets = 0;
533         }
534
535 #if 1
536         i_sent = mdate();
537         if ( i_sent > i_date + 20000 )
538         {
539             msg_Dbg( p_thread, "packet has been sent too late (" I64Fd ")",
540                      i_sent - i_date );
541         }
542 #endif
543
544         block_FifoPut( p_thread->p_empty_blocks, p_pk );
545
546         i_date_last = i_date;
547     }
548 }
549
550
551 static const char *MakeRandMulticast (int family, char *buf, size_t buflen)
552 {
553     uint32_t rand = (getpid() & 0xffff)
554                   | (uint32_t)(((mdate () >> 10) & 0xffff) << 16);
555
556     switch (family)
557     {
558 #ifdef AF_INET6
559         case AF_INET6:
560         {
561             struct in6_addr addr;
562             memcpy (&addr, "\xff\x38\x00\x00" "\x00\x00\x00\x00"
563                            "\x00\x00\x00\x00", 12);
564             rand |= 0x80000000;
565             memcpy (addr.s6_addr + 12, &(uint32_t){ htonl (rand) }, 4);
566             return inet_ntop (family, &addr, buf, buflen);
567         }
568 #endif
569
570         case AF_INET:
571         {
572             struct in_addr addr;
573             addr.s_addr = htonl ((rand & 0xffffff) | 0xe8000000);
574             return inet_ntop (family, &addr, buf, buflen);
575         }
576     }
577 #ifdef EAFNOSUPPORT
578     errno = EAFNOSUPPORT;
579 #endif
580     return NULL;
581 }
582
583