]> git.sesse.net Git - ffmpeg/blob - libavformat/udp.c
4bfe4fd4e3535b22ac1edeb7e40159a33b745a81
[ffmpeg] / libavformat / udp.c
1 /*
2  * UDP prototype streaming system
3  * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21
22 /**
23  * @file
24  * UDP protocol
25  */
26
27 #define _DEFAULT_SOURCE
28 #define _BSD_SOURCE     /* Needed for using struct ip_mreq with recent glibc */
29
30 #include "avformat.h"
31 #include "avio_internal.h"
32 #include "libavutil/avassert.h"
33 #include "libavutil/parseutils.h"
34 #include "libavutil/fifo.h"
35 #include "libavutil/intreadwrite.h"
36 #include "libavutil/avstring.h"
37 #include "libavutil/opt.h"
38 #include "libavutil/log.h"
39 #include "libavutil/time.h"
40 #include "internal.h"
41 #include "network.h"
42 #include "os_support.h"
43 #include "url.h"
44 #include "ip.h"
45
46 #ifdef __APPLE__
47 #include "TargetConditionals.h"
48 #endif
49
50 #if HAVE_UDPLITE_H
51 #include "udplite.h"
52 #else
53 /* On many Linux systems, udplite.h is missing but the kernel supports UDP-Lite.
54  * So, we provide a fallback here.
55  */
56 #define UDPLITE_SEND_CSCOV                               10
57 #define UDPLITE_RECV_CSCOV                               11
58 #endif
59
60 #ifndef IPPROTO_UDPLITE
61 #define IPPROTO_UDPLITE                                  136
62 #endif
63
64 #if HAVE_PTHREAD_CANCEL
65 #include <pthread.h>
66 #endif
67
68 #ifndef IPV6_ADD_MEMBERSHIP
69 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
70 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
71 #endif
72
73 #define UDP_TX_BUF_SIZE 32768
74 #define UDP_MAX_PKT_SIZE 65536
75 #define UDP_HEADER_SIZE 8
76
77 typedef struct UDPContext {
78     const AVClass *class;
79     int udp_fd;
80     int ttl;
81     int udplite_coverage;
82     int buffer_size;
83     int pkt_size;
84     int is_multicast;
85     int is_broadcast;
86     int local_port;
87     int reuse_socket;
88     int overrun_nonfatal;
89     struct sockaddr_storage dest_addr;
90     int dest_addr_len;
91     int is_connected;
92
93     /* Circular Buffer variables for use in UDP receive code */
94     int circular_buffer_size;
95     AVFifoBuffer *fifo;
96     int circular_buffer_error;
97     int64_t bitrate; /* number of bits to send per second */
98     int64_t burst_bits;
99     int close_req;
100 #if HAVE_PTHREAD_CANCEL
101     pthread_t circular_buffer_thread;
102     pthread_mutex_t mutex;
103     pthread_cond_t cond;
104     int thread_started;
105 #endif
106     uint8_t tmp[UDP_MAX_PKT_SIZE+4];
107     int remaining_in_dg;
108     char *localaddr;
109     int timeout;
110     struct sockaddr_storage local_addr_storage;
111     char *sources;
112     char *block;
113     IPSourceFilters filters;
114 } UDPContext;
115
116 #define OFFSET(x) offsetof(UDPContext, x)
117 #define D AV_OPT_FLAG_DECODING_PARAM
118 #define E AV_OPT_FLAG_ENCODING_PARAM
119 static const AVOption options[] = {
120     { "buffer_size",    "System data size (in bytes)",                     OFFSET(buffer_size),    AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
121     { "bitrate",        "Bits to send per second",                         OFFSET(bitrate),        AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = E },
122     { "burst_bits",     "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits),   AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = E },
123     { "localport",      "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, D|E },
124     { "local_port",     "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
125     { "localaddr",      "Local address",                                   OFFSET(localaddr),      AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
126     { "udplite_coverage", "choose UDPLite head size which should be validated by checksum", OFFSET(udplite_coverage), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
127     { "pkt_size",       "Maximum UDP packet size",                         OFFSET(pkt_size),       AV_OPT_TYPE_INT,    { .i64 = 1472 },  -1, INT_MAX, .flags = D|E },
128     { "reuse",          "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       D|E },
129     { "reuse_socket",   "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       .flags = D|E },
130     { "broadcast", "explicitly allow or disallow broadcast destination",   OFFSET(is_broadcast),   AV_OPT_TYPE_BOOL,   { .i64 = 0  },     0, 1,       E },
131     { "ttl",            "Time to live (multicast only)",                   OFFSET(ttl),            AV_OPT_TYPE_INT,    { .i64 = 16 },     0, INT_MAX, E },
132     { "connect",        "set if connect() should be called on socket",     OFFSET(is_connected),   AV_OPT_TYPE_BOOL,   { .i64 =  0 },     0, 1,       .flags = D|E },
133     { "fifo_size",      "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
134     { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,    D },
135     { "timeout",        "set raise error timeout (only in read mode)",     OFFSET(timeout),        AV_OPT_TYPE_INT,    { .i64 = 0 },      0, INT_MAX, D },
136     { "sources",        "Source list",                                     OFFSET(sources),        AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
137     { "block",          "Block list",                                      OFFSET(block),          AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
138     { NULL }
139 };
140
141 static const AVClass udp_class = {
142     .class_name = "udp",
143     .item_name  = av_default_item_name,
144     .option     = options,
145     .version    = LIBAVUTIL_VERSION_INT,
146 };
147
148 static const AVClass udplite_context_class = {
149     .class_name     = "udplite",
150     .item_name      = av_default_item_name,
151     .option         = options,
152     .version        = LIBAVUTIL_VERSION_INT,
153 };
154
155 static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
156                                  struct sockaddr *addr)
157 {
158 #ifdef IP_MULTICAST_TTL
159     if (addr->sa_family == AF_INET) {
160         if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) {
161             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_MULTICAST_TTL)");
162             return -1;
163         }
164     }
165 #endif
166 #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
167     if (addr->sa_family == AF_INET6) {
168         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) {
169             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_MULTICAST_HOPS)");
170             return -1;
171         }
172     }
173 #endif
174     return 0;
175 }
176
177 static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr)
178 {
179 #ifdef IP_ADD_MEMBERSHIP
180     if (addr->sa_family == AF_INET) {
181         struct ip_mreq mreq;
182
183         mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
184         if (local_addr)
185             mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
186         else
187             mreq.imr_interface.s_addr= INADDR_ANY;
188         if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
189             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
190             return -1;
191         }
192     }
193 #endif
194 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
195     if (addr->sa_family == AF_INET6) {
196         struct ipv6_mreq mreq6;
197
198         memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
199         //TODO: Interface index should be looked up from local_addr
200         mreq6.ipv6mr_interface= 0;
201         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
202             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
203             return -1;
204         }
205     }
206 #endif
207     return 0;
208 }
209
210 static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr)
211 {
212 #ifdef IP_DROP_MEMBERSHIP
213     if (addr->sa_family == AF_INET) {
214         struct ip_mreq mreq;
215
216         mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
217         if (local_addr)
218             mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
219         else
220             mreq.imr_interface.s_addr= INADDR_ANY;
221         if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
222             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
223             return -1;
224         }
225     }
226 #endif
227 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
228     if (addr->sa_family == AF_INET6) {
229         struct ipv6_mreq mreq6;
230
231         memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
232         //TODO: Interface index should be looked up from local_addr
233         mreq6.ipv6mr_interface= 0;
234         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
235             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
236             return -1;
237         }
238     }
239 #endif
240     return 0;
241 }
242
243 static int udp_set_multicast_sources(URLContext *h,
244                                      int sockfd, struct sockaddr *addr,
245                                      int addr_len, struct sockaddr_storage *local_addr,
246                                      struct sockaddr_storage *sources,
247                                      int nb_sources, int include)
248 {
249 #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) && !defined(_WIN32) && (!defined(TARGET_OS_TV) || !TARGET_OS_TV)
250     /* These ones are available in the microsoft SDK, but don't seem to work
251      * as on linux, so just prefer the v4-only approach there for now. */
252     int i;
253     for (i = 0; i < nb_sources; i++) {
254         struct group_source_req mreqs;
255         int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
256
257         //TODO: Interface index should be looked up from local_addr
258         mreqs.gsr_interface = 0;
259         memcpy(&mreqs.gsr_group, addr, addr_len);
260         memcpy(&mreqs.gsr_source, &sources[i], sizeof(*sources));
261
262         if (setsockopt(sockfd, level,
263                        include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
264                        (const void *)&mreqs, sizeof(mreqs)) < 0) {
265             if (include)
266                 ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
267             else
268                 ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
269             return ff_neterrno();
270         }
271     }
272 #elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
273     int i;
274     if (addr->sa_family != AF_INET) {
275         av_log(NULL, AV_LOG_ERROR,
276                "Setting multicast sources only supported for IPv4\n");
277         return AVERROR(EINVAL);
278     }
279     for (i = 0; i < nb_sources; i++) {
280         struct ip_mreq_source mreqs;
281         if (sources[i].ss_family != AF_INET) {
282             av_log(NULL, AV_LOG_ERROR, "Source/block address %d is of incorrect protocol family\n", i + 1);
283             return AVERROR(EINVAL);
284         }
285
286         mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
287         if (local_addr)
288             mreqs.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
289         else
290             mreqs.imr_interface.s_addr= INADDR_ANY;
291         mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)&sources[i])->sin_addr.s_addr;
292
293         if (setsockopt(sockfd, IPPROTO_IP,
294                        include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
295                        (const void *)&mreqs, sizeof(mreqs)) < 0) {
296             if (include)
297                 ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
298             else
299                 ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
300             return ff_neterrno();
301         }
302     }
303 #else
304     return AVERROR(ENOSYS);
305 #endif
306     return 0;
307 }
308 static int udp_set_url(URLContext *h,
309                        struct sockaddr_storage *addr,
310                        const char *hostname, int port)
311 {
312     struct addrinfo *res0;
313     int addr_len;
314
315     res0 = ff_ip_resolve_host(h, hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
316     if (!res0) return AVERROR(EIO);
317     memcpy(addr, res0->ai_addr, res0->ai_addrlen);
318     addr_len = res0->ai_addrlen;
319     freeaddrinfo(res0);
320
321     return addr_len;
322 }
323
324 static int udp_socket_create(URLContext *h, struct sockaddr_storage *addr,
325                              socklen_t *addr_len, const char *localaddr)
326 {
327     UDPContext *s = h->priv_data;
328     int udp_fd = -1;
329     struct addrinfo *res0, *res;
330     int family = AF_UNSPEC;
331
332     if (((struct sockaddr *) &s->dest_addr)->sa_family)
333         family = ((struct sockaddr *) &s->dest_addr)->sa_family;
334     res0 = ff_ip_resolve_host(h, (localaddr && localaddr[0]) ? localaddr : NULL,
335                             s->local_port,
336                             SOCK_DGRAM, family, AI_PASSIVE);
337     if (!res0)
338         goto fail;
339     for (res = res0; res; res=res->ai_next) {
340         if (s->udplite_coverage)
341             udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, IPPROTO_UDPLITE);
342         else
343             udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, 0);
344         if (udp_fd != -1) break;
345         ff_log_net_error(NULL, AV_LOG_ERROR, "socket");
346     }
347
348     if (udp_fd < 0)
349         goto fail;
350
351     memcpy(addr, res->ai_addr, res->ai_addrlen);
352     *addr_len = res->ai_addrlen;
353
354     freeaddrinfo(res0);
355
356     return udp_fd;
357
358  fail:
359     if (udp_fd >= 0)
360         closesocket(udp_fd);
361     if(res0)
362         freeaddrinfo(res0);
363     return -1;
364 }
365
366 static int udp_port(struct sockaddr_storage *addr, int addr_len)
367 {
368     char sbuf[sizeof(int)*3+1];
369     int error;
370
371     if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0,  sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) {
372         av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
373         return -1;
374     }
375
376     return strtol(sbuf, NULL, 10);
377 }
378
379
380 /**
381  * If no filename is given to av_open_input_file because you want to
382  * get the local port first, then you must call this function to set
383  * the remote server address.
384  *
385  * url syntax: udp://host:port[?option=val...]
386  * option: 'ttl=n'       : set the ttl value (for multicast only)
387  *         'localport=n' : set the local port
388  *         'pkt_size=n'  : set max packet size
389  *         'reuse=1'     : enable reusing the socket
390  *         'overrun_nonfatal=1': survive in case of circular buffer overrun
391  *
392  * @param h media file context
393  * @param uri of the remote server
394  * @return zero if no error.
395  */
396 int ff_udp_set_remote_url(URLContext *h, const char *uri)
397 {
398     UDPContext *s = h->priv_data;
399     char hostname[256], buf[10];
400     int port;
401     const char *p;
402
403     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
404
405     /* set the destination address */
406     s->dest_addr_len = udp_set_url(h, &s->dest_addr, hostname, port);
407     if (s->dest_addr_len < 0) {
408         return AVERROR(EIO);
409     }
410     s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
411     p = strchr(uri, '?');
412     if (p) {
413         if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
414             int was_connected = s->is_connected;
415             s->is_connected = strtol(buf, NULL, 10);
416             if (s->is_connected && !was_connected) {
417                 if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
418                             s->dest_addr_len)) {
419                     s->is_connected = 0;
420                     ff_log_net_error(h, AV_LOG_ERROR, "connect");
421                     return AVERROR(EIO);
422                 }
423             }
424         }
425     }
426
427     return 0;
428 }
429
430 /**
431  * Return the local port used by the UDP connection
432  * @param h media file context
433  * @return the local port number
434  */
435 int ff_udp_get_local_port(URLContext *h)
436 {
437     UDPContext *s = h->priv_data;
438     return s->local_port;
439 }
440
441 /**
442  * Return the udp file handle for select() usage to wait for several RTP
443  * streams at the same time.
444  * @param h media file context
445  */
446 static int udp_get_file_handle(URLContext *h)
447 {
448     UDPContext *s = h->priv_data;
449     return s->udp_fd;
450 }
451
452 #if HAVE_PTHREAD_CANCEL
453 static void *circular_buffer_task_rx( void *_URLContext)
454 {
455     URLContext *h = _URLContext;
456     UDPContext *s = h->priv_data;
457     int old_cancelstate;
458
459     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
460     pthread_mutex_lock(&s->mutex);
461     if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
462         av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
463         s->circular_buffer_error = AVERROR(EIO);
464         goto end;
465     }
466     while(1) {
467         int len;
468         struct sockaddr_storage addr;
469         socklen_t addr_len = sizeof(addr);
470
471         pthread_mutex_unlock(&s->mutex);
472         /* Blocking operations are always cancellation points;
473            see "General Information" / "Thread Cancelation Overview"
474            in Single Unix. */
475         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
476         len = recvfrom(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0, (struct sockaddr *)&addr, &addr_len);
477         pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
478         pthread_mutex_lock(&s->mutex);
479         if (len < 0) {
480             if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
481                 s->circular_buffer_error = ff_neterrno();
482                 goto end;
483             }
484             continue;
485         }
486         if (ff_ip_check_source_lists(&addr, &s->filters))
487             continue;
488         AV_WL32(s->tmp, len);
489
490         if(av_fifo_space(s->fifo) < len + 4) {
491             /* No Space left */
492             if (s->overrun_nonfatal) {
493                 av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
494                         "Surviving due to overrun_nonfatal option\n");
495                 continue;
496             } else {
497                 av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
498                         "To avoid, increase fifo_size URL option. "
499                         "To survive in such case, use overrun_nonfatal option\n");
500                 s->circular_buffer_error = AVERROR(EIO);
501                 goto end;
502             }
503         }
504         av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
505         pthread_cond_signal(&s->cond);
506     }
507
508 end:
509     pthread_cond_signal(&s->cond);
510     pthread_mutex_unlock(&s->mutex);
511     return NULL;
512 }
513
514 static void *circular_buffer_task_tx( void *_URLContext)
515 {
516     URLContext *h = _URLContext;
517     UDPContext *s = h->priv_data;
518     int old_cancelstate;
519     int64_t target_timestamp = av_gettime_relative();
520     int64_t start_timestamp = av_gettime_relative();
521     int64_t sent_bits = 0;
522     int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
523     int64_t max_delay = s->bitrate ?  ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
524
525     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
526     pthread_mutex_lock(&s->mutex);
527
528     if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
529         av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
530         s->circular_buffer_error = AVERROR(EIO);
531         goto end;
532     }
533
534     for(;;) {
535         int len;
536         const uint8_t *p;
537         uint8_t tmp[4];
538         int64_t timestamp;
539
540         len=av_fifo_size(s->fifo);
541
542         while (len<4) {
543             if (s->close_req)
544                 goto end;
545             if (pthread_cond_wait(&s->cond, &s->mutex) < 0) {
546                 goto end;
547             }
548             len=av_fifo_size(s->fifo);
549         }
550
551         av_fifo_generic_read(s->fifo, tmp, 4, NULL);
552         len=AV_RL32(tmp);
553
554         av_assert0(len >= 0);
555         av_assert0(len <= sizeof(s->tmp));
556
557         av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
558
559         pthread_mutex_unlock(&s->mutex);
560         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
561
562         if (s->bitrate) {
563             timestamp = av_gettime_relative();
564             if (timestamp < target_timestamp) {
565                 int64_t delay = target_timestamp - timestamp;
566                 if (delay > max_delay) {
567                     delay = max_delay;
568                     start_timestamp = timestamp + delay;
569                     sent_bits = 0;
570                 }
571                 av_usleep(delay);
572             } else {
573                 if (timestamp - burst_interval > target_timestamp) {
574                     start_timestamp = timestamp - burst_interval;
575                     sent_bits = 0;
576                 }
577             }
578             sent_bits += len * 8;
579             target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
580         }
581
582         p = s->tmp;
583         while (len) {
584             int ret;
585             av_assert0(len > 0);
586             if (!s->is_connected) {
587                 ret = sendto (s->udp_fd, p, len, 0,
588                             (struct sockaddr *) &s->dest_addr,
589                             s->dest_addr_len);
590             } else
591                 ret = send(s->udp_fd, p, len, 0);
592             if (ret >= 0) {
593                 len -= ret;
594                 p   += ret;
595             } else {
596                 ret = ff_neterrno();
597                 if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
598                     pthread_mutex_lock(&s->mutex);
599                     s->circular_buffer_error = ret;
600                     pthread_mutex_unlock(&s->mutex);
601                     return NULL;
602                 }
603             }
604         }
605
606         pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
607         pthread_mutex_lock(&s->mutex);
608     }
609
610 end:
611     pthread_mutex_unlock(&s->mutex);
612     return NULL;
613 }
614
615
616 #endif
617
618 /* put it in UDP context */
619 /* return non zero if error */
620 static int udp_open(URLContext *h, const char *uri, int flags)
621 {
622     char hostname[1024], localaddr[1024] = "";
623     int port, udp_fd = -1, tmp, bind_ret = -1, dscp = -1;
624     UDPContext *s = h->priv_data;
625     int is_output;
626     const char *p;
627     char buf[256];
628     struct sockaddr_storage my_addr;
629     socklen_t len;
630
631     h->is_streamed = 1;
632
633     is_output = !(flags & AVIO_FLAG_READ);
634     if (s->buffer_size < 0)
635         s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
636
637     if (s->sources) {
638         if (ff_ip_parse_sources(h, s->sources, &s->filters) < 0)
639             goto fail;
640     }
641
642     if (s->block) {
643         if (ff_ip_parse_blocks(h, s->block, &s->filters) < 0)
644             goto fail;
645     }
646
647     if (s->pkt_size > 0)
648         h->max_packet_size = s->pkt_size;
649
650     p = strchr(uri, '?');
651     if (p) {
652         if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
653             char *endptr = NULL;
654             s->reuse_socket = strtol(buf, &endptr, 10);
655             /* assume if no digits were found it is a request to enable it */
656             if (buf == endptr)
657                 s->reuse_socket = 1;
658         }
659         if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
660             char *endptr = NULL;
661             s->overrun_nonfatal = strtol(buf, &endptr, 10);
662             /* assume if no digits were found it is a request to enable it */
663             if (buf == endptr)
664                 s->overrun_nonfatal = 1;
665             if (!HAVE_PTHREAD_CANCEL)
666                 av_log(h, AV_LOG_WARNING,
667                        "'overrun_nonfatal' option was set but it is not supported "
668                        "on this build (pthread support is required)\n");
669         }
670         if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
671             s->ttl = strtol(buf, NULL, 10);
672         }
673         if (av_find_info_tag(buf, sizeof(buf), "udplite_coverage", p)) {
674             s->udplite_coverage = strtol(buf, NULL, 10);
675         }
676         if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
677             s->local_port = strtol(buf, NULL, 10);
678         }
679         if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
680             s->pkt_size = strtol(buf, NULL, 10);
681         }
682         if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
683             s->buffer_size = strtol(buf, NULL, 10);
684         }
685         if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
686             s->is_connected = strtol(buf, NULL, 10);
687         }
688         if (av_find_info_tag(buf, sizeof(buf), "dscp", p)) {
689             dscp = strtol(buf, NULL, 10);
690         }
691         if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
692             s->circular_buffer_size = strtol(buf, NULL, 10);
693             if (!HAVE_PTHREAD_CANCEL)
694                 av_log(h, AV_LOG_WARNING,
695                        "'circular_buffer_size' option was set but it is not supported "
696                        "on this build (pthread support is required)\n");
697         }
698         if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
699             s->bitrate = strtoll(buf, NULL, 10);
700             if (!HAVE_PTHREAD_CANCEL)
701                 av_log(h, AV_LOG_WARNING,
702                        "'bitrate' option was set but it is not supported "
703                        "on this build (pthread support is required)\n");
704         }
705         if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) {
706             s->burst_bits = strtoll(buf, NULL, 10);
707         }
708         if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
709             av_strlcpy(localaddr, buf, sizeof(localaddr));
710         }
711         if (av_find_info_tag(buf, sizeof(buf), "sources", p)) {
712             if (ff_ip_parse_sources(h, buf, &s->filters) < 0)
713                 goto fail;
714         }
715         if (av_find_info_tag(buf, sizeof(buf), "block", p)) {
716             if (ff_ip_parse_blocks(h, buf, &s->filters) < 0)
717                 goto fail;
718         }
719         if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
720             s->timeout = strtol(buf, NULL, 10);
721         if (is_output && av_find_info_tag(buf, sizeof(buf), "broadcast", p))
722             s->is_broadcast = strtol(buf, NULL, 10);
723     }
724     /* handling needed to support options picking from both AVOption and URL */
725     s->circular_buffer_size *= 188;
726     if (flags & AVIO_FLAG_WRITE) {
727         h->max_packet_size = s->pkt_size;
728     } else {
729         h->max_packet_size = UDP_MAX_PKT_SIZE;
730     }
731     h->rw_timeout = s->timeout;
732
733     /* fill the dest addr */
734     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
735
736     /* XXX: fix av_url_split */
737     if (hostname[0] == '\0' || hostname[0] == '?') {
738         /* only accepts null hostname if input */
739         if (!(flags & AVIO_FLAG_READ))
740             goto fail;
741     } else {
742         if (ff_udp_set_remote_url(h, uri) < 0)
743             goto fail;
744     }
745
746     if ((s->is_multicast || s->local_port <= 0) && (h->flags & AVIO_FLAG_READ))
747         s->local_port = port;
748
749     if (localaddr[0])
750         udp_fd = udp_socket_create(h, &my_addr, &len, localaddr);
751     else
752         udp_fd = udp_socket_create(h, &my_addr, &len, s->localaddr);
753     if (udp_fd < 0)
754         goto fail;
755
756     s->local_addr_storage=my_addr; //store for future multicast join
757
758     /* Follow the requested reuse option, unless it's multicast in which
759      * case enable reuse unless explicitly disabled.
760      */
761     if (s->reuse_socket > 0 || (s->is_multicast && s->reuse_socket < 0)) {
762         s->reuse_socket = 1;
763         if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0)
764             goto fail;
765     }
766
767     if (s->is_broadcast) {
768 #ifdef SO_BROADCAST
769         if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0)
770 #endif
771            goto fail;
772     }
773
774     /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving.
775      * The receiver coverage has to be less than or equal to the sender coverage.
776      * Otherwise, the receiver will drop all packets.
777      */
778     if (s->udplite_coverage) {
779         if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_SEND_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
780             av_log(h, AV_LOG_WARNING, "socket option UDPLITE_SEND_CSCOV not available");
781
782         if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_RECV_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
783             av_log(h, AV_LOG_WARNING, "socket option UDPLITE_RECV_CSCOV not available");
784     }
785
786     if (dscp >= 0) {
787         dscp <<= 2;
788         if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0)
789             goto fail;
790     }
791
792     /* If multicast, try binding the multicast address first, to avoid
793      * receiving UDP packets from other sources aimed at the same UDP
794      * port. This fails on windows. This makes sending to the same address
795      * using sendto() fail, so only do it if we're opened in read-only mode. */
796     if (s->is_multicast && !(h->flags & AVIO_FLAG_WRITE)) {
797         bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
798     }
799     /* bind to the local address if not multicast or if the multicast
800      * bind failed */
801     /* the bind is needed to give a port to the socket now */
802     if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
803         ff_log_net_error(h, AV_LOG_ERROR, "bind failed");
804         goto fail;
805     }
806
807     len = sizeof(my_addr);
808     getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
809     s->local_port = udp_port(&my_addr, len);
810
811     if (s->is_multicast) {
812         if (h->flags & AVIO_FLAG_WRITE) {
813             /* output */
814             if (udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr) < 0)
815                 goto fail;
816         }
817         if (h->flags & AVIO_FLAG_READ) {
818             /* input */
819             if (s->filters.nb_include_addrs) {
820                 if (udp_set_multicast_sources(h, udp_fd,
821                                               (struct sockaddr *)&s->dest_addr,
822                                               s->dest_addr_len, &s->local_addr_storage,
823                                               s->filters.include_addrs,
824                                               s->filters.nb_include_addrs, 1) < 0)
825                     goto fail;
826             } else {
827                 if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage) < 0)
828                     goto fail;
829             }
830             if (s->filters.nb_exclude_addrs) {
831                 if (udp_set_multicast_sources(h, udp_fd,
832                                               (struct sockaddr *)&s->dest_addr,
833                                               s->dest_addr_len, &s->local_addr_storage,
834                                               s->filters.exclude_addrs,
835                                               s->filters.nb_exclude_addrs, 0) < 0)
836                     goto fail;
837             }
838         }
839     }
840
841     if (is_output) {
842         /* limit the tx buf size to limit latency */
843         tmp = s->buffer_size;
844         if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
845             ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
846             goto fail;
847         }
848     } else {
849         /* set udp recv buffer size to the requested value (default 64K) */
850         tmp = s->buffer_size;
851         if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
852             ff_log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
853         }
854         len = sizeof(tmp);
855         if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, &len) < 0) {
856             ff_log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)");
857         } else {
858             av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp);
859             if(tmp < s->buffer_size)
860                 av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d", s->buffer_size, tmp);
861         }
862
863         /* make the socket non-blocking */
864         ff_socket_nonblock(udp_fd, 1);
865     }
866     if (s->is_connected) {
867         if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
868             ff_log_net_error(h, AV_LOG_ERROR, "connect");
869             goto fail;
870         }
871     }
872
873     s->udp_fd = udp_fd;
874
875 #if HAVE_PTHREAD_CANCEL
876     /*
877       Create thread in case of:
878       1. Input and circular_buffer_size is set
879       2. Output and bitrate and circular_buffer_size is set
880     */
881
882     if (is_output && s->bitrate && !s->circular_buffer_size) {
883         /* Warn user in case of 'circular_buffer_size' is not set */
884         av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
885     }
886
887     if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) {
888         int ret;
889
890         /* start the task going */
891         s->fifo = av_fifo_alloc(s->circular_buffer_size);
892         ret = pthread_mutex_init(&s->mutex, NULL);
893         if (ret != 0) {
894             av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
895             goto fail;
896         }
897         ret = pthread_cond_init(&s->cond, NULL);
898         if (ret != 0) {
899             av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
900             goto cond_fail;
901         }
902         ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
903         if (ret != 0) {
904             av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
905             goto thread_fail;
906         }
907         s->thread_started = 1;
908     }
909 #endif
910
911     return 0;
912 #if HAVE_PTHREAD_CANCEL
913  thread_fail:
914     pthread_cond_destroy(&s->cond);
915  cond_fail:
916     pthread_mutex_destroy(&s->mutex);
917 #endif
918  fail:
919     if (udp_fd >= 0)
920         closesocket(udp_fd);
921     av_fifo_freep(&s->fifo);
922     ff_ip_reset_filters(&s->filters);
923     return AVERROR(EIO);
924 }
925
926 static int udplite_open(URLContext *h, const char *uri, int flags)
927 {
928     UDPContext *s = h->priv_data;
929
930     // set default checksum coverage
931     s->udplite_coverage = UDP_HEADER_SIZE;
932
933     return udp_open(h, uri, flags);
934 }
935
936 static int udp_read(URLContext *h, uint8_t *buf, int size)
937 {
938     UDPContext *s = h->priv_data;
939     int ret;
940     struct sockaddr_storage addr;
941     socklen_t addr_len = sizeof(addr);
942 #if HAVE_PTHREAD_CANCEL
943     int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
944
945     if (s->fifo) {
946         pthread_mutex_lock(&s->mutex);
947         do {
948             avail = av_fifo_size(s->fifo);
949             if (avail) { // >=size) {
950                 uint8_t tmp[4];
951
952                 av_fifo_generic_read(s->fifo, tmp, 4, NULL);
953                 avail= AV_RL32(tmp);
954                 if(avail > size){
955                     av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
956                     avail= size;
957                 }
958
959                 av_fifo_generic_read(s->fifo, buf, avail, NULL);
960                 av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
961                 pthread_mutex_unlock(&s->mutex);
962                 return avail;
963             } else if(s->circular_buffer_error){
964                 int err = s->circular_buffer_error;
965                 pthread_mutex_unlock(&s->mutex);
966                 return err;
967             } else if(nonblock) {
968                 pthread_mutex_unlock(&s->mutex);
969                 return AVERROR(EAGAIN);
970             }
971             else {
972                 /* FIXME: using the monotonic clock would be better,
973                    but it does not exist on all supported platforms. */
974                 int64_t t = av_gettime() + 100000;
975                 struct timespec tv = { .tv_sec  =  t / 1000000,
976                                        .tv_nsec = (t % 1000000) * 1000 };
977                 if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) {
978                     pthread_mutex_unlock(&s->mutex);
979                     return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno);
980                 }
981                 nonblock = 1;
982             }
983         } while( 1);
984     }
985 #endif
986
987     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
988         ret = ff_network_wait_fd(s->udp_fd, 0);
989         if (ret < 0)
990             return ret;
991     }
992     ret = recvfrom(s->udp_fd, buf, size, 0, (struct sockaddr *)&addr, &addr_len);
993     if (ret < 0)
994         return ff_neterrno();
995     if (ff_ip_check_source_lists(&addr, &s->filters))
996         return AVERROR(EINTR);
997     return ret;
998 }
999
1000 static int udp_write(URLContext *h, const uint8_t *buf, int size)
1001 {
1002     UDPContext *s = h->priv_data;
1003     int ret;
1004
1005 #if HAVE_PTHREAD_CANCEL
1006     if (s->fifo) {
1007         uint8_t tmp[4];
1008
1009         pthread_mutex_lock(&s->mutex);
1010
1011         /*
1012           Return error if last tx failed.
1013           Here we can't know on which packet error was, but it needs to know that error exists.
1014         */
1015         if (s->circular_buffer_error<0) {
1016             int err=s->circular_buffer_error;
1017             pthread_mutex_unlock(&s->mutex);
1018             return err;
1019         }
1020
1021         if(av_fifo_space(s->fifo) < size + 4) {
1022             /* What about a partial packet tx ? */
1023             pthread_mutex_unlock(&s->mutex);
1024             return AVERROR(ENOMEM);
1025         }
1026         AV_WL32(tmp, size);
1027         av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */
1028         av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
1029         pthread_cond_signal(&s->cond);
1030         pthread_mutex_unlock(&s->mutex);
1031         return size;
1032     }
1033 #endif
1034     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
1035         ret = ff_network_wait_fd(s->udp_fd, 1);
1036         if (ret < 0)
1037             return ret;
1038     }
1039
1040     if (!s->is_connected) {
1041         ret = sendto (s->udp_fd, buf, size, 0,
1042                       (struct sockaddr *) &s->dest_addr,
1043                       s->dest_addr_len);
1044     } else
1045         ret = send(s->udp_fd, buf, size, 0);
1046
1047     return ret < 0 ? ff_neterrno() : ret;
1048 }
1049
1050 static int udp_close(URLContext *h)
1051 {
1052     UDPContext *s = h->priv_data;
1053
1054 #if HAVE_PTHREAD_CANCEL
1055     // Request close once writing is finished
1056     if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
1057         pthread_mutex_lock(&s->mutex);
1058         s->close_req = 1;
1059         pthread_cond_signal(&s->cond);
1060         pthread_mutex_unlock(&s->mutex);
1061     }
1062 #endif
1063
1064     if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
1065         udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage);
1066 #if HAVE_PTHREAD_CANCEL
1067     if (s->thread_started) {
1068         int ret;
1069         // Cancel only read, as write has been signaled as success to the user
1070         if (h->flags & AVIO_FLAG_READ)
1071             pthread_cancel(s->circular_buffer_thread);
1072         ret = pthread_join(s->circular_buffer_thread, NULL);
1073         if (ret != 0)
1074             av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
1075         pthread_mutex_destroy(&s->mutex);
1076         pthread_cond_destroy(&s->cond);
1077     }
1078 #endif
1079     closesocket(s->udp_fd);
1080     av_fifo_freep(&s->fifo);
1081     ff_ip_reset_filters(&s->filters);
1082     return 0;
1083 }
1084
1085 const URLProtocol ff_udp_protocol = {
1086     .name                = "udp",
1087     .url_open            = udp_open,
1088     .url_read            = udp_read,
1089     .url_write           = udp_write,
1090     .url_close           = udp_close,
1091     .url_get_file_handle = udp_get_file_handle,
1092     .priv_data_size      = sizeof(UDPContext),
1093     .priv_data_class     = &udp_class,
1094     .flags               = URL_PROTOCOL_FLAG_NETWORK,
1095 };
1096
1097 const URLProtocol ff_udplite_protocol = {
1098     .name                = "udplite",
1099     .url_open            = udplite_open,
1100     .url_read            = udp_read,
1101     .url_write           = udp_write,
1102     .url_close           = udp_close,
1103     .url_get_file_handle = udp_get_file_handle,
1104     .priv_data_size      = sizeof(UDPContext),
1105     .priv_data_class     = &udplite_context_class,
1106     .flags               = URL_PROTOCOL_FLAG_NETWORK,
1107 };