]> git.sesse.net Git - ffmpeg/blobdiff - libavformat/udp.c
Merge remote-tracking branch 'qatar/master'
[ffmpeg] / libavformat / udp.c
index 8bb63c629808abe8e60e3b790af43d6844d845bf..9694ad2f57ed0ffd44aed50706003412a7390762 100644 (file)
@@ -2,20 +2,20 @@
  * UDP prototype streaming system
  * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
  *
- * This file is part of Libav.
+ * This file is part of FFmpeg.
  *
- * Libav is free software; you can redistribute it and/or
+ * FFmpeg is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
  * License as published by the Free Software Foundation; either
  * version 2.1 of the License, or (at your option) any later version.
  *
- * Libav is distributed in the hope that it will be useful,
+ * FFmpeg is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General Public
- * License along with Libav; if not, write to the Free Software
+ * License along with FFmpeg; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
 #include "avformat.h"
 #include "avio_internal.h"
 #include "libavutil/parseutils.h"
+#include "libavutil/fifo.h"
+#include "libavutil/intreadwrite.h"
 #include "libavutil/avstring.h"
 #include <unistd.h>
 #include "internal.h"
 #include "network.h"
 #include "os_support.h"
 #include "url.h"
+
+#if HAVE_PTHREADS
+#include <pthread.h>
+#endif
+
 #include <sys/time.h>
 
 #ifndef IPV6_ADD_MEMBERSHIP
@@ -42,6 +49,9 @@
 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
 #endif
 
+#define UDP_TX_BUF_SIZE 32768
+#define UDP_MAX_PKT_SIZE 65536
+
 typedef struct {
     int udp_fd;
     int ttl;
@@ -52,10 +62,21 @@ typedef struct {
     struct sockaddr_storage dest_addr;
     int dest_addr_len;
     int is_connected;
-} UDPContext;
 
-#define UDP_TX_BUF_SIZE 32768
-#define UDP_MAX_PKT_SIZE 65536
+    /* Circular Buffer variables for use in UDP receive code */
+    int circular_buffer_size;
+    AVFifoBuffer *fifo;
+    int circular_buffer_error;
+#if HAVE_PTHREADS
+    pthread_t circular_buffer_thread;
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+    int thread_started;
+    volatile int exit_thread;
+#endif
+    uint8_t tmp[UDP_MAX_PKT_SIZE+4];
+    int remaining_in_dg;
+} UDPContext;
 
 static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
                                  struct sockaddr *addr)
@@ -300,6 +321,73 @@ static int udp_get_file_handle(URLContext *h)
     return s->udp_fd;
 }
 
+#if HAVE_PTHREADS
+static void *circular_buffer_task( void *_URLContext)
+{
+    URLContext *h = _URLContext;
+    UDPContext *s = h->priv_data;
+    fd_set rfds;
+    struct timeval tv;
+
+    while(!s->exit_thread) {
+        int left;
+        int ret;
+        int len;
+
+        if (ff_check_interrupt(&h->interrupt_callback)) {
+            s->circular_buffer_error = EINTR;
+            goto end;
+        }
+
+        FD_ZERO(&rfds);
+        FD_SET(s->udp_fd, &rfds);
+        tv.tv_sec = 1;
+        tv.tv_usec = 0;
+        ret = select(s->udp_fd + 1, &rfds, NULL, NULL, &tv);
+        if (ret < 0) {
+            if (ff_neterrno() == AVERROR(EINTR))
+                continue;
+            s->circular_buffer_error = EIO;
+            goto end;
+        }
+
+        if (!(ret > 0 && FD_ISSET(s->udp_fd, &rfds)))
+            continue;
+
+        /* How much do we have left to the end of the buffer */
+        /* Whats the minimum we can read so that we dont comletely fill the buffer */
+        left = av_fifo_space(s->fifo);
+
+        /* No Space left, error, what do we do now */
+        if(left < UDP_MAX_PKT_SIZE + 4) {
+            av_log(h, AV_LOG_ERROR, "circular_buffer: OVERRUN\n");
+            s->circular_buffer_error = EIO;
+            goto end;
+        }
+        left = FFMIN(left, s->fifo->end - s->fifo->wptr);
+        len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
+        if (len < 0) {
+            if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
+                s->circular_buffer_error = EIO;
+                goto end;
+            }
+            continue;
+        }
+        AV_WL32(s->tmp, len);
+        pthread_mutex_lock(&s->mutex);
+        av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
+        pthread_cond_signal(&s->cond);
+        pthread_mutex_unlock(&s->mutex);
+    }
+
+end:
+    pthread_mutex_lock(&s->mutex);
+    pthread_cond_signal(&s->cond);
+    pthread_mutex_unlock(&s->mutex);
+    return NULL;
+}
+#endif
+
 /* put it in UDP context */
 /* return non zero if error */
 static int udp_open(URLContext *h, const char *uri, int flags)
