]> git.sesse.net Git - vlc/blob - modules/access_output/udp.c
* src/misc/threads.c: Implementation of real-time priorities for UNIX-like
[vlc] / modules / access_output / udp.c
1 /*****************************************************************************
2  * udp.c
3  *****************************************************************************
4  * Copyright (C) 2001, 2002 VideoLAN
5  * $Id: udp.c,v 1.15 2003/11/07 19:30:28 massiot Exp $
6  *
7  * Authors: Laurent Aimar <fenrir@via.ecp.fr>
8  *          Eric Petit <titer@videolan.org>
9  *
10  * This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111, USA.
23  *****************************************************************************/
24
25 /*****************************************************************************
26  * Preamble
27  *****************************************************************************/
28 #include <stdlib.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <string.h>
32 #include <errno.h>
33 #include <fcntl.h>
34
35 #include <vlc/vlc.h>
36 #include <vlc/input.h>
37 #include <vlc/sout.h>
38
39 #ifdef HAVE_UNISTD_H
40 #   include <unistd.h>
41 #endif
42
43 #ifdef WIN32
44 #   include <winsock2.h>
45 #   include <ws2tcpip.h>
46 #   ifndef IN_MULTICAST
47 #       define IN_MULTICAST(a) IN_CLASSD(a)
48 #   endif
49 #else
50 #   include <sys/socket.h>
51 #endif
52
53 #include "network.h"
54
55 #define DEFAULT_PORT 1234
56 #define LATENCY     100000
57 #define MAX_ERROR    500000
58 /*****************************************************************************
59  * Exported prototypes
60  *****************************************************************************/
61 static int     Open   ( vlc_object_t * );
62 static void    Close  ( vlc_object_t * );
63
64 static int     Write   ( sout_access_out_t *, sout_buffer_t * );
65 static int     WriteRaw( sout_access_out_t *, sout_buffer_t * );
66 static int     Seek    ( sout_access_out_t *, off_t  );
67
68 static void    ThreadWrite( vlc_object_t * );
69
70 static sout_buffer_t *NewUDPPacket( sout_access_out_t *, mtime_t );
71
72 /*****************************************************************************
73  * Module descriptor
74  *****************************************************************************/
75 #define CACHING_TEXT N_("caching value in ms")
76 #define CACHING_LONGTEXT N_( \
77     "Allows you to modify the default caching value for udp streams. This " \
78     "value should be set in miliseconds units." )
79
80 vlc_module_begin();
81     set_description( _("UDP stream ouput") );
82     add_category_hint( N_("udp stream output"), NULL , VLC_TRUE );
83     add_integer( "udp-sout-caching", DEFAULT_PTS_DELAY / 1000, NULL, CACHING_TEXT, CACHING_LONGTEXT, VLC_TRUE );
84     set_capability( "sout access", 100 );
85     add_shortcut( "udp" );
86     add_shortcut( "rtp" ); // Will work only with ts muxer
87     set_callbacks( Open, Close );
88 vlc_module_end();
89
90 typedef struct sout_access_thread_s
91 {
92     VLC_COMMON_MEMBERS
93
94     sout_instance_t *p_sout;
95
96     sout_fifo_t *p_fifo;
97
98     int         i_handle;
99
100     int64_t     i_caching;
101
102 } sout_access_thread_t;
103
104 struct sout_access_out_sys_t
105 {
106     int                 b_rtpts;  // 1 if add rtp/ts header
107     uint16_t            i_sequence_number;
108     uint32_t            i_ssrc;
109
110     unsigned int        i_mtu;
111
112     sout_buffer_t       *p_buffer;
113
114     sout_access_thread_t *p_thread;
115
116 };
117
118 /*****************************************************************************
119  * Open: open the file
120  *****************************************************************************/
121 static int Open( vlc_object_t *p_this )
122 {
123     sout_access_out_t       *p_access = (sout_access_out_t*)p_this;
124     sout_access_out_sys_t   *p_sys;
125
126     char                *psz_parser;
127     char                *psz_dst_addr;
128     int                 i_dst_port;
129
130     module_t            *p_network;
131     network_socket_t    socket_desc;
132
133     char                *val;
134
135     if( !( p_sys = p_access->p_sys =
136                 malloc( sizeof( sout_access_out_sys_t ) ) ) )
137     {
138         msg_Err( p_access, "Not enough memory" );
139         return( VLC_EGENERIC );
140     }
141
142
143     if( p_access->psz_access != NULL &&
144         !strcmp( p_access->psz_access, "rtp" ) )
145     {
146         msg_Warn( p_access, "becarefull that rtp ouput work only with ts "
147                   "payload(not an error)" );
148         p_sys->b_rtpts = 1;
149     }
150     else
151     {
152         p_sys->b_rtpts = 0;
153     }
154
155     psz_parser = strdup( p_access->psz_name );
156
157     psz_dst_addr = psz_parser;
158     i_dst_port = 0;
159
160     if ( *psz_parser == '[' )
161     {
162         while( *psz_parser && *psz_parser != ']' )
163         {
164             psz_parser++;
165         }
166     }
167     while( *psz_parser && *psz_parser != ':' )
168     {
169         psz_parser++;
170     }
171     if( *psz_parser == ':' )
172     {
173         *psz_parser = '\0';
174         psz_parser++;
175         i_dst_port = atoi( psz_parser );
176     }
177     if( i_dst_port <= 0 )
178     {
179         i_dst_port = DEFAULT_PORT;
180     }
181
182     p_sys->p_thread =
183         vlc_object_create( p_access, sizeof( sout_access_thread_t ) );
184     if( !p_sys->p_thread )
185     {
186         msg_Err( p_access, "out of memory" );
187         return( VLC_EGENERIC );
188     }
189
190     p_sys->p_thread->p_sout = p_access->p_sout;
191     p_sys->p_thread->b_die  = 0;
192     p_sys->p_thread->b_error= 0;
193     p_sys->p_thread->p_fifo = sout_FifoCreate( p_access->p_sout );
194
195     socket_desc.i_type = NETWORK_UDP;
196     socket_desc.psz_server_addr = psz_dst_addr;
197     socket_desc.i_server_port   = i_dst_port;
198     socket_desc.psz_bind_addr   = "";
199     socket_desc.i_bind_port     = 0;
200     socket_desc.i_ttl           = 0;
201     if( ( val = sout_cfg_find_value( p_access->p_cfg, "ttl" ) ) )
202     {
203         socket_desc.i_ttl = atoi( val );
204     }
205     p_sys->p_thread->p_private = (void*)&socket_desc;
206     if( !( p_network = module_Need( p_sys->p_thread,
207                                     "network", "" ) ) )
208     {
209         msg_Err( p_access, "failed to open a connection (udp)" );
210         return( VLC_EGENERIC );
211     }
212     module_Unneed( p_sys->p_thread, p_network );
213
214     p_sys->p_thread->i_handle = socket_desc.i_handle;
215     p_sys->p_thread->i_caching = config_GetInt( p_this, "udp-sout-caching" ) * 1000;
216     if( ( val = sout_cfg_find_value( p_access->p_cfg, "caching" ) ) )
217     {
218         p_sys->p_thread->i_caching = atoll( val ) * 1000;
219     }
220
221     p_sys->i_mtu     = socket_desc.i_mtu;
222
223     if( vlc_thread_create( p_sys->p_thread, "sout write thread", ThreadWrite,
224                            VLC_THREAD_PRIORITY_OUTPUT, VLC_FALSE ) )
225     {
226         msg_Err( p_access->p_sout, "cannot spawn sout access thread" );
227         vlc_object_destroy( p_sys->p_thread );
228         return( VLC_EGENERIC );
229     }
230
231     srand( (uint32_t)mdate());
232     p_sys->p_buffer          = NULL;
233     p_sys->i_sequence_number = rand()&0xffff;
234     p_sys->i_ssrc            = rand()&0xffffffff;
235
236     if( sout_cfg_find( p_access->p_cfg, "raw" ) )
237     {
238         p_access->pf_write       = WriteRaw;
239     }
240     else
241     {
242         p_access->pf_write       = Write;
243     }
244     p_access->pf_seek        = Seek;
245
246     msg_Info( p_access, "Open: addr:`%s' port:`%d'",
247               psz_dst_addr, i_dst_port );
248
249     free( psz_dst_addr );
250     return VLC_SUCCESS;
251 }
252
253 /*****************************************************************************
254  * Close: close the target
255  *****************************************************************************/
256 static void Close( vlc_object_t * p_this )
257 {
258     sout_access_out_t       *p_access = (sout_access_out_t*)p_this;
259     sout_access_out_sys_t   *p_sys = p_access->p_sys;
260     int                 i;
261
262     p_sys->p_thread->b_die = 1;
263     for( i = 0; i < 10; i++ )
264     {
265         sout_buffer_t       *p_dummy;
266
267         p_dummy = sout_BufferNew( p_access->p_sout, p_sys->i_mtu );
268         p_dummy->i_dts = 0;
269         p_dummy->i_pts = 0;
270         p_dummy->i_length = 0;
271         sout_FifoPut( p_sys->p_thread->p_fifo, p_dummy );
272     }
273     vlc_thread_join( p_sys->p_thread );
274
275     sout_FifoDestroy( p_access->p_sout, p_sys->p_thread->p_fifo );
276
277     if( p_sys->p_buffer )
278     {
279         sout_BufferDelete( p_access->p_sout, p_sys->p_buffer );
280     }
281
282 #if defined( UNDER_CE )
283     CloseHandle( (HANDLE)p_sys->p_thread->i_handle );
284 #elif defined( WIN32 )
285     closesocket( p_sys->p_thread->i_handle );
286 #else
287     close( p_sys->p_thread->i_handle );
288 #endif
289
290     free( p_sys );
291     msg_Info( p_access, "Close" );
292 }
293
294 /*****************************************************************************
295  * Read: standard read on a file descriptor.
296  *****************************************************************************/
297 static int Write( sout_access_out_t *p_access, sout_buffer_t *p_buffer )
298 {
299     sout_access_out_sys_t   *p_sys = p_access->p_sys;
300     unsigned int i_write;
301
302     while( p_buffer )
303     {
304         sout_buffer_t *p_next;
305         if( p_buffer->i_size > p_sys->i_mtu )
306         {
307             msg_Warn( p_access, "arggggggggggggg packet size > mtu" );
308             i_write = p_sys->i_mtu;
309         }
310         else
311         {
312             i_write = p_buffer->i_size;
313         }
314
315         /* if we have enough data, enque the buffer */
316         if( p_sys->p_buffer &&
317             p_sys->p_buffer->i_size + i_write > p_sys->i_mtu )
318         {
319             sout_FifoPut( p_sys->p_thread->p_fifo, p_sys->p_buffer );
320             p_sys->p_buffer = NULL;
321         }
322
323         if( !p_sys->p_buffer )
324         {
325             p_sys->p_buffer = NewUDPPacket( p_access, p_buffer->i_dts );
326         }
327
328         if( p_buffer->i_size > 0 )
329         {
330             memcpy( p_sys->p_buffer->p_buffer + p_sys->p_buffer->i_size,
331                     p_buffer->p_buffer, i_write );
332             p_sys->p_buffer->i_size += i_write;
333         }
334         p_next = p_buffer->p_next;
335         sout_BufferDelete( p_access->p_sout, p_buffer );
336         p_buffer = p_next;
337     }
338
339     return( p_sys->p_thread->b_error ? -1 : 0 );
340 }
341
342 /*****************************************************************************
343  * WriteRaw: write p_buffer without trying to fill mtu
344  *****************************************************************************/
345 static int WriteRaw( sout_access_out_t *p_access, sout_buffer_t *p_buffer )
346 {
347     sout_access_out_sys_t   *p_sys = p_access->p_sys;
348
349     sout_FifoPut( p_sys->p_thread->p_fifo, p_buffer );
350
351     return( p_sys->p_thread->b_error ? -1 : 0 );
352 }
353
354 /*****************************************************************************
355  * Seek: seek to a specific location in a file
356  *****************************************************************************/
357 static int Seek( sout_access_out_t *p_access, off_t i_pos )
358 {
359
360     msg_Err( p_access, "udp sout access cannot seek" );
361     return( -1 );
362 }
363
364 /*****************************************************************************
365  * NewUDPPacket: allocate a new UDP packet of size p_sys->i_mtu
366  *****************************************************************************/
367 static sout_buffer_t *NewUDPPacket( sout_access_out_t *p_access, mtime_t i_dts)
368 {
369     sout_access_out_sys_t *p_sys = p_access->p_sys;
370     sout_buffer_t *p_buffer;
371
372     p_buffer = sout_BufferNew( p_access->p_sout, p_sys->i_mtu );
373     p_buffer->i_dts = i_dts;
374     p_buffer->i_size = 0;
375
376     if( p_sys->b_rtpts )
377     {
378         mtime_t i_timestamp = p_buffer->i_dts * 9 / 100;
379
380         /* add rtp/ts header */
381         p_buffer->p_buffer[0] = 0x80;
382         p_buffer->p_buffer[1] = 0x21; // mpeg2-ts
383
384         p_buffer->p_buffer[2] = ( p_sys->i_sequence_number >> 8 )&0xff;
385         p_buffer->p_buffer[3] = p_sys->i_sequence_number&0xff;
386         p_sys->i_sequence_number++;
387
388         p_buffer->p_buffer[4] = ( i_timestamp >> 24 )&0xff;
389         p_buffer->p_buffer[5] = ( i_timestamp >> 16 )&0xff;
390         p_buffer->p_buffer[6] = ( i_timestamp >>  8 )&0xff;
391         p_buffer->p_buffer[7] = i_timestamp&0xff;
392
393         p_buffer->p_buffer[ 8] = ( p_sys->i_ssrc >> 24 )&0xff;
394         p_buffer->p_buffer[ 9] = ( p_sys->i_ssrc >> 16 )&0xff;
395         p_buffer->p_buffer[10] = ( p_sys->i_ssrc >>  8 )&0xff;
396         p_buffer->p_buffer[11] = p_sys->i_ssrc&0xff;
397
398         p_buffer->i_size = 12;
399     }
400
401     return p_buffer;
402 }
403
404 /*****************************************************************************
405  * ThreadWrite: Write a packet on the network at the good time.
406  *****************************************************************************/
407 static void ThreadWrite( vlc_object_t *p_this )
408 {
409     sout_access_thread_t *p_thread = (sout_access_thread_t*)p_this;
410     sout_instance_t      *p_sout = p_thread->p_sout;
411     mtime_t              i_date_last = -1;
412
413     while( ! p_thread->b_die )
414     {
415         sout_buffer_t *p_pk;
416         mtime_t       i_date;
417
418         p_pk = sout_FifoGet( p_thread->p_fifo );
419
420         i_date = p_thread->i_caching + p_pk->i_dts;
421         if( i_date_last > 0 )
422         {
423             if( i_date - i_date_last > 2000000 )
424             {
425                 msg_Dbg( p_thread, "mmh, hole > 2s -> drop" );
426
427                 sout_BufferDelete( p_sout, p_pk );
428                 i_date_last = i_date;
429                 continue;
430             }
431             else if( i_date - i_date_last < -100000 )
432             {
433                 msg_Dbg( p_thread, "mmh, paquets in the past -> drop" );
434
435                 sout_BufferDelete( p_sout, p_pk );
436                 i_date_last = i_date;
437                 continue;
438             }
439         }
440
441
442         mwait( i_date );
443         send( p_thread->i_handle, p_pk->p_buffer, p_pk->i_size, 0 );
444         sout_BufferDelete( p_sout, p_pk );
445         i_date_last = i_date;
446     }
447 }