]> git.sesse.net Git - ffmpeg/blobdiff - libavformat/udp.c
Merge commit '33ac77e850efdfd0e8835950c3d947baffd4df45'
[ffmpeg] / libavformat / udp.c
index e42b911c42d0b88cf14a859a65cfa4e9e4176847..8699c1c119c2e58c75b0aec01eaf73138e1b3a88 100644 (file)
@@ -29,6 +29,7 @@
 
 #include "avformat.h"
 #include "avio_internal.h"
+#include "libavutil/avassert.h"
 #include "libavutil/parseutils.h"
 #include "libavutil/fifo.h"
 #include "libavutil/intreadwrite.h"
@@ -92,6 +93,9 @@ typedef struct UDPContext {
     int circular_buffer_size;
     AVFifoBuffer *fifo;
     int circular_buffer_error;
+    int64_t bitrate; /* number of bits to send per second */
+    int64_t burst_bits;
+    int close_req;
 #if HAVE_PTHREAD_CANCEL
     pthread_t circular_buffer_thread;
     pthread_mutex_t mutex;
@@ -112,6 +116,8 @@ typedef struct UDPContext {
 #define E AV_OPT_FLAG_ENCODING_PARAM
 static const AVOption options[] = {
     { "buffer_size",    "System data size (in bytes)",                     OFFSET(buffer_size),    AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
+    { "bitrate",        "Bits to send per second",                         OFFSET(bitrate),        AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = E },
+    { "burst_bits",     "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits),   AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = E },
     { "localport",      "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, D|E },
     { "local_port",     "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
     { "localaddr",      "Local address",                                   OFFSET(localaddr),      AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
@@ -260,7 +266,7 @@ static struct addrinfo *udp_resolve_host(URLContext *h,
         res = NULL;
         av_log(h, AV_LOG_ERROR, "getaddrinfo(%s, %s): %s\n",
                node ? node : "unknown",
-               service ? service : "unknown",
+               service,
                gai_strerror(error));
     }
 
@@ -486,7 +492,7 @@ static int udp_get_file_handle(URLContext *h)
 }
 
 #if HAVE_PTHREAD_CANCEL
-static void *circular_buffer_task( void *_URLContext)
+static void *circular_buffer_task_rx( void *_URLContext)
 {
     URLContext *h = _URLContext;
     UDPContext *s = h->priv_data;
@@ -542,6 +548,109 @@ end:
     pthread_mutex_unlock(&s->mutex);
     return NULL;
 }
+
+static void *circular_buffer_task_tx( void *_URLContext)
+{
+    URLContext *h = _URLContext;
+    UDPContext *s = h->priv_data;
+    int old_cancelstate;
+    int64_t target_timestamp = av_gettime_relative();
+    int64_t start_timestamp = av_gettime_relative();
+    int64_t sent_bits = 0;
+    int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
+    int64_t max_delay = s->bitrate ?  ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
+
+    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+    pthread_mutex_lock(&s->mutex);
+
+    if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
+        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
+        s->circular_buffer_error = AVERROR(EIO);
+        goto end;
+    }
+
+    for(;;) {
+        int len;
+        const uint8_t *p;
+        uint8_t tmp[4];
+        int64_t timestamp;
+
+        len=av_fifo_size(s->fifo);
+
+        while (len<4) {
+            if (s->close_req)
+                goto end;
+            if (pthread_cond_wait(&s->cond, &s->mutex) < 0) {
+                goto end;
+            }
+            len=av_fifo_size(s->fifo);
+        }
+
+        av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+        len=AV_RL32(tmp);
+
+        av_assert0(len >= 0);
+        av_assert0(len <= sizeof(s->tmp));
+
+        av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
+
+        pthread_mutex_unlock(&s->mutex);
+        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
+
+        if (s->bitrate) {
+            timestamp = av_gettime_relative();
+            if (timestamp < target_timestamp) {
+                int64_t delay = target_timestamp - timestamp;
+                if (delay > max_delay) {
+                    delay = max_delay;
+                    start_timestamp = timestamp + delay;
+                    sent_bits = 0;
+                }
+                av_usleep(delay);
+            } else {
+                if (timestamp - burst_interval > target_timestamp) {
+                    start_timestamp = timestamp - burst_interval;
+                    sent_bits = 0;
+                }
+            }
+            sent_bits += len * 8;
+            target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
+        }
+
+        p = s->tmp;
+        while (len) {
+            int ret;
+            av_assert0(len > 0);
+            if (!s->is_connected) {
+                ret = sendto (s->udp_fd, p, len, 0,
+                            (struct sockaddr *) &s->dest_addr,
+                            s->dest_addr_len);
+            } else
+                ret = send(s->udp_fd, p, len, 0);
+            if (ret >= 0) {
+                len -= ret;
+                p   += ret;
+            } else {
+                ret = ff_neterrno();
+                if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
+                    pthread_mutex_lock(&s->mutex);
+                    s->circular_buffer_error = ret;
+                    pthread_mutex_unlock(&s->mutex);
+                    return NULL;
+                }
+            }
+        }
+
+        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+        pthread_mutex_lock(&s->mutex);
+    }
+
+end:
+    pthread_mutex_unlock(&s->mutex);
+    return NULL;
+}
+
+
 #endif
 
 static int parse_source_list(char *buf, char **sources, int *num_sources,
@@ -650,6 +759,16 @@ static int udp_open(URLContext *h, const char *uri, int flags)
                        "'circular_buffer_size' option was set but it is not supported "
                        "on this build (pthread support is required)\n");
         }
+        if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
+            s->bitrate = strtoll(buf, NULL, 10);
+            if (!HAVE_PTHREAD_CANCEL)
+                av_log(h, AV_LOG_WARNING,
+                       "'bitrate' option was set but it is not supported "
+                       "on this build (pthread support is required)\n");
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) {
+            s->burst_bits = strtoll(buf, NULL, 10);
+        }
         if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
             av_strlcpy(localaddr, buf, sizeof(localaddr));
         }
@@ -829,7 +948,18 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     s->udp_fd = udp_fd;
 
 #if HAVE_PTHREAD_CANCEL
-    if (!is_output && s->circular_buffer_size) {
+    /*
+      Create thread in case of:
+      1. Input and circular_buffer_size is set
+      2. Output and bitrate and circular_buffer_size is set
+    */
+
+    if (is_output && s->bitrate && !s->circular_buffer_size) {
+        /* Warn user in case of 'circular_buffer_size' is not set */
+        av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
+    }
+
+    if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) {
         int ret;
 
         /* start the task going */
@@ -844,7 +974,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
             av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
             goto cond_fail;
         }
-        ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
+        ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
         if (ret != 0) {
             av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
             goto thread_fail;
@@ -945,6 +1075,35 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size)
     UDPContext *s = h->priv_data;
     int ret;
 
+#if HAVE_PTHREAD_CANCEL
+    if (s->fifo) {
+        uint8_t tmp[4];
+
+        pthread_mutex_lock(&s->mutex);
+
+        /*
+          Return error if last tx failed.
+          Here we can't know on which packet error was, but it needs to know that error exists.
+        */
+        if (s->circular_buffer_error<0) {
+            int err=s->circular_buffer_error;
+            pthread_mutex_unlock(&s->mutex);
+            return err;
+        }
+
+        if(av_fifo_space(s->fifo) < size + 4) {
+            /* What about a partial packet tx ? */
+            pthread_mutex_unlock(&s->mutex);
+            return AVERROR(ENOMEM);
+        }
+        AV_WL32(tmp, size);
+        av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */
+        av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
+        pthread_cond_signal(&s->cond);
+        pthread_mutex_unlock(&s->mutex);
+        return size;
+    }
+#endif
     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
         ret = ff_network_wait_fd(s->udp_fd, 1);
         if (ret < 0)
@@ -965,13 +1124,24 @@ static int udp_close(URLContext *h)
 {
     UDPContext *s = h->priv_data;
 
+#if HAVE_PTHREAD_CANCEL
+    // Request close once writing is finished
+    if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
+        pthread_mutex_lock(&s->mutex);
+        s->close_req = 1;
+        pthread_cond_signal(&s->cond);
+        pthread_mutex_unlock(&s->mutex);
+    }
+#endif
+
     if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
         udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage);
-    closesocket(s->udp_fd);
 #if HAVE_PTHREAD_CANCEL
     if (s->thread_started) {
         int ret;
-        pthread_cancel(s->circular_buffer_thread);
+        // Cancel only read, as write has been signaled as success to the user
+        if (h->flags & AVIO_FLAG_READ)
+            pthread_cancel(s->circular_buffer_thread);
         ret = pthread_join(s->circular_buffer_thread, NULL);
         if (ret != 0)
             av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
@@ -979,6 +1149,7 @@ static int udp_close(URLContext *h)
         pthread_cond_destroy(&s->cond);
     }
 #endif
+    closesocket(s->udp_fd);
     av_fifo_freep(&s->fifo);
     return 0;
 }