]> git.sesse.net Git - ffmpeg/blob - libavformat/udp.c
Merge commit '642fd4769becc2f4827f8375a3d9e8edd2f5df77'
[ffmpeg] / libavformat / udp.c
1 /*
2  * UDP prototype streaming system
3  * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21
22 /**
23  * @file
24  * UDP protocol
25  */
26
27 #define _DEFAULT_SOURCE
28 #define _BSD_SOURCE     /* Needed for using struct ip_mreq with recent glibc */
29
30 #include "avformat.h"
31 #include "avio_internal.h"
32 #include "libavutil/avassert.h"
33 #include "libavutil/parseutils.h"
34 #include "libavutil/fifo.h"
35 #include "libavutil/intreadwrite.h"
36 #include "libavutil/avstring.h"
37 #include "libavutil/opt.h"
38 #include "libavutil/log.h"
39 #include "libavutil/time.h"
40 #include "internal.h"
41 #include "network.h"
42 #include "os_support.h"
43 #include "url.h"
44
45 #ifdef __APPLE__
46 #include "TargetConditionals.h"
47 #endif
48
49 #if HAVE_UDPLITE_H
50 #include "udplite.h"
51 #else
52 /* On many Linux systems, udplite.h is missing but the kernel supports UDP-Lite.
53  * So, we provide a fallback here.
54  */
55 #define UDPLITE_SEND_CSCOV                               10
56 #define UDPLITE_RECV_CSCOV                               11
57 #endif
58
59 #ifndef IPPROTO_UDPLITE
60 #define IPPROTO_UDPLITE                                  136
61 #endif
62
63 #if HAVE_PTHREAD_CANCEL
64 #include <pthread.h>
65 #endif
66
67 #ifndef IPV6_ADD_MEMBERSHIP
68 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
69 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
70 #endif
71
72 #define UDP_TX_BUF_SIZE 32768
73 #define UDP_MAX_PKT_SIZE 65536
74 #define UDP_HEADER_SIZE 8
75
76 typedef struct UDPContext {
77     const AVClass *class;
78     int udp_fd;
79     int ttl;
80     int udplite_coverage;
81     int buffer_size;
82     int pkt_size;
83     int is_multicast;
84     int is_broadcast;
85     int local_port;
86     int reuse_socket;
87     int overrun_nonfatal;
88     struct sockaddr_storage dest_addr;
89     int dest_addr_len;
90     int is_connected;
91
92     /* Circular Buffer variables for use in UDP receive code */
93     int circular_buffer_size;
94     AVFifoBuffer *fifo;
95     int circular_buffer_error;
96     int64_t bitrate; /* number of bits to send per second */
97     int64_t burst_bits;
98     int close_req;
99 #if HAVE_PTHREAD_CANCEL
100     pthread_t circular_buffer_thread;
101     pthread_mutex_t mutex;
102     pthread_cond_t cond;
103     int thread_started;
104 #endif
105     uint8_t tmp[UDP_MAX_PKT_SIZE+4];
106     int remaining_in_dg;
107     char *localaddr;
108     int timeout;
109     struct sockaddr_storage local_addr_storage;
110     char *sources;
111     char *block;
112 } UDPContext;
113
114 #define OFFSET(x) offsetof(UDPContext, x)
115 #define D AV_OPT_FLAG_DECODING_PARAM
116 #define E AV_OPT_FLAG_ENCODING_PARAM
117 static const AVOption options[] = {
118     { "buffer_size",    "System data size (in bytes)",                     OFFSET(buffer_size),    AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
119     { "bitrate",        "Bits to send per second",                         OFFSET(bitrate),        AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = E },
120     { "burst_bits",     "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits),   AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = E },
121     { "localport",      "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, D|E },
122     { "local_port",     "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
123     { "localaddr",      "Local address",                                   OFFSET(localaddr),      AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
124     { "udplite_coverage", "choose UDPLite head size which should be validated by checksum", OFFSET(udplite_coverage), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
125     { "pkt_size",       "Maximum UDP packet size",                         OFFSET(pkt_size),       AV_OPT_TYPE_INT,    { .i64 = 1472 },  -1, INT_MAX, .flags = D|E },
126     { "reuse",          "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       D|E },
127     { "reuse_socket",   "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       .flags = D|E },
128     { "broadcast", "explicitly allow or disallow broadcast destination",   OFFSET(is_broadcast),   AV_OPT_TYPE_BOOL,   { .i64 = 0  },     0, 1,       E },
129     { "ttl",            "Time to live (multicast only)",                   OFFSET(ttl),            AV_OPT_TYPE_INT,    { .i64 = 16 },     0, INT_MAX, E },
130     { "connect",        "set if connect() should be called on socket",     OFFSET(is_connected),   AV_OPT_TYPE_BOOL,   { .i64 =  0 },     0, 1,       .flags = D|E },
131     { "fifo_size",      "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
132     { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,    D },
133     { "timeout",        "set raise error timeout (only in read mode)",     OFFSET(timeout),        AV_OPT_TYPE_INT,    { .i64 = 0 },      0, INT_MAX, D },
134     { "sources",        "Source list",                                     OFFSET(sources),        AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
135     { "block",          "Block list",                                      OFFSET(block),          AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
136     { NULL }
137 };
138
139 static const AVClass udp_class = {
140     .class_name = "udp",
141     .item_name  = av_default_item_name,
142     .option     = options,
143     .version    = LIBAVUTIL_VERSION_INT,
144 };
145
146 static const AVClass udplite_context_class = {
147     .class_name     = "udplite",
148     .item_name      = av_default_item_name,
149     .option         = options,
150     .version        = LIBAVUTIL_VERSION_INT,
151 };
152
153 static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
154                                  struct sockaddr *addr)
155 {
156 #ifdef IP_MULTICAST_TTL
157     if (addr->sa_family == AF_INET) {
158         if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) {
159             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_MULTICAST_TTL)");
160             return -1;
161         }
162     }
163 #endif
164 #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
165     if (addr->sa_family == AF_INET6) {
166         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) {
167             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_MULTICAST_HOPS)");
168             return -1;
169         }
170     }
171 #endif
172     return 0;
173 }
174
175 static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr)
176 {
177 #ifdef IP_ADD_MEMBERSHIP
178     if (addr->sa_family == AF_INET) {
179         struct ip_mreq mreq;
180
181         mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
182         if (local_addr)
183             mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
184         else
185             mreq.imr_interface.s_addr= INADDR_ANY;
186         if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
187             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
188             return -1;
189         }
190     }
191 #endif
192 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
193     if (addr->sa_family == AF_INET6) {
194         struct ipv6_mreq mreq6;
195
196         memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
197         mreq6.ipv6mr_interface= 0;
198         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
199             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
200             return -1;
201         }
202     }
203 #endif
204     return 0;
205 }
206
207 static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr)
208 {
209 #ifdef IP_DROP_MEMBERSHIP
210     if (addr->sa_family == AF_INET) {
211         struct ip_mreq mreq;
212
213         mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
214         if (local_addr)
215             mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
216         else
217             mreq.imr_interface.s_addr= INADDR_ANY;
218         if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
219             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
220             return -1;
221         }
222     }
223 #endif
224 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
225     if (addr->sa_family == AF_INET6) {
226         struct ipv6_mreq mreq6;
227
228         memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
229         mreq6.ipv6mr_interface= 0;
230         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
231             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
232             return -1;
233         }
234     }
235 #endif
236     return 0;
237 }
238
239 static struct addrinfo *udp_resolve_host(URLContext *h,
240                                          const char *hostname, int port,
241                                          int type, int family, int flags)
242 {
243     struct addrinfo hints = { 0 }, *res = 0;
244     int error;
245     char sport[16];
246     const char *node = 0, *service = "0";
247
248     if (port > 0) {
249         snprintf(sport, sizeof(sport), "%d", port);
250         service = sport;
251     }
252     if ((hostname) && (hostname[0] != '\0') && (hostname[0] != '?')) {
253         node = hostname;
254     }
255     hints.ai_socktype = type;
256     hints.ai_family   = family;
257     hints.ai_flags = flags;
258     if ((error = getaddrinfo(node, service, &hints, &res))) {
259         res = NULL;
260         av_log(h, AV_LOG_ERROR, "getaddrinfo(%s, %s): %s\n",
261                node ? node : "unknown",
262                service,
263                gai_strerror(error));
264     }
265
266     return res;
267 }
268
269 static int udp_set_multicast_sources(URLContext *h,
270                                      int sockfd, struct sockaddr *addr,
271                                      int addr_len, char **sources,
272                                      int nb_sources, int include)
273 {
274 #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) && !defined(_WIN32) && (!defined(TARGET_OS_TV) || !TARGET_OS_TV)
275     /* These ones are available in the microsoft SDK, but don't seem to work
276      * as on linux, so just prefer the v4-only approach there for now. */
277     int i;
278     for (i = 0; i < nb_sources; i++) {
279         struct group_source_req mreqs;
280         int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
281         struct addrinfo *sourceaddr = udp_resolve_host(h, sources[i], 0,
282                                                        SOCK_DGRAM, AF_UNSPEC,
283                                                        0);
284         if (!sourceaddr)
285             return AVERROR(ENOENT);
286
287         mreqs.gsr_interface = 0;
288         memcpy(&mreqs.gsr_group, addr, addr_len);
289         memcpy(&mreqs.gsr_source, sourceaddr->ai_addr, sourceaddr->ai_addrlen);
290         freeaddrinfo(sourceaddr);
291
292         if (setsockopt(sockfd, level,
293                        include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
294                        (const void *)&mreqs, sizeof(mreqs)) < 0) {
295             if (include)
296                 ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
297             else
298                 ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
299             return ff_neterrno();
300         }
301     }
302 #elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
303     int i;
304     if (addr->sa_family != AF_INET) {
305         av_log(NULL, AV_LOG_ERROR,
306                "Setting multicast sources only supported for IPv4\n");
307         return AVERROR(EINVAL);
308     }
309     for (i = 0; i < nb_sources; i++) {
310         struct ip_mreq_source mreqs;
311         struct addrinfo *sourceaddr = udp_resolve_host(h, sources[i], 0,
312                                                        SOCK_DGRAM, AF_UNSPEC,
313                                                        0);
314         if (!sourceaddr)
315             return AVERROR(ENOENT);
316         if (sourceaddr->ai_addr->sa_family != AF_INET) {
317             freeaddrinfo(sourceaddr);
318             av_log(NULL, AV_LOG_ERROR, "%s is of incorrect protocol family\n",
319                    sources[i]);
320             return AVERROR(EINVAL);
321         }
322
323         mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
324         mreqs.imr_interface.s_addr = INADDR_ANY;
325         mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)sourceaddr->ai_addr)->sin_addr.s_addr;
326         freeaddrinfo(sourceaddr);
327
328         if (setsockopt(sockfd, IPPROTO_IP,
329                        include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
330                        (const void *)&mreqs, sizeof(mreqs)) < 0) {
331             if (include)
332                 ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
333             else
334                 ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
335             return ff_neterrno();
336         }
337     }
338 #else
339     return AVERROR(ENOSYS);
340 #endif
341     return 0;
342 }
343 static int udp_set_url(URLContext *h,
344                        struct sockaddr_storage *addr,
345                        const char *hostname, int port)
346 {
347     struct addrinfo *res0;
348     int addr_len;
349
350     res0 = udp_resolve_host(h, hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
351     if (!res0) return AVERROR(EIO);
352     memcpy(addr, res0->ai_addr, res0->ai_addrlen);
353     addr_len = res0->ai_addrlen;
354     freeaddrinfo(res0);
355
356     return addr_len;
357 }
358
359 static int udp_socket_create(URLContext *h, struct sockaddr_storage *addr,
360                              socklen_t *addr_len, const char *localaddr)
361 {
362     UDPContext *s = h->priv_data;
363     int udp_fd = -1;
364     struct addrinfo *res0, *res;
365     int family = AF_UNSPEC;
366
367     if (((struct sockaddr *) &s->dest_addr)->sa_family)
368         family = ((struct sockaddr *) &s->dest_addr)->sa_family;
369     res0 = udp_resolve_host(h, (localaddr && localaddr[0]) ? localaddr : NULL,
370                             s->local_port,
371                             SOCK_DGRAM, family, AI_PASSIVE);
372     if (!res0)
373         goto fail;
374     for (res = res0; res; res=res->ai_next) {
375         if (s->udplite_coverage)
376             udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, IPPROTO_UDPLITE);
377         else
378             udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, 0);
379         if (udp_fd != -1) break;
380         ff_log_net_error(NULL, AV_LOG_ERROR, "socket");
381     }
382
383     if (udp_fd < 0)
384         goto fail;
385
386     memcpy(addr, res->ai_addr, res->ai_addrlen);
387     *addr_len = res->ai_addrlen;
388
389     freeaddrinfo(res0);
390
391     return udp_fd;
392
393  fail:
394     if (udp_fd >= 0)
395         closesocket(udp_fd);
396     if(res0)
397         freeaddrinfo(res0);
398     return -1;
399 }
400
401 static int udp_port(struct sockaddr_storage *addr, int addr_len)
402 {
403     char sbuf[sizeof(int)*3+1];
404     int error;
405
406     if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0,  sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) {
407         av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
408         return -1;
409     }
410
411     return strtol(sbuf, NULL, 10);
412 }
413
414
415 /**
416  * If no filename is given to av_open_input_file because you want to
417  * get the local port first, then you must call this function to set
418  * the remote server address.
419  *
420  * url syntax: udp://host:port[?option=val...]
421  * option: 'ttl=n'       : set the ttl value (for multicast only)
422  *         'localport=n' : set the local port
423  *         'pkt_size=n'  : set max packet size
424  *         'reuse=1'     : enable reusing the socket
425  *         'overrun_nonfatal=1': survive in case of circular buffer overrun
426  *
427  * @param h media file context
428  * @param uri of the remote server
429  * @return zero if no error.
430  */
431 int ff_udp_set_remote_url(URLContext *h, const char *uri)
432 {
433     UDPContext *s = h->priv_data;
434     char hostname[256], buf[10];
435     int port;
436     const char *p;
437
438     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
439
440     /* set the destination address */
441     s->dest_addr_len = udp_set_url(h, &s->dest_addr, hostname, port);
442     if (s->dest_addr_len < 0) {
443         return AVERROR(EIO);
444     }
445     s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
446     p = strchr(uri, '?');
447     if (p) {
448         if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
449             int was_connected = s->is_connected;
450             s->is_connected = strtol(buf, NULL, 10);
451             if (s->is_connected && !was_connected) {
452                 if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
453                             s->dest_addr_len)) {
454                     s->is_connected = 0;
455                     ff_log_net_error(h, AV_LOG_ERROR, "connect");
456                     return AVERROR(EIO);
457                 }
458             }
459         }
460     }
461
462     return 0;
463 }
464
465 /**
466  * Return the local port used by the UDP connection
467  * @param h media file context
468  * @return the local port number
469  */
470 int ff_udp_get_local_port(URLContext *h)
471 {
472     UDPContext *s = h->priv_data;
473     return s->local_port;
474 }
475
476 /**
477  * Return the udp file handle for select() usage to wait for several RTP
478  * streams at the same time.
479  * @param h media file context
480  */
481 static int udp_get_file_handle(URLContext *h)
482 {
483     UDPContext *s = h->priv_data;
484     return s->udp_fd;
485 }
486
487 #if HAVE_PTHREAD_CANCEL
488 static void *circular_buffer_task_rx( void *_URLContext)
489 {
490     URLContext *h = _URLContext;
491     UDPContext *s = h->priv_data;
492     int old_cancelstate;
493
494     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
495     pthread_mutex_lock(&s->mutex);
496     if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
497         av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
498         s->circular_buffer_error = AVERROR(EIO);
499         goto end;
500     }
501     while(1) {
502         int len;
503
504         pthread_mutex_unlock(&s->mutex);
505         /* Blocking operations are always cancellation points;
506            see "General Information" / "Thread Cancelation Overview"
507            in Single Unix. */
508         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
509         len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
510         pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
511         pthread_mutex_lock(&s->mutex);
512         if (len < 0) {
513             if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
514                 s->circular_buffer_error = ff_neterrno();
515                 goto end;
516             }
517             continue;
518         }
519         AV_WL32(s->tmp, len);
520
521         if(av_fifo_space(s->fifo) < len + 4) {
522             /* No Space left */
523             if (s->overrun_nonfatal) {
524                 av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
525                         "Surviving due to overrun_nonfatal option\n");
526                 continue;
527             } else {
528                 av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
529                         "To avoid, increase fifo_size URL option. "
530                         "To survive in such case, use overrun_nonfatal option\n");
531                 s->circular_buffer_error = AVERROR(EIO);
532                 goto end;
533             }
534         }
535         av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
536         pthread_cond_signal(&s->cond);
537     }
538
539 end:
540     pthread_cond_signal(&s->cond);
541     pthread_mutex_unlock(&s->mutex);
542     return NULL;
543 }
544
545 static void *circular_buffer_task_tx( void *_URLContext)
546 {
547     URLContext *h = _URLContext;
548     UDPContext *s = h->priv_data;
549     int old_cancelstate;
550     int64_t target_timestamp = av_gettime_relative();
551     int64_t start_timestamp = av_gettime_relative();
552     int64_t sent_bits = 0;
553     int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
554     int64_t max_delay = s->bitrate ?  ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
555
556     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
557     pthread_mutex_lock(&s->mutex);
558
559     if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
560         av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
561         s->circular_buffer_error = AVERROR(EIO);
562         goto end;
563     }
564
565     for(;;) {
566         int len;
567         const uint8_t *p;
568         uint8_t tmp[4];
569         int64_t timestamp;
570
571         len=av_fifo_size(s->fifo);
572
573         while (len<4) {
574             if (s->close_req)
575                 goto end;
576             if (pthread_cond_wait(&s->cond, &s->mutex) < 0) {
577                 goto end;
578             }
579             len=av_fifo_size(s->fifo);
580         }
581
582         av_fifo_generic_read(s->fifo, tmp, 4, NULL);
583         len=AV_RL32(tmp);
584
585         av_assert0(len >= 0);
586         av_assert0(len <= sizeof(s->tmp));
587
588         av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
589
590         pthread_mutex_unlock(&s->mutex);
591         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
592
593         if (s->bitrate) {
594             timestamp = av_gettime_relative();
595             if (timestamp < target_timestamp) {
596                 int64_t delay = target_timestamp - timestamp;
597                 if (delay > max_delay) {
598                     delay = max_delay;
599                     start_timestamp = timestamp + delay;
600                     sent_bits = 0;
601                 }
602                 av_usleep(delay);
603             } else {
604                 if (timestamp - burst_interval > target_timestamp) {
605                     start_timestamp = timestamp - burst_interval;
606                     sent_bits = 0;
607                 }
608             }
609             sent_bits += len * 8;
610             target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
611         }
612
613         p = s->tmp;
614         while (len) {
615             int ret;
616             av_assert0(len > 0);
617             if (!s->is_connected) {
618                 ret = sendto (s->udp_fd, p, len, 0,
619                             (struct sockaddr *) &s->dest_addr,
620                             s->dest_addr_len);
621             } else
622                 ret = send(s->udp_fd, p, len, 0);
623             if (ret >= 0) {
624                 len -= ret;
625                 p   += ret;
626             } else {
627                 ret = ff_neterrno();
628                 if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
629                     pthread_mutex_lock(&s->mutex);
630                     s->circular_buffer_error = ret;
631                     pthread_mutex_unlock(&s->mutex);
632                     return NULL;
633                 }
634             }
635         }
636
637         pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
638         pthread_mutex_lock(&s->mutex);
639     }
640
641 end:
642     pthread_mutex_unlock(&s->mutex);
643     return NULL;
644 }
645
646
647 #endif
648
649 static int parse_source_list(char *buf, char **sources, int *num_sources,
650                              int max_sources)
651 {
652     char *source_start;
653
654     source_start = buf;
655     while (1) {
656         char *next = strchr(source_start, ',');
657         if (next)
658             *next = '\0';
659         sources[*num_sources] = av_strdup(source_start);
660         if (!sources[*num_sources])
661             return AVERROR(ENOMEM);
662         source_start = next + 1;
663         (*num_sources)++;
664         if (*num_sources >= max_sources || !next)
665             break;
666     }
667     return 0;
668 }
669
670 /* put it in UDP context */
671 /* return non zero if error */
672 static int udp_open(URLContext *h, const char *uri, int flags)
673 {
674     char hostname[1024], localaddr[1024] = "";
675     int port, udp_fd = -1, tmp, bind_ret = -1, dscp = -1;
676     UDPContext *s = h->priv_data;
677     int is_output;
678     const char *p;
679     char buf[256];
680     struct sockaddr_storage my_addr;
681     socklen_t len;
682     int i, num_include_sources = 0, num_exclude_sources = 0;
683     char *include_sources[32], *exclude_sources[32];
684
685     h->is_streamed = 1;
686
687     is_output = !(flags & AVIO_FLAG_READ);
688     if (s->buffer_size < 0)
689         s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
690
691     if (s->sources) {
692         if (parse_source_list(s->sources, include_sources,
693                               &num_include_sources,
694                               FF_ARRAY_ELEMS(include_sources)))
695             goto fail;
696     }
697
698     if (s->block) {
699         if (parse_source_list(s->block, exclude_sources, &num_exclude_sources,
700                               FF_ARRAY_ELEMS(exclude_sources)))
701             goto fail;
702     }
703
704     if (s->pkt_size > 0)
705         h->max_packet_size = s->pkt_size;
706
707     p = strchr(uri, '?');
708     if (p) {
709         if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
710             char *endptr = NULL;
711             s->reuse_socket = strtol(buf, &endptr, 10);
712             /* assume if no digits were found it is a request to enable it */
713             if (buf == endptr)
714                 s->reuse_socket = 1;
715         }
716         if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
717             char *endptr = NULL;
718             s->overrun_nonfatal = strtol(buf, &endptr, 10);
719             /* assume if no digits were found it is a request to enable it */
720             if (buf == endptr)
721                 s->overrun_nonfatal = 1;
722             if (!HAVE_PTHREAD_CANCEL)
723                 av_log(h, AV_LOG_WARNING,
724                        "'overrun_nonfatal' option was set but it is not supported "
725                        "on this build (pthread support is required)\n");
726         }
727         if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
728             s->ttl = strtol(buf, NULL, 10);
729         }
730         if (av_find_info_tag(buf, sizeof(buf), "udplite_coverage", p)) {
731             s->udplite_coverage = strtol(buf, NULL, 10);
732         }
733         if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
734             s->local_port = strtol(buf, NULL, 10);
735         }
736         if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
737             s->pkt_size = strtol(buf, NULL, 10);
738         }
739         if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
740             s->buffer_size = strtol(buf, NULL, 10);
741         }
742         if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
743             s->is_connected = strtol(buf, NULL, 10);
744         }
745         if (av_find_info_tag(buf, sizeof(buf), "dscp", p)) {
746             dscp = strtol(buf, NULL, 10);
747         }
748         if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
749             s->circular_buffer_size = strtol(buf, NULL, 10);
750             if (!HAVE_PTHREAD_CANCEL)
751                 av_log(h, AV_LOG_WARNING,
752                        "'circular_buffer_size' option was set but it is not supported "
753                        "on this build (pthread support is required)\n");
754         }
755         if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
756             s->bitrate = strtoll(buf, NULL, 10);
757             if (!HAVE_PTHREAD_CANCEL)
758                 av_log(h, AV_LOG_WARNING,
759                        "'bitrate' option was set but it is not supported "
760                        "on this build (pthread support is required)\n");
761         }
762         if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) {
763             s->burst_bits = strtoll(buf, NULL, 10);
764         }
765         if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
766             av_strlcpy(localaddr, buf, sizeof(localaddr));
767         }
768         if (av_find_info_tag(buf, sizeof(buf), "sources", p)) {
769             if (parse_source_list(buf, include_sources, &num_include_sources,
770                                   FF_ARRAY_ELEMS(include_sources)))
771                 goto fail;
772         }
773         if (av_find_info_tag(buf, sizeof(buf), "block", p)) {
774             if (parse_source_list(buf, exclude_sources, &num_exclude_sources,
775                                   FF_ARRAY_ELEMS(exclude_sources)))
776                 goto fail;
777         }
778         if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
779             s->timeout = strtol(buf, NULL, 10);
780         if (is_output && av_find_info_tag(buf, sizeof(buf), "broadcast", p))
781             s->is_broadcast = strtol(buf, NULL, 10);
782     }
783     /* handling needed to support options picking from both AVOption and URL */
784     s->circular_buffer_size *= 188;
785     if (flags & AVIO_FLAG_WRITE) {
786         h->max_packet_size = s->pkt_size;
787     } else {
788         h->max_packet_size = UDP_MAX_PKT_SIZE;
789     }
790     h->rw_timeout = s->timeout;
791
792     /* fill the dest addr */
793     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
794
795     /* XXX: fix av_url_split */
796     if (hostname[0] == '\0' || hostname[0] == '?') {
797         /* only accepts null hostname if input */
798         if (!(flags & AVIO_FLAG_READ))
799             goto fail;
800     } else {
801         if (ff_udp_set_remote_url(h, uri) < 0)
802             goto fail;
803     }
804
805     if ((s->is_multicast || s->local_port <= 0) && (h->flags & AVIO_FLAG_READ))
806         s->local_port = port;
807
808     if (localaddr[0])
809         udp_fd = udp_socket_create(h, &my_addr, &len, localaddr);
810     else
811         udp_fd = udp_socket_create(h, &my_addr, &len, s->localaddr);
812     if (udp_fd < 0)
813         goto fail;
814
815     s->local_addr_storage=my_addr; //store for future multicast join
816
817     /* Follow the requested reuse option, unless it's multicast in which
818      * case enable reuse unless explicitly disabled.
819      */
820     if (s->reuse_socket > 0 || (s->is_multicast && s->reuse_socket < 0)) {
821         s->reuse_socket = 1;
822         if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0)
823             goto fail;
824     }
825
826     if (s->is_broadcast) {
827 #ifdef SO_BROADCAST
828         if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0)
829 #endif
830            goto fail;
831     }
832
833     /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving.
834      * The receiver coverage has to be less than or equal to the sender coverage.
835      * Otherwise, the receiver will drop all packets.
836      */
837     if (s->udplite_coverage) {
838         if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_SEND_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
839             av_log(h, AV_LOG_WARNING, "socket option UDPLITE_SEND_CSCOV not available");
840
841         if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_RECV_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
842             av_log(h, AV_LOG_WARNING, "socket option UDPLITE_RECV_CSCOV not available");
843     }
844
845     if (dscp >= 0) {
846         dscp <<= 2;
847         if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0)
848             goto fail;
849     }
850
851     /* If multicast, try binding the multicast address first, to avoid
852      * receiving UDP packets from other sources aimed at the same UDP
853      * port. This fails on windows. This makes sending to the same address
854      * using sendto() fail, so only do it if we're opened in read-only mode. */
855     if (s->is_multicast && !(h->flags & AVIO_FLAG_WRITE)) {
856         bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
857     }
858     /* bind to the local address if not multicast or if the multicast
859      * bind failed */
860     /* the bind is needed to give a port to the socket now */
861     if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
862         ff_log_net_error(h, AV_LOG_ERROR, "bind failed");
863         goto fail;
864     }
865
866     len = sizeof(my_addr);
867     getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
868     s->local_port = udp_port(&my_addr, len);
869
870     if (s->is_multicast) {
871         if (h->flags & AVIO_FLAG_WRITE) {
872             /* output */
873             if (udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr) < 0)
874                 goto fail;
875         }
876         if (h->flags & AVIO_FLAG_READ) {
877             /* input */
878             if (num_include_sources && num_exclude_sources) {
879                 av_log(h, AV_LOG_ERROR, "Simultaneously including and excluding multicast sources is not supported\n");
880                 goto fail;
881             }
882             if (num_include_sources) {
883                 if (udp_set_multicast_sources(h, udp_fd,
884                                               (struct sockaddr *)&s->dest_addr,
885                                               s->dest_addr_len,
886                                               include_sources,
887                                               num_include_sources, 1) < 0)
888                     goto fail;
889             } else {
890                 if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage) < 0)
891                     goto fail;
892             }
893             if (num_exclude_sources) {
894                 if (udp_set_multicast_sources(h, udp_fd,
895                                               (struct sockaddr *)&s->dest_addr,
896                                               s->dest_addr_len,
897                                               exclude_sources,
898                                               num_exclude_sources, 0) < 0)
899                     goto fail;
900             }
901         }
902     }
903
904     if (is_output) {
905         /* limit the tx buf size to limit latency */
906         tmp = s->buffer_size;
907         if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
908             ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
909             goto fail;
910         }
911     } else {
912         /* set udp recv buffer size to the requested value (default 64K) */
913         tmp = s->buffer_size;
914         if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
915             ff_log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
916         }
917         len = sizeof(tmp);
918         if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, &len) < 0) {
919             ff_log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)");
920         } else {
921             av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp);
922             if(tmp < s->buffer_size)
923                 av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d", s->buffer_size, tmp);
924         }
925
926         /* make the socket non-blocking */
927         ff_socket_nonblock(udp_fd, 1);
928     }
929     if (s->is_connected) {
930         if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
931             ff_log_net_error(h, AV_LOG_ERROR, "connect");
932             goto fail;
933         }
934     }
935
936     for (i = 0; i < num_include_sources; i++)
937         av_freep(&include_sources[i]);
938     for (i = 0; i < num_exclude_sources; i++)
939         av_freep(&exclude_sources[i]);
940
941     s->udp_fd = udp_fd;
942
943 #if HAVE_PTHREAD_CANCEL
944     /*
945       Create thread in case of:
946       1. Input and circular_buffer_size is set
947       2. Output and bitrate and circular_buffer_size is set
948     */
949
950     if (is_output && s->bitrate && !s->circular_buffer_size) {
951         /* Warn user in case of 'circular_buffer_size' is not set */
952         av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
953     }
954
955     if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) {
956         int ret;
957
958         /* start the task going */
959         s->fifo = av_fifo_alloc(s->circular_buffer_size);
960         ret = pthread_mutex_init(&s->mutex, NULL);
961         if (ret != 0) {
962             av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
963             goto fail;
964         }
965         ret = pthread_cond_init(&s->cond, NULL);
966         if (ret != 0) {
967             av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
968             goto cond_fail;
969         }
970         ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
971         if (ret != 0) {
972             av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
973             goto thread_fail;
974         }
975         s->thread_started = 1;
976     }
977 #endif
978
979     return 0;
980 #if HAVE_PTHREAD_CANCEL
981  thread_fail:
982     pthread_cond_destroy(&s->cond);
983  cond_fail:
984     pthread_mutex_destroy(&s->mutex);
985 #endif
986  fail:
987     if (udp_fd >= 0)
988         closesocket(udp_fd);
989     av_fifo_freep(&s->fifo);
990     for (i = 0; i < num_include_sources; i++)
991         av_freep(&include_sources[i]);
992     for (i = 0; i < num_exclude_sources; i++)
993         av_freep(&exclude_sources[i]);
994     return AVERROR(EIO);
995 }
996
997 static int udplite_open(URLContext *h, const char *uri, int flags)
998 {
999     UDPContext *s = h->priv_data;
1000
1001     // set default checksum coverage
1002     s->udplite_coverage = UDP_HEADER_SIZE;
1003
1004     return udp_open(h, uri, flags);
1005 }
1006
1007 static int udp_read(URLContext *h, uint8_t *buf, int size)
1008 {
1009     UDPContext *s = h->priv_data;
1010     int ret;
1011 #if HAVE_PTHREAD_CANCEL
1012     int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
1013
1014     if (s->fifo) {
1015         pthread_mutex_lock(&s->mutex);
1016         do {
1017             avail = av_fifo_size(s->fifo);
1018             if (avail) { // >=size) {
1019                 uint8_t tmp[4];
1020
1021                 av_fifo_generic_read(s->fifo, tmp, 4, NULL);
1022                 avail= AV_RL32(tmp);
1023                 if(avail > size){
1024                     av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
1025                     avail= size;
1026                 }
1027
1028                 av_fifo_generic_read(s->fifo, buf, avail, NULL);
1029                 av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
1030                 pthread_mutex_unlock(&s->mutex);
1031                 return avail;
1032             } else if(s->circular_buffer_error){
1033                 int err = s->circular_buffer_error;
1034                 pthread_mutex_unlock(&s->mutex);
1035                 return err;
1036             } else if(nonblock) {
1037                 pthread_mutex_unlock(&s->mutex);
1038                 return AVERROR(EAGAIN);
1039             }
1040             else {
1041                 /* FIXME: using the monotonic clock would be better,
1042                    but it does not exist on all supported platforms. */
1043                 int64_t t = av_gettime() + 100000;
1044                 struct timespec tv = { .tv_sec  =  t / 1000000,
1045                                        .tv_nsec = (t % 1000000) * 1000 };
1046                 if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) {
1047                     pthread_mutex_unlock(&s->mutex);
1048                     return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno);
1049                 }
1050                 nonblock = 1;
1051             }
1052         } while( 1);
1053     }
1054 #endif
1055
1056     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
1057         ret = ff_network_wait_fd(s->udp_fd, 0);
1058         if (ret < 0)
1059             return ret;
1060     }
1061     ret = recv(s->udp_fd, buf, size, 0);
1062
1063     return ret < 0 ? ff_neterrno() : ret;
1064 }
1065
1066 static int udp_write(URLContext *h, const uint8_t *buf, int size)
1067 {
1068     UDPContext *s = h->priv_data;
1069     int ret;
1070
1071 #if HAVE_PTHREAD_CANCEL
1072     if (s->fifo) {
1073         uint8_t tmp[4];
1074
1075         pthread_mutex_lock(&s->mutex);
1076
1077         /*
1078           Return error if last tx failed.
1079           Here we can't know on which packet error was, but it needs to know that error exists.
1080         */
1081         if (s->circular_buffer_error<0) {
1082             int err=s->circular_buffer_error;
1083             pthread_mutex_unlock(&s->mutex);
1084             return err;
1085         }
1086
1087         if(av_fifo_space(s->fifo) < size + 4) {
1088             /* What about a partial packet tx ? */
1089             pthread_mutex_unlock(&s->mutex);
1090             return AVERROR(ENOMEM);
1091         }
1092         AV_WL32(tmp, size);
1093         av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */
1094         av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
1095         pthread_cond_signal(&s->cond);
1096         pthread_mutex_unlock(&s->mutex);
1097         return size;
1098     }
1099 #endif
1100     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
1101         ret = ff_network_wait_fd(s->udp_fd, 1);
1102         if (ret < 0)
1103             return ret;
1104     }
1105
1106     if (!s->is_connected) {
1107         ret = sendto (s->udp_fd, buf, size, 0,
1108                       (struct sockaddr *) &s->dest_addr,
1109                       s->dest_addr_len);
1110     } else
1111         ret = send(s->udp_fd, buf, size, 0);
1112
1113     return ret < 0 ? ff_neterrno() : ret;
1114 }
1115
1116 static int udp_close(URLContext *h)
1117 {
1118     UDPContext *s = h->priv_data;
1119
1120 #if HAVE_PTHREAD_CANCEL
1121     // Request close once writing is finished
1122     if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
1123         pthread_mutex_lock(&s->mutex);
1124         s->close_req = 1;
1125         pthread_cond_signal(&s->cond);
1126         pthread_mutex_unlock(&s->mutex);
1127     }
1128 #endif
1129
1130     if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
1131         udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage);
1132 #if HAVE_PTHREAD_CANCEL
1133     if (s->thread_started) {
1134         int ret;
1135         // Cancel only read, as write has been signaled as success to the user
1136         if (h->flags & AVIO_FLAG_READ)
1137             pthread_cancel(s->circular_buffer_thread);
1138         ret = pthread_join(s->circular_buffer_thread, NULL);
1139         if (ret != 0)
1140             av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
1141         pthread_mutex_destroy(&s->mutex);
1142         pthread_cond_destroy(&s->cond);
1143     }
1144 #endif
1145     closesocket(s->udp_fd);
1146     av_fifo_freep(&s->fifo);
1147     return 0;
1148 }
1149
1150 const URLProtocol ff_udp_protocol = {
1151     .name                = "udp",
1152     .url_open            = udp_open,
1153     .url_read            = udp_read,
1154     .url_write           = udp_write,
1155     .url_close           = udp_close,
1156     .url_get_file_handle = udp_get_file_handle,
1157     .priv_data_size      = sizeof(UDPContext),
1158     .priv_data_class     = &udp_class,
1159     .flags               = URL_PROTOCOL_FLAG_NETWORK,
1160 };
1161
1162 const URLProtocol ff_udplite_protocol = {
1163     .name                = "udplite",
1164     .url_open            = udplite_open,
1165     .url_read            = udp_read,
1166     .url_write           = udp_write,
1167     .url_close           = udp_close,
1168     .url_get_file_handle = udp_get_file_handle,
1169     .priv_data_size      = sizeof(UDPContext),
1170     .priv_data_class     = &udplite_context_class,
1171     .flags               = URL_PROTOCOL_FLAG_NETWORK,
1172 };