]> git.sesse.net Git - ffmpeg/blobdiff - libavformat/udp.c
avformat/nutenc: Write size into right dynamic buffer
[ffmpeg] / libavformat / udp.c
index f5f636e1af88bf7b8bc85ae8751576df900cf204..ad6992c57dbc503d59fb6330fa944963f1d61885 100644 (file)
 #define IPPROTO_UDPLITE                                  136
 #endif
 
+#if HAVE_W32THREADS
+#undef HAVE_PTHREAD_CANCEL
+#define HAVE_PTHREAD_CANCEL 1
+#endif
+
 #if HAVE_PTHREAD_CANCEL
-#include <pthread.h>
+#include "libavutil/thread.h"
 #endif
 
 #ifndef IPV6_ADD_MEMBERSHIP
@@ -71,6 +76,7 @@
 #endif
 
 #define UDP_TX_BUF_SIZE 32768
+#define UDP_RX_BUF_SIZE 393216
 #define UDP_MAX_PKT_SIZE 65536
 #define UDP_HEADER_SIZE 8
 
@@ -519,14 +525,12 @@ static void *circular_buffer_task_tx( void *_URLContext)
 {
     URLContext *h = _URLContext;
     UDPContext *s = h->priv_data;
-    int old_cancelstate;
     int64_t target_timestamp = av_gettime_relative();
     int64_t start_timestamp = av_gettime_relative();
     int64_t sent_bits = 0;
     int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
     int64_t max_delay = s->bitrate ?  ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
 
-    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
     pthread_mutex_lock(&s->mutex);
 
     if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
@@ -561,7 +565,6 @@ static void *circular_buffer_task_tx( void *_URLContext)
         av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
 
         pthread_mutex_unlock(&s->mutex);
-        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
 
         if (s->bitrate) {
             timestamp = av_gettime_relative();
@@ -607,7 +610,6 @@ static void *circular_buffer_task_tx( void *_URLContext)
             }
         }
 
-        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
         pthread_mutex_lock(&s->mutex);
     }
 
@@ -636,7 +638,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
 
     is_output = !(flags & AVIO_FLAG_READ);
     if (s->buffer_size < 0)
-        s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
+        s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_RX_BUF_SIZE;
 
     if (s->sources) {
         if (ff_ip_parse_sources(h, s->sources, &s->filters) < 0)
@@ -978,9 +980,10 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
                 int64_t t = av_gettime() + 100000;
                 struct timespec tv = { .tv_sec  =  t / 1000000,
                                        .tv_nsec = (t % 1000000) * 1000 };
-                if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) {
+                int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv);
+                if (err) {
                     pthread_mutex_unlock(&s->mutex);
-                    return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno);
+                    return AVERROR(err == ETIMEDOUT ? EAGAIN : err);
                 }
                 nonblock = 1;
             }
@@ -1071,8 +1074,17 @@ static int udp_close(URLContext *h)
     if (s->thread_started) {
         int ret;
         // Cancel only read, as write has been signaled as success to the user
-        if (h->flags & AVIO_FLAG_READ)
+        if (h->flags & AVIO_FLAG_READ) {
+#ifdef _WIN32
+            /* recvfrom() is not a cancellation point for win32, so we shutdown
+             * the socket and abort pending IO, subsequent recvfrom() calls
+             * will fail with WSAESHUTDOWN causing the thread to exit. */
+            shutdown(s->udp_fd, SD_RECEIVE);
+            CancelIoEx((HANDLE)(SOCKET)s->udp_fd, NULL);
+#else
             pthread_cancel(s->circular_buffer_thread);
+#endif
+        }
         ret = pthread_join(s->circular_buffer_thread, NULL);
         if (ret != 0)
             av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));