]> git.sesse.net Git - ffmpeg/blobdiff - libavformat/udp.c
Merge remote-tracking branch 'qatar/master'
[ffmpeg] / libavformat / udp.c
index ee5b3047699dff1f061741aa991e543dc21dc8f3..9694ad2f57ed0ffd44aed50706003412a7390762 100644 (file)
@@ -69,6 +69,10 @@ typedef struct {
     int circular_buffer_error;
 #if HAVE_PTHREADS
     pthread_t circular_buffer_thread;
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+    int thread_started;
+    volatile int exit_thread;
 #endif
     uint8_t tmp[UDP_MAX_PKT_SIZE+4];
     int remaining_in_dg;
@@ -317,6 +321,7 @@ static int udp_get_file_handle(URLContext *h)
     return s->udp_fd;
 }
 
+#if HAVE_PTHREADS
 static void *circular_buffer_task( void *_URLContext)
 {
     URLContext *h = _URLContext;
@@ -324,14 +329,14 @@ static void *circular_buffer_task( void *_URLContext)
     fd_set rfds;
     struct timeval tv;
 
-    for(;;) {
+    while(!s->exit_thread) {
         int left;
         int ret;
         int len;
 
         if (ff_check_interrupt(&h->interrupt_callback)) {
             s->circular_buffer_error = EINTR;
-            return NULL;
+            goto end;
         }
 
         FD_ZERO(&rfds);
@@ -343,7 +348,7 @@ static void *circular_buffer_task( void *_URLContext)
             if (ff_neterrno() == AVERROR(EINTR))
                 continue;
             s->circular_buffer_error = EIO;
-            return NULL;
+            goto end;
         }
 
         if (!(ret > 0 && FD_ISSET(s->udp_fd, &rfds)))
@@ -357,23 +362,31 @@ static void *circular_buffer_task( void *_URLContext)
         if(left < UDP_MAX_PKT_SIZE + 4) {
             av_log(h, AV_LOG_ERROR, "circular_buffer: OVERRUN\n");
             s->circular_buffer_error = EIO;
-            return NULL;
+            goto end;
         }
         left = FFMIN(left, s->fifo->end - s->fifo->wptr);
         len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
         if (len < 0) {
             if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
                 s->circular_buffer_error = EIO;
-                return NULL;
+                goto end;
             }
             continue;
         }
         AV_WL32(s->tmp, len);
+        pthread_mutex_lock(&s->mutex);
         av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
+        pthread_cond_signal(&s->cond);
+        pthread_mutex_unlock(&s->mutex);
     }
 
+end:
+    pthread_mutex_lock(&s->mutex);
+    pthread_cond_signal(&s->cond);
+    pthread_mutex_unlock(&s->mutex);
     return NULL;
 }
+#endif
 
 /* put it in UDP context */
 /* return non zero if error */
@@ -381,7 +394,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
 {
     char hostname[1024], localaddr[1024] = "";
     int port, udp_fd = -1, tmp, bind_ret = -1;
-    UDPContext *s = NULL;
+    UDPContext *s = h->priv_data;
     int is_output;
     const char *p;
     char buf[256];
@@ -394,11 +407,6 @@ static int udp_open(URLContext *h, const char *uri, int flags)
 
     is_output = !(flags & AVIO_FLAG_READ);
 
-    s = av_mallocz(sizeof(UDPContext));
-    if (!s)
-        return AVERROR(ENOMEM);
-
-    h->priv_data = s;
     s->ttl = 16;
     s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
 
@@ -457,7 +465,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
         goto fail;
 
     /* Follow the requested reuse option, unless it's multicast in which
-     * case enable reuse unless explicitely disabled.
+     * case enable reuse unless explicitly disabled.
      */
     if (s->reuse_socket || (s->is_multicast && !reuse_specified)) {
         s->reuse_socket = 1;
@@ -519,21 +527,40 @@ static int udp_open(URLContext *h, const char *uri, int flags)
 
 #if HAVE_PTHREADS
     if (!is_output && s->circular_buffer_size) {
+        int ret;
+
         /* start the task going */
         s->fifo = av_fifo_alloc(s->circular_buffer_size);
-        if (pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h)) {
-            av_log(h, AV_LOG_ERROR, "pthread_create failed\n");
+        ret = pthread_mutex_init(&s->mutex, NULL);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
             goto fail;
         }
+        ret = pthread_cond_init(&s->cond, NULL);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
+            goto cond_fail;
+        }
+        ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
+            goto thread_fail;
+        }
+        s->thread_started = 1;
     }
 #endif
 
     return 0;
+#if HAVE_PTHREADS
+ thread_fail:
+    pthread_cond_destroy(&s->cond);
+ cond_fail:
+    pthread_mutex_destroy(&s->mutex);
+#endif
  fail:
     if (udp_fd >= 0)
         closesocket(udp_fd);
     av_fifo_free(s->fifo);
-    av_free(s);
     return AVERROR(EIO);
 }
 
@@ -542,15 +569,15 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
     UDPContext *s = h->priv_data;
     int ret;
     int avail;
-    fd_set rfds;
-    struct timeval tv;
 
+#if HAVE_PTHREADS
     if (s->fifo) {
-
+        pthread_mutex_lock(&s->mutex);
         do {
             avail = av_fifo_size(s->fifo);
             if (avail) { // >=size) {
                 uint8_t tmp[4];
+                pthread_mutex_unlock(&s->mutex);
 
                 av_fifo_generic_read(s->fifo, tmp, 4, NULL);
                 avail= AV_RL32(tmp);
@@ -562,18 +589,19 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
                 av_fifo_generic_read(s->fifo, buf, avail, NULL);
                 av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
                 return avail;
+            } else if(s->circular_buffer_error){
+                pthread_mutex_unlock(&s->mutex);
+                return s->circular_buffer_error;
+            } else if(h->flags & AVIO_FLAG_NONBLOCK) {
+                pthread_mutex_unlock(&s->mutex);
+                return AVERROR(EAGAIN);
             }
             else {
-                FD_ZERO(&rfds);
-                FD_SET(s->udp_fd, &rfds);
-                tv.tv_sec = 1;
-                tv.tv_usec = 0;
-                ret = select(s->udp_fd + 1, &rfds, NULL, NULL, &tv);
-                if (ret<0)
-                    return ret;
+                pthread_cond_wait(&s->cond, &s->mutex);
             }
         } while( 1);
     }
+#endif
 
     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
         ret = ff_network_wait_fd(s->udp_fd, 0);
@@ -609,12 +637,23 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size)
 static int udp_close(URLContext *h)
 {
     UDPContext *s = h->priv_data;
+    int ret;
 
     if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
         udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr);
     closesocket(s->udp_fd);
     av_fifo_free(s->fifo);
-    av_free(s);
+#if HAVE_PTHREADS
+    if (s->thread_started) {
+        s->exit_thread = 1;
+        ret = pthread_join(s->circular_buffer_thread, NULL);
+        if (ret != 0)
+            av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
+    }
+
+    pthread_mutex_destroy(&s->mutex);
+    pthread_cond_destroy(&s->cond);
+#endif
     return 0;
 }
 
@@ -625,4 +664,6 @@ URLProtocol ff_udp_protocol = {
     .url_write           = udp_write,
     .url_close           = udp_close,
     .url_get_file_handle = udp_get_file_handle,
+    .priv_data_size      = sizeof(UDPContext),
+    .flags               = URL_PROTOCOL_FLAG_NETWORK,
 };