]> git.sesse.net Git - ffmpeg/blobdiff - libavformat/udp.c
Merge remote-tracking branch 'qatar/master'
[ffmpeg] / libavformat / udp.c
index 151e070d191e63aa98de37d9f099e326ce735423..5bf7d82dad39d2774cbcab4736898753200063a3 100644 (file)
@@ -31,6 +31,7 @@
 #include "libavutil/parseutils.h"
 #include "libavutil/fifo.h"
 #include "libavutil/intreadwrite.h"
+#include "libavutil/avstring.h"
 #include <unistd.h>
 #include "internal.h"
 #include "network.h"
@@ -68,6 +69,8 @@ typedef struct {
     int circular_buffer_error;
 #if HAVE_PTHREADS
     pthread_t circular_buffer_thread;
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
 #endif
     uint8_t tmp[UDP_MAX_PKT_SIZE+4];
     int remaining_in_dg;
@@ -195,8 +198,8 @@ static int udp_set_url(struct sockaddr_storage *addr,
     return addr_len;
 }
 
-static int udp_socket_create(UDPContext *s,
-                             struct sockaddr_storage *addr, int *addr_len)
+static int udp_socket_create(UDPContext *s, struct sockaddr_storage *addr,
+                             int *addr_len, const char *localaddr)
 {
     int udp_fd = -1;
     struct addrinfo *res0 = NULL, *res = NULL;
@@ -204,7 +207,8 @@ static int udp_socket_create(UDPContext *s,
 
     if (((struct sockaddr *) &s->dest_addr)->sa_family)
         family = ((struct sockaddr *) &s->dest_addr)->sa_family;
-    res0 = udp_resolve_host(0, s->local_port, SOCK_DGRAM, family, AI_PASSIVE);
+    res0 = udp_resolve_host(localaddr[0] ? localaddr : NULL, s->local_port,
+                            SOCK_DGRAM, family, AI_PASSIVE);
     if (res0 == 0)
         goto fail;
     for (res = res0; res; res=res->ai_next) {
@@ -315,6 +319,7 @@ static int udp_get_file_handle(URLContext *h)
     return s->udp_fd;
 }
 
+#if HAVE_PTHREADS
 static void *circular_buffer_task( void *_URLContext)
 {
     URLContext *h = _URLContext;
@@ -327,9 +332,9 @@ static void *circular_buffer_task( void *_URLContext)
         int ret;
         int len;
 
-        if (url_interrupt_cb()) {
+        if (ff_check_interrupt(&h->interrupt_callback)) {
             s->circular_buffer_error = EINTR;
-            return NULL;
+            goto end;
         }
 
         FD_ZERO(&rfds);
@@ -341,7 +346,7 @@ static void *circular_buffer_task( void *_URLContext)
             if (ff_neterrno() == AVERROR(EINTR))
                 continue;
             s->circular_buffer_error = EIO;
-            return NULL;
+            goto end;
         }
 
         if (!(ret > 0 && FD_ISSET(s->udp_fd, &rfds)))
@@ -355,31 +360,39 @@ static void *circular_buffer_task( void *_URLContext)
         if(left < UDP_MAX_PKT_SIZE + 4) {
             av_log(h, AV_LOG_ERROR, "circular_buffer: OVERRUN\n");
             s->circular_buffer_error = EIO;
-            return NULL;
+            goto end;
         }
         left = FFMIN(left, s->fifo->end - s->fifo->wptr);
         len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
         if (len < 0) {
             if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
                 s->circular_buffer_error = EIO;
-                return NULL;
+                goto end;
             }
             continue;
         }
         AV_WL32(s->tmp, len);
+        pthread_mutex_lock(&s->mutex);
         av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
+        pthread_cond_signal(&s->cond);
+        pthread_mutex_unlock(&s->mutex);
     }
 
+end:
+    pthread_mutex_lock(&s->mutex);
+    pthread_cond_signal(&s->cond);
+    pthread_mutex_unlock(&s->mutex);
     return NULL;
 }
+#endif
 
 /* put it in UDP context */
 /* return non zero if error */
 static int udp_open(URLContext *h, const char *uri, int flags)
 {
-    char hostname[1024];
+    char hostname[1024], localaddr[1024] = "";
     int port, udp_fd = -1, tmp, bind_ret = -1;
-    UDPContext *s = NULL;
+    UDPContext *s = h->priv_data;
     int is_output;
     const char *p;
     char buf[256];
@@ -392,11 +405,6 @@ static int udp_open(URLContext *h, const char *uri, int flags)
 
     is_output = !(flags & AVIO_FLAG_READ);
 
-    s = av_mallocz(sizeof(UDPContext));
-    if (!s)
-        return AVERROR(ENOMEM);
-
-    h->priv_data = s;
     s->ttl = 16;
     s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
 
@@ -430,6 +438,9 @@ static int udp_open(URLContext *h, const char *uri, int flags)
         if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
             s->circular_buffer_size = strtol(buf, NULL, 10)*188;
         }
+        if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
+            av_strlcpy(localaddr, buf, sizeof(localaddr));
+        }
     }
 
     /* fill the dest addr */
@@ -447,12 +458,12 @@ static int udp_open(URLContext *h, const char *uri, int flags)
 
     if ((s->is_multicast || !s->local_port) && (h->flags & AVIO_FLAG_READ))
         s->local_port = port;
-    udp_fd = udp_socket_create(s, &my_addr, &len);
+    udp_fd = udp_socket_create(s, &my_addr, &len, localaddr);
     if (udp_fd < 0)
         goto fail;
 
     /* Follow the requested reuse option, unless it's multicast in which
-     * case enable reuse unless explicitely disabled.
+     * case enable reuse unless explicitly disabled.
      */
     if (s->reuse_socket || (s->is_multicast && !reuse_specified)) {
         s->reuse_socket = 1;
@@ -516,6 +527,8 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     if (!is_output && s->circular_buffer_size) {
         /* start the task going */
         s->fifo = av_fifo_alloc(s->circular_buffer_size);
+        pthread_mutex_init(&s->mutex, NULL);
+        pthread_cond_init(&s->cond, NULL);
         if (pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h)) {
             av_log(h, AV_LOG_ERROR, "pthread_create failed\n");
             goto fail;
@@ -528,7 +541,6 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     if (udp_fd >= 0)
         closesocket(udp_fd);
     av_fifo_free(s->fifo);
-    av_free(s);
     return AVERROR(EIO);
 }
 
@@ -537,15 +549,15 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
     UDPContext *s = h->priv_data;
     int ret;
     int avail;
-    fd_set rfds;
-    struct timeval tv;
 
+#if HAVE_PTHREADS
     if (s->fifo) {
-
+        pthread_mutex_lock(&s->mutex);
         do {
             avail = av_fifo_size(s->fifo);
             if (avail) { // >=size) {
                 uint8_t tmp[4];
+                pthread_mutex_unlock(&s->mutex);
 
                 av_fifo_generic_read(s->fifo, tmp, 4, NULL);
                 avail= AV_RL32(tmp);
@@ -555,19 +567,21 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
                 }
 
                 av_fifo_generic_read(s->fifo, buf, avail, NULL);
+                av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
                 return avail;
+            } else if(s->circular_buffer_error){
+                pthread_mutex_unlock(&s->mutex);
+                return s->circular_buffer_error;
+            } else if(h->flags & AVIO_FLAG_NONBLOCK) {
+                pthread_mutex_unlock(&s->mutex);
+                return AVERROR(EAGAIN);
             }
             else {
-                FD_ZERO(&rfds);
-                FD_SET(s->udp_fd, &rfds);
-                tv.tv_sec = 1;
-                tv.tv_usec = 0;
-                ret = select(s->udp_fd + 1, &rfds, NULL, NULL, &tv);
-                if (ret<0)
-                    return ret;
+                pthread_cond_wait(&s->cond, &s->mutex);
             }
         } while( 1);
     }
+#endif
 
     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
         ret = ff_network_wait_fd(s->udp_fd, 0);
@@ -608,7 +622,10 @@ static int udp_close(URLContext *h)
         udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr);
     closesocket(s->udp_fd);
     av_fifo_free(s->fifo);
-    av_free(s);
+#if HAVE_PTHREADS
+    pthread_mutex_destroy(&s->mutex);
+    pthread_cond_destroy(&s->cond);
+#endif
     return 0;
 }
 
@@ -619,4 +636,5 @@ URLProtocol ff_udp_protocol = {
     .url_write           = udp_write,
     .url_close           = udp_close,
     .url_get_file_handle = udp_get_file_handle,
+    .priv_data_size      = sizeof(UDPContext),
 };