@@ -322,6 +410,8 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     s->ttl = 16;
     s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
 
+    s->circular_buffer_size = 7*188*4096;
+
     p = strchr(uri, '?');
     if (p) {
         if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
@@ -347,6 +437,9 @@ static int udp_open(URLContext *h, const char *uri, int flags)
         if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
             s->is_connected = strtol(buf, NULL, 10);
         }
+        if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
+            s->circular_buffer_size = strtol(buf, NULL, 10)*188;
+        }
         if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
             av_strlcpy(localaddr, buf, sizeof(localaddr));
         }
@@ -431,10 +524,43 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     }
 
     s->udp_fd = udp_fd;
+
+#if HAVE_PTHREADS
+    if (!is_output && s->circular_buffer_size) {
+        int ret;
+
+        /* start the task going */
+        s->fifo = av_fifo_alloc(s->circular_buffer_size);
+        ret = pthread_mutex_init(&s->mutex, NULL);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
+            goto fail;
+        }
+        ret = pthread_cond_init(&s->cond, NULL);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
+            goto cond_fail;
+        }
+        ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
+            goto thread_fail;
+        }
+        s->thread_started = 1;
+    }
+#endif
+
     return 0;
+#if HAVE_PTHREADS
+ thread_fail:
+    pthread_cond_destroy(&s->cond);
+ cond_fail:
+    pthread_mutex_destroy(&s->mutex);
+#endif
  fail:
     if (udp_fd >= 0)
         closesocket(udp_fd);
+    av_fifo_free(s->fifo);
     return AVERROR(EIO);
 }
 
@@ -442,6 +568,40 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
 {
     UDPContext *s = h->priv_data;
     int ret;
+    int avail;
+
+#if HAVE_PTHREADS
+    if (s->fifo) {
+        pthread_mutex_lock(&s->mutex);
+        do {
+            avail = av_fifo_size(s->fifo);
+            if (avail) { // >=size) {
+                uint8_t tmp[4];
+                pthread_mutex_unlock(&s->mutex);
+
+                av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+                avail= AV_RL32(tmp);
+                if(avail > size){
+                    av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
+                    avail= size;
+                }
+
+                av_fifo_generic_read(s->fifo, buf, avail, NULL);
+                av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
+                return avail;
+            } else if(s->circular_buffer_error){
+                pthread_mutex_unlock(&s->mutex);
+                return s->circular_buffer_error;
+            } else if(h->flags & AVIO_FLAG_NONBLOCK) {
+                pthread_mutex_unlock(&s->mutex);
+                return AVERROR(EAGAIN);
+            }
+            else {
+                pthread_cond_wait(&s->cond, &s->mutex);
+            }
+        } while( 1);
+    }
+#endif
 
     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
         ret = ff_network_wait_fd(s->udp_fd, 0);
@@ -449,6 +609,7 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
             return ret;
     }
     ret = recv(s->udp_fd, buf, size, 0);
+
     return ret < 0 ? ff_neterrno() : ret;
 }
 
@@ -476,10 +637,23 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size)
 static int udp_close(URLContext *h)
 {
     UDPContext *s = h->priv_data;
+    int ret;
 
     if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
         udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr);
     closesocket(s->udp_fd);
+    av_fifo_free(s->fifo);
+#if HAVE_PTHREADS
+    if (s->thread_started) {
+        s->exit_thread = 1;
+        ret = pthread_join(s->circular_buffer_thread, NULL);
+        if (ret != 0)
+            av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
+    }
+
+    pthread_mutex_destroy(&s->mutex);
+    pthread_cond_destroy(&s->cond);
+#endif
     return 0;
 }