]> git.sesse.net Git - ffmpeg/blob - libavformat/udp.c
lavc: add AV_CODEC_ID_BIN_DATA.
[ffmpeg] / libavformat / udp.c
1 /*
2  * UDP prototype streaming system
3  * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21
22 /**
23  * @file
24  * UDP protocol
25  */
26
27 #define _BSD_SOURCE     /* Needed for using struct ip_mreq with recent glibc */
28
29 #include "avformat.h"
30 #include "avio_internal.h"
31 #include "libavutil/parseutils.h"
32 #include "libavutil/fifo.h"
33 #include "libavutil/intreadwrite.h"
34 #include "libavutil/avstring.h"
35 #include "libavutil/opt.h"
36 #include "libavutil/log.h"
37 #include "libavutil/time.h"
38 #include "internal.h"
39 #include "network.h"
40 #include "os_support.h"
41 #include "url.h"
42
43 #if HAVE_PTHREAD_CANCEL
44 #include <pthread.h>
45 #endif
46
47 #ifndef HAVE_PTHREAD_CANCEL
48 #define HAVE_PTHREAD_CANCEL 0
49 #endif
50
51 #ifndef IPV6_ADD_MEMBERSHIP
52 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
53 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
54 #endif
55
56 #define UDP_TX_BUF_SIZE 32768
57 #define UDP_MAX_PKT_SIZE 65536
58
59 typedef struct {
60     const AVClass *class;
61     int udp_fd;
62     int ttl;
63     int buffer_size;
64     int is_multicast;
65     int local_port;
66     int reuse_socket;
67     int overrun_nonfatal;
68     struct sockaddr_storage dest_addr;
69     int dest_addr_len;
70     int is_connected;
71
72     /* Circular Buffer variables for use in UDP receive code */
73     int circular_buffer_size;
74     AVFifoBuffer *fifo;
75     int circular_buffer_error;
76 #if HAVE_PTHREAD_CANCEL
77     pthread_t circular_buffer_thread;
78     pthread_mutex_t mutex;
79     pthread_cond_t cond;
80     int thread_started;
81 #endif
82     uint8_t tmp[UDP_MAX_PKT_SIZE+4];
83     int remaining_in_dg;
84     char *local_addr;
85     int packet_size;
86     int timeout;
87     struct sockaddr_storage local_addr_storage;
88 } UDPContext;
89
90 #define OFFSET(x) offsetof(UDPContext, x)
91 #define D AV_OPT_FLAG_DECODING_PARAM
92 #define E AV_OPT_FLAG_ENCODING_PARAM
93 static const AVOption options[] = {
94 {"buffer_size", "set packet buffer size in bytes", OFFSET(buffer_size), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
95 {"localport", "set local port to bind to", OFFSET(local_port), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
96 {"localaddr", "choose local IP address", OFFSET(local_addr), AV_OPT_TYPE_STRING, {.str = ""}, 0, 0, D|E },
97 {"pkt_size", "set size of UDP packets", OFFSET(packet_size), AV_OPT_TYPE_INT, {.i64 = 1472}, 0, INT_MAX, D|E },
98 {"reuse", "explicitly allow or disallow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E },
99 {"ttl", "set the time to live value (for multicast only)", OFFSET(ttl), AV_OPT_TYPE_INT, {.i64 = 16}, 0, INT_MAX, E },
100 {"connect", "set if connect() should be called on socket", OFFSET(is_connected), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E },
101 /* TODO 'sources', 'block' option */
102 {"fifo_size", "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
103 {"overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D },
104 {"timeout", "set raise error timeout (only in read mode)", OFFSET(timeout), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D },
105 {NULL}
106 };
107
108 static const AVClass udp_context_class = {
109     .class_name     = "udp",
110     .item_name      = av_default_item_name,
111     .option         = options,
112     .version        = LIBAVUTIL_VERSION_INT,
113 };
114
115 static void log_net_error(void *ctx, int level, const char* prefix)
116 {
117     char errbuf[100];
118     av_strerror(ff_neterrno(), errbuf, sizeof(errbuf));
119     av_log(ctx, level, "%s: %s\n", prefix, errbuf);
120 }
121
122 static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
123                                  struct sockaddr *addr)
124 {
125 #ifdef IP_MULTICAST_TTL
126     if (addr->sa_family == AF_INET) {
127         if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) {
128             log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_MULTICAST_TTL)");
129             return -1;
130         }
131     }
132 #endif
133 #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
134     if (addr->sa_family == AF_INET6) {
135         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) {
136             log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_MULTICAST_HOPS)");
137             return -1;
138         }
139     }
140 #endif
141     return 0;
142 }
143
144 static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr)
145 {
146 #ifdef IP_ADD_MEMBERSHIP
147     if (addr->sa_family == AF_INET) {
148         struct ip_mreq mreq;
149
150         mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
151         if (local_addr)
152             mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
153         else
154             mreq.imr_interface.s_addr= INADDR_ANY;
155         if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
156             log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
157             return -1;
158         }
159     }
160 #endif
161 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
162     if (addr->sa_family == AF_INET6) {
163         struct ipv6_mreq mreq6;
164
165         memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
166         mreq6.ipv6mr_interface= 0;
167         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
168             log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
169             return -1;
170         }
171     }
172 #endif
173     return 0;
174 }
175
176 static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr)
177 {
178 #ifdef IP_DROP_MEMBERSHIP
179     if (addr->sa_family == AF_INET) {
180         struct ip_mreq mreq;
181
182         mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
183         if (local_addr)
184             mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
185         else
186             mreq.imr_interface.s_addr= INADDR_ANY;
187         if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
188             log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
189             return -1;
190         }
191     }
192 #endif
193 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
194     if (addr->sa_family == AF_INET6) {
195         struct ipv6_mreq mreq6;
196
197         memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
198         mreq6.ipv6mr_interface= 0;
199         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
200             log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
201             return -1;
202         }
203     }
204 #endif
205     return 0;
206 }
207
208 static struct addrinfo* udp_resolve_host(const char *hostname, int port,
209                                          int type, int family, int flags)
210 {
211     struct addrinfo hints = { 0 }, *res = 0;
212     int error;
213     char sport[16];
214     const char *node = 0, *service = "0";
215
216     if (port > 0) {
217         snprintf(sport, sizeof(sport), "%d", port);
218         service = sport;
219     }
220     if ((hostname) && (hostname[0] != '\0') && (hostname[0] != '?')) {
221         node = hostname;
222     }
223     hints.ai_socktype = type;
224     hints.ai_family   = family;
225     hints.ai_flags = flags;
226     if ((error = getaddrinfo(node, service, &hints, &res))) {
227         res = NULL;
228         av_log(NULL, AV_LOG_ERROR, "udp_resolve_host: %s\n", gai_strerror(error));
229     }
230
231     return res;
232 }
233
234 static int udp_set_multicast_sources(int sockfd, struct sockaddr *addr,
235                                      int addr_len, char **sources,
236                                      int nb_sources, int include)
237 {
238 #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) && !defined(_WIN32)
239     /* These ones are available in the microsoft SDK, but don't seem to work
240      * as on linux, so just prefer the v4-only approach there for now. */
241     int i;
242     for (i = 0; i < nb_sources; i++) {
243         struct group_source_req mreqs;
244         int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
245         struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
246                                                        SOCK_DGRAM, AF_UNSPEC,
247                                                        0);
248         if (!sourceaddr)
249             return AVERROR(ENOENT);
250
251         mreqs.gsr_interface = 0;
252         memcpy(&mreqs.gsr_group, addr, addr_len);
253         memcpy(&mreqs.gsr_source, sourceaddr->ai_addr, sourceaddr->ai_addrlen);
254         freeaddrinfo(sourceaddr);
255
256         if (setsockopt(sockfd, level,
257                        include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
258                        (const void *)&mreqs, sizeof(mreqs)) < 0) {
259             if (include)
260                 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
261             else
262                 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
263             return ff_neterrno();
264         }
265     }
266 #elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
267     int i;
268     if (addr->sa_family != AF_INET) {
269         av_log(NULL, AV_LOG_ERROR,
270                "Setting multicast sources only supported for IPv4\n");
271         return AVERROR(EINVAL);
272     }
273     for (i = 0; i < nb_sources; i++) {
274         struct ip_mreq_source mreqs;
275         struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
276                                                        SOCK_DGRAM, AF_UNSPEC,
277                                                        0);
278         if (!sourceaddr)
279             return AVERROR(ENOENT);
280         if (sourceaddr->ai_addr->sa_family != AF_INET) {
281             freeaddrinfo(sourceaddr);
282             av_log(NULL, AV_LOG_ERROR, "%s is of incorrect protocol family\n",
283                    sources[i]);
284             return AVERROR(EINVAL);
285         }
286
287         mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
288         mreqs.imr_interface.s_addr = INADDR_ANY;
289         mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)sourceaddr->ai_addr)->sin_addr.s_addr;
290         freeaddrinfo(sourceaddr);
291
292         if (setsockopt(sockfd, IPPROTO_IP,
293                        include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
294                        (const void *)&mreqs, sizeof(mreqs)) < 0) {
295             if (include)
296                 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
297             else
298                 log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
299             return ff_neterrno();
300         }
301     }
302 #else
303     return AVERROR(ENOSYS);
304 #endif
305     return 0;
306 }
307 static int udp_set_url(struct sockaddr_storage *addr,
308                        const char *hostname, int port)
309 {
310     struct addrinfo *res0;
311     int addr_len;
312
313     res0 = udp_resolve_host(hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
314     if (res0 == 0) return AVERROR(EIO);
315     memcpy(addr, res0->ai_addr, res0->ai_addrlen);
316     addr_len = res0->ai_addrlen;
317     freeaddrinfo(res0);
318
319     return addr_len;
320 }
321
322 static int udp_socket_create(UDPContext *s, struct sockaddr_storage *addr,
323                              socklen_t *addr_len, const char *localaddr)
324 {
325     int udp_fd = -1;
326     struct addrinfo *res0 = NULL, *res = NULL;
327     int family = AF_UNSPEC;
328
329     if (((struct sockaddr *) &s->dest_addr)->sa_family)
330         family = ((struct sockaddr *) &s->dest_addr)->sa_family;
331     res0 = udp_resolve_host(localaddr[0] ? localaddr : NULL, s->local_port,
332                             SOCK_DGRAM, family, AI_PASSIVE);
333     if (res0 == 0)
334         goto fail;
335     for (res = res0; res; res=res->ai_next) {
336         udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, 0);
337         if (udp_fd != -1) break;
338         log_net_error(NULL, AV_LOG_ERROR, "socket");
339     }
340
341     if (udp_fd < 0)
342         goto fail;
343
344     memcpy(addr, res->ai_addr, res->ai_addrlen);
345     *addr_len = res->ai_addrlen;
346
347     freeaddrinfo(res0);
348
349     return udp_fd;
350
351  fail:
352     if (udp_fd >= 0)
353         closesocket(udp_fd);
354     if(res0)
355         freeaddrinfo(res0);
356     return -1;
357 }
358
359 static int udp_port(struct sockaddr_storage *addr, int addr_len)
360 {
361     char sbuf[sizeof(int)*3+1];
362     int error;
363
364     if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0,  sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) {
365         av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
366         return -1;
367     }
368
369     return strtol(sbuf, NULL, 10);
370 }
371
372
373 /**
374  * If no filename is given to av_open_input_file because you want to
375  * get the local port first, then you must call this function to set
376  * the remote server address.
377  *
378  * url syntax: udp://host:port[?option=val...]
379  * option: 'ttl=n'       : set the ttl value (for multicast only)
380  *         'localport=n' : set the local port
381  *         'pkt_size=n'  : set max packet size
382  *         'reuse=1'     : enable reusing the socket
383  *         'overrun_nonfatal=1': survive in case of circular buffer overrun
384  *
385  * @param h media file context
386  * @param uri of the remote server
387  * @return zero if no error.
388  */
389 int ff_udp_set_remote_url(URLContext *h, const char *uri)
390 {
391     UDPContext *s = h->priv_data;
392     char hostname[256], buf[10];
393     int port;
394     const char *p;
395
396     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
397
398     /* set the destination address */
399     s->dest_addr_len = udp_set_url(&s->dest_addr, hostname, port);
400     if (s->dest_addr_len < 0) {
401         return AVERROR(EIO);
402     }
403     s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
404     p = strchr(uri, '?');
405     if (p) {
406         if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
407             int was_connected = s->is_connected;
408             s->is_connected = strtol(buf, NULL, 10);
409             if (s->is_connected && !was_connected) {
410                 if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
411                             s->dest_addr_len)) {
412                     s->is_connected = 0;
413                     log_net_error(h, AV_LOG_ERROR, "connect");
414                     return AVERROR(EIO);
415                 }
416             }
417         }
418     }
419
420     return 0;
421 }
422
423 /**
424  * Return the local port used by the UDP connection
425  * @param h media file context
426  * @return the local port number
427  */
428 int ff_udp_get_local_port(URLContext *h)
429 {
430     UDPContext *s = h->priv_data;
431     return s->local_port;
432 }
433
434 /**
435  * Return the udp file handle for select() usage to wait for several RTP
436  * streams at the same time.
437  * @param h media file context
438  */
439 static int udp_get_file_handle(URLContext *h)
440 {
441     UDPContext *s = h->priv_data;
442     return s->udp_fd;
443 }
444
445 #if HAVE_PTHREAD_CANCEL
446 static void *circular_buffer_task( void *_URLContext)
447 {
448     URLContext *h = _URLContext;
449     UDPContext *s = h->priv_data;
450     int old_cancelstate;
451
452     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
453     pthread_mutex_lock(&s->mutex);
454     if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
455         av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
456         s->circular_buffer_error = AVERROR(EIO);
457         goto end;
458     }
459     while(1) {
460         int len;
461
462         pthread_mutex_unlock(&s->mutex);
463         /* Blocking operations are always cancellation points;
464            see "General Information" / "Thread Cancelation Overview"
465            in Single Unix. */
466         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
467         len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
468         pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
469         pthread_mutex_lock(&s->mutex);
470         if (len < 0) {
471             if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
472                 s->circular_buffer_error = ff_neterrno();
473                 goto end;
474             }
475             continue;
476         }
477         AV_WL32(s->tmp, len);
478
479         if(av_fifo_space(s->fifo) < len + 4) {
480             /* No Space left */
481             if (s->overrun_nonfatal) {
482                 av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
483                         "Surviving due to overrun_nonfatal option\n");
484                 continue;
485             } else {
486                 av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
487                         "To avoid, increase fifo_size URL option. "
488                         "To survive in such case, use overrun_nonfatal option\n");
489                 s->circular_buffer_error = AVERROR(EIO);
490                 goto end;
491             }
492         }
493         av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
494         pthread_cond_signal(&s->cond);
495     }
496
497 end:
498     pthread_cond_signal(&s->cond);
499     pthread_mutex_unlock(&s->mutex);
500     return NULL;
501 }
502 #endif
503
504 static int parse_source_list(char *buf, char **sources, int *num_sources,
505                              int max_sources)
506 {
507     char *source_start;
508
509     source_start = buf;
510     while (1) {
511         char *next = strchr(source_start, ',');
512         if (next)
513             *next = '\0';
514         sources[*num_sources] = av_strdup(source_start);
515         if (!sources[*num_sources])
516             return AVERROR(ENOMEM);
517         source_start = next + 1;
518         (*num_sources)++;
519         if (*num_sources >= max_sources || !next)
520             break;
521     }
522     return 0;
523 }
524
525 /* put it in UDP context */
526 /* return non zero if error */
527 static int udp_open(URLContext *h, const char *uri, int flags)
528 {
529     char hostname[1024], localaddr[1024] = "";
530     int port, udp_fd = -1, tmp, bind_ret = -1;
531     UDPContext *s = h->priv_data;
532     int is_output;
533     const char *p;
534     char buf[256];
535     struct sockaddr_storage my_addr;
536     socklen_t len;
537     int reuse_specified = 0;
538     int i, num_include_sources = 0, num_exclude_sources = 0;
539     char *include_sources[32], *exclude_sources[32];
540
541     h->is_streamed = 1;
542
543     is_output = !(flags & AVIO_FLAG_READ);
544     if (!s->buffer_size) /* if not set explicitly */
545         s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
546
547     p = strchr(uri, '?');
548     if (p) {
549         if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
550             char *endptr = NULL;
551             s->reuse_socket = strtol(buf, &endptr, 10);
552             /* assume if no digits were found it is a request to enable it */
553             if (buf == endptr)
554                 s->reuse_socket = 1;
555             reuse_specified = 1;
556         }
557         if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
558             char *endptr = NULL;
559             s->overrun_nonfatal = strtol(buf, &endptr, 10);
560             /* assume if no digits were found it is a request to enable it */
561             if (buf == endptr)
562                 s->overrun_nonfatal = 1;
563             if (!HAVE_PTHREAD_CANCEL)
564                 av_log(h, AV_LOG_WARNING,
565                        "'overrun_nonfatal' option was set but it is not supported "
566                        "on this build (pthread support is required)\n");
567         }
568         if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
569             s->ttl = strtol(buf, NULL, 10);
570         }
571         if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
572             s->local_port = strtol(buf, NULL, 10);
573         }
574         if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
575             s->packet_size = strtol(buf, NULL, 10);
576         }
577         if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
578             s->buffer_size = strtol(buf, NULL, 10);
579         }
580         if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
581             s->is_connected = strtol(buf, NULL, 10);
582         }
583         if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
584             s->circular_buffer_size = strtol(buf, NULL, 10);
585             if (!HAVE_PTHREAD_CANCEL)
586                 av_log(h, AV_LOG_WARNING,
587                        "'circular_buffer_size' option was set but it is not supported "
588                        "on this build (pthread support is required)\n");
589         }
590         if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
591             av_strlcpy(localaddr, buf, sizeof(localaddr));
592         }
593         if (av_find_info_tag(buf, sizeof(buf), "sources", p)) {
594             if (parse_source_list(buf, include_sources, &num_include_sources,
595                                   FF_ARRAY_ELEMS(include_sources)))
596                 goto fail;
597         }
598         if (av_find_info_tag(buf, sizeof(buf), "block", p)) {
599             if (parse_source_list(buf, exclude_sources, &num_exclude_sources,
600                                   FF_ARRAY_ELEMS(exclude_sources)))
601                 goto fail;
602         }
603         if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
604             s->timeout = strtol(buf, NULL, 10);
605     }
606     /* handling needed to support options picking from both AVOption and URL */
607     s->circular_buffer_size *= 188;
608     if (flags & AVIO_FLAG_WRITE) {
609         h->max_packet_size = s->packet_size;
610     } else {
611         h->max_packet_size = UDP_MAX_PKT_SIZE;
612     }
613     h->rw_timeout = s->timeout;
614
615     /* fill the dest addr */
616     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
617
618     /* XXX: fix av_url_split */
619     if (hostname[0] == '\0' || hostname[0] == '?') {
620         /* only accepts null hostname if input */
621         if (!(flags & AVIO_FLAG_READ))
622             goto fail;
623     } else {
624         if (ff_udp_set_remote_url(h, uri) < 0)
625             goto fail;
626     }
627
628     if ((s->is_multicast || !s->local_port) && (h->flags & AVIO_FLAG_READ))
629         s->local_port = port;
630     udp_fd = udp_socket_create(s, &my_addr, &len, localaddr[0] ? localaddr : s->local_addr);
631     if (udp_fd < 0)
632         goto fail;
633
634     s->local_addr_storage=my_addr; //store for future multicast join
635
636     /* Follow the requested reuse option, unless it's multicast in which
637      * case enable reuse unless explicitly disabled.
638      */
639     if (s->reuse_socket || (s->is_multicast && !reuse_specified)) {
640         s->reuse_socket = 1;
641         if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0)
642             goto fail;
643     }
644
645     /* If multicast, try binding the multicast address first, to avoid
646      * receiving UDP packets from other sources aimed at the same UDP
647      * port. This fails on windows. This makes sending to the same address
648      * using sendto() fail, so only do it if we're opened in read-only mode. */
649     if (s->is_multicast && !(h->flags & AVIO_FLAG_WRITE)) {
650         bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
651     }
652     /* bind to the local address if not multicast or if the multicast
653      * bind failed */
654     /* the bind is needed to give a port to the socket now */
655     if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
656         log_net_error(h, AV_LOG_ERROR, "bind failed");
657         goto fail;
658     }
659
660     len = sizeof(my_addr);
661     getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
662     s->local_port = udp_port(&my_addr, len);
663
664     if (s->is_multicast) {
665         if (h->flags & AVIO_FLAG_WRITE) {
666             /* output */
667             if (udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr) < 0)
668                 goto fail;
669         }
670         if (h->flags & AVIO_FLAG_READ) {
671             /* input */
672             if (num_include_sources && num_exclude_sources) {
673                 av_log(h, AV_LOG_ERROR, "Simultaneously including and excluding multicast sources is not supported\n");
674                 goto fail;
675             }
676             if (num_include_sources) {
677                 if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, include_sources, num_include_sources, 1) < 0)
678                     goto fail;
679             } else {
680                 if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage) < 0)
681                     goto fail;
682             }
683             if (num_exclude_sources) {
684                 if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, exclude_sources, num_exclude_sources, 0) < 0)
685                     goto fail;
686             }
687         }
688     }
689
690     if (is_output) {
691         /* limit the tx buf size to limit latency */
692         tmp = s->buffer_size;
693         if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
694             log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
695             goto fail;
696         }
697     } else {
698         /* set udp recv buffer size to the requested value (default 64K) */
699         tmp = s->buffer_size;
700         if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
701             log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
702         }
703         len = sizeof(tmp);
704         if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, &len) < 0) {
705             log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)");
706         } else {
707             av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp);
708             if(tmp < s->buffer_size)
709                 av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d", s->buffer_size, tmp);
710         }
711
712         /* make the socket non-blocking */
713         ff_socket_nonblock(udp_fd, 1);
714     }
715     if (s->is_connected) {
716         if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
717             log_net_error(h, AV_LOG_ERROR, "connect");
718             goto fail;
719         }
720     }
721
722     for (i = 0; i < num_include_sources; i++)
723         av_freep(&include_sources[i]);
724     for (i = 0; i < num_exclude_sources; i++)
725         av_freep(&exclude_sources[i]);
726
727     s->udp_fd = udp_fd;
728
729 #if HAVE_PTHREAD_CANCEL
730     if (!is_output && s->circular_buffer_size) {
731         int ret;
732
733         /* start the task going */
734         s->fifo = av_fifo_alloc(s->circular_buffer_size);
735         ret = pthread_mutex_init(&s->mutex, NULL);
736         if (ret != 0) {
737             av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
738             goto fail;
739         }
740         ret = pthread_cond_init(&s->cond, NULL);
741         if (ret != 0) {
742             av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
743             goto cond_fail;
744         }
745         ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
746         if (ret != 0) {
747             av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
748             goto thread_fail;
749         }
750         s->thread_started = 1;
751     }
752 #endif
753
754     return 0;
755 #if HAVE_PTHREAD_CANCEL
756  thread_fail:
757     pthread_cond_destroy(&s->cond);
758  cond_fail:
759     pthread_mutex_destroy(&s->mutex);
760 #endif
761  fail:
762     if (udp_fd >= 0)
763         closesocket(udp_fd);
764     av_fifo_free(s->fifo);
765     for (i = 0; i < num_include_sources; i++)
766         av_freep(&include_sources[i]);
767     for (i = 0; i < num_exclude_sources; i++)
768         av_freep(&exclude_sources[i]);
769     return AVERROR(EIO);
770 }
771
772 static int udp_read(URLContext *h, uint8_t *buf, int size)
773 {
774     UDPContext *s = h->priv_data;
775     int ret;
776     int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
777
778 #if HAVE_PTHREAD_CANCEL
779     if (s->fifo) {
780         pthread_mutex_lock(&s->mutex);
781         do {
782             avail = av_fifo_size(s->fifo);
783             if (avail) { // >=size) {
784                 uint8_t tmp[4];
785
786                 av_fifo_generic_read(s->fifo, tmp, 4, NULL);
787                 avail= AV_RL32(tmp);
788                 if(avail > size){
789                     av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
790                     avail= size;
791                 }
792
793                 av_fifo_generic_read(s->fifo, buf, avail, NULL);
794                 av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
795                 pthread_mutex_unlock(&s->mutex);
796                 return avail;
797             } else if(s->circular_buffer_error){
798                 int err = s->circular_buffer_error;
799                 pthread_mutex_unlock(&s->mutex);
800                 return err;
801             } else if(nonblock) {
802                 pthread_mutex_unlock(&s->mutex);
803                 return AVERROR(EAGAIN);
804             }
805             else {
806                 /* FIXME: using the monotonic clock would be better,
807                    but it does not exist on all supported platforms. */
808                 int64_t t = av_gettime() + 100000;
809                 struct timespec tv = { .tv_sec  =  t / 1000000,
810                                        .tv_nsec = (t % 1000000) * 1000 };
811                 if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) {
812                     pthread_mutex_unlock(&s->mutex);
813                     return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno);
814                 }
815                 nonblock = 1;
816             }
817         } while( 1);
818     }
819 #endif
820
821     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
822         ret = ff_network_wait_fd(s->udp_fd, 0);
823         if (ret < 0)
824             return ret;
825     }
826     ret = recv(s->udp_fd, buf, size, 0);
827
828     return ret < 0 ? ff_neterrno() : ret;
829 }
830
831 static int udp_write(URLContext *h, const uint8_t *buf, int size)
832 {
833     UDPContext *s = h->priv_data;
834     int ret;
835
836     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
837         ret = ff_network_wait_fd(s->udp_fd, 1);
838         if (ret < 0)
839             return ret;
840     }
841
842     if (!s->is_connected) {
843         ret = sendto (s->udp_fd, buf, size, 0,
844                       (struct sockaddr *) &s->dest_addr,
845                       s->dest_addr_len);
846     } else
847         ret = send(s->udp_fd, buf, size, 0);
848
849     return ret < 0 ? ff_neterrno() : ret;
850 }
851
852 static int udp_close(URLContext *h)
853 {
854     UDPContext *s = h->priv_data;
855     int ret;
856
857     if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
858         udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage);
859     closesocket(s->udp_fd);
860 #if HAVE_PTHREAD_CANCEL
861     if (s->thread_started) {
862         pthread_cancel(s->circular_buffer_thread);
863         ret = pthread_join(s->circular_buffer_thread, NULL);
864         if (ret != 0)
865             av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
866         pthread_mutex_destroy(&s->mutex);
867         pthread_cond_destroy(&s->cond);
868     }
869 #endif
870     av_fifo_free(s->fifo);
871     return 0;
872 }
873
874 URLProtocol ff_udp_protocol = {
875     .name                = "udp",
876     .url_open            = udp_open,
877     .url_read            = udp_read,
878     .url_write           = udp_write,
879     .url_close           = udp_close,
880     .url_get_file_handle = udp_get_file_handle,
881     .priv_data_size      = sizeof(UDPContext),
882     .priv_data_class     = &udp_context_class,
883     .flags               = URL_PROTOCOL_FLAG_NETWORK,
884 };