]> git.sesse.net Git - ffmpeg/blobdiff - libavformat/udp.c
avcodec/avcodec: Use avcodec_close() on avcodec_open2() failure
[ffmpeg] / libavformat / udp.c
index cf73d331e0472f20ecbab9c6c873a3a0ac98bef4..9b9d3de197cf2bf8f3490668a5d5836d99a2f231 100644 (file)
 #define IPPROTO_UDPLITE                                  136
 #endif
 
+#if HAVE_W32THREADS
+#undef HAVE_PTHREAD_CANCEL
+#define HAVE_PTHREAD_CANCEL 1
+#endif
+
 #if HAVE_PTHREAD_CANCEL
-#include <pthread.h>
+#include "libavutil/thread.h"
 #endif
 
 #ifndef IPV6_ADD_MEMBERSHIP
@@ -71,6 +76,7 @@
 #endif
 
 #define UDP_TX_BUF_SIZE 32768
+#define UDP_RX_BUF_SIZE 393216
 #define UDP_MAX_PKT_SIZE 65536
 #define UDP_HEADER_SIZE 8
 
@@ -132,7 +138,7 @@ static const AVOption options[] = {
     { "connect",        "set if connect() should be called on socket",     OFFSET(is_connected),   AV_OPT_TYPE_BOOL,   { .i64 =  0 },     0, 1,       .flags = D|E },
     { "fifo_size",      "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
     { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,    D },
-    { "timeout",        "set raise error timeout (only in read mode)",     OFFSET(timeout),        AV_OPT_TYPE_INT,    { .i64 = 0 },      0, INT_MAX, D },
+    { "timeout",        "set raise error timeout, in microseconds (only in read mode)",OFFSET(timeout),         AV_OPT_TYPE_INT,  {.i64 = 0}, 0, INT_MAX, D },
     { "sources",        "Source list",                                     OFFSET(sources),        AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
     { "block",          "Block list",                                      OFFSET(block),          AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
     { NULL }
@@ -159,7 +165,7 @@ static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
     if (addr->sa_family == AF_INET) {
         if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) {
             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_MULTICAST_TTL)");
-            return -1;
+            return ff_neterrno();
         }
     }
 #endif
@@ -167,7 +173,7 @@ static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
     if (addr->sa_family == AF_INET6) {
         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) {
             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_MULTICAST_HOPS)");
-            return -1;
+            return ff_neterrno();
         }
     }
 #endif
@@ -184,10 +190,10 @@ static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,struct soc
         if (local_addr)
             mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
         else
-            mreq.imr_interface.s_addr= INADDR_ANY;
+            mreq.imr_interface.s_addr = INADDR_ANY;
         if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
-            return -1;
+            return ff_neterrno();
         }
     }
 #endif
@@ -197,10 +203,10 @@ static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,struct soc
 
         memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
         //TODO: Interface index should be looked up from local_addr
-        mreq6.ipv6mr_interface= 0;
+        mreq6.ipv6mr_interface = 0;
         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
-            return -1;
+            return ff_neterrno();
         }
     }
 #endif
@@ -215,9 +221,9 @@ static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,struct so
 
         mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
         if (local_addr)
-            mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
+            mreq.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr;
         else
-            mreq.imr_interface.s_addr= INADDR_ANY;
+            mreq.imr_interface.s_addr = INADDR_ANY;
         if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
             return -1;
@@ -230,7 +236,7 @@ static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,struct so
 
         memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
         //TODO: Interface index should be looked up from local_addr
-        mreq6.ipv6mr_interface= 0;
+        mreq6.ipv6mr_interface = 0;
         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
             ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
             return -1;
@@ -274,7 +280,7 @@ static int udp_set_multicast_sources(URLContext *h,
         }
         return 0;
 #else
-        av_log(NULL, AV_LOG_ERROR,
+        av_log(h, AV_LOG_ERROR,
                "Setting multicast sources only supported for IPv4\n");
         return AVERROR(EINVAL);
 #endif
@@ -283,24 +289,24 @@ static int udp_set_multicast_sources(URLContext *h,
     for (i = 0; i < nb_sources; i++) {
         struct ip_mreq_source mreqs;
         if (sources[i].ss_family != AF_INET) {
-            av_log(NULL, AV_LOG_ERROR, "Source/block address %d is of incorrect protocol family\n", i + 1);
+            av_log(h, AV_LOG_ERROR, "Source/block address %d is of incorrect protocol family\n", i + 1);
             return AVERROR(EINVAL);
         }
 
         mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
         if (local_addr)
-            mreqs.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
+            mreqs.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr;
         else
-            mreqs.imr_interface.s_addr= INADDR_ANY;
+            mreqs.imr_interface.s_addr = INADDR_ANY;
         mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)&sources[i])->sin_addr.s_addr;
 
         if (setsockopt(sockfd, IPPROTO_IP,
                        include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
                        (const void *)&mreqs, sizeof(mreqs)) < 0) {
             if (include)
-                ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
+                ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
             else
-                ff_log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
+                ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
             return ff_neterrno();
         }
     }
@@ -519,14 +525,12 @@ static void *circular_buffer_task_tx( void *_URLContext)
 {
     URLContext *h = _URLContext;
     UDPContext *s = h->priv_data;
-    int old_cancelstate;
     int64_t target_timestamp = av_gettime_relative();
     int64_t start_timestamp = av_gettime_relative();
     int64_t sent_bits = 0;
     int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
     int64_t max_delay = s->bitrate ?  ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
 
-    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
     pthread_mutex_lock(&s->mutex);
 
     if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
@@ -541,7 +545,7 @@ static void *circular_buffer_task_tx( void *_URLContext)
         uint8_t tmp[4];
         int64_t timestamp;
 
-        len=av_fifo_size(s->fifo);
+        len = av_fifo_size(s->fifo);
 
         while (len<4) {
             if (s->close_req)
@@ -549,11 +553,11 @@ static void *circular_buffer_task_tx( void *_URLContext)
             if (pthread_cond_wait(&s->cond, &s->mutex) < 0) {
                 goto end;
             }
-            len=av_fifo_size(s->fifo);
+            len = av_fifo_size(s->fifo);
         }
 
         av_fifo_generic_read(s->fifo, tmp, 4, NULL);
-        len=AV_RL32(tmp);
+        len = AV_RL32(tmp);
 
         av_assert0(len >= 0);
         av_assert0(len <= sizeof(s->tmp));
@@ -561,7 +565,6 @@ static void *circular_buffer_task_tx( void *_URLContext)
         av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
 
         pthread_mutex_unlock(&s->mutex);
-        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
 
         if (s->bitrate) {
             timestamp = av_gettime_relative();
@@ -607,7 +610,6 @@ static void *circular_buffer_task_tx( void *_URLContext)
             }
         }
 
-        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
         pthread_mutex_lock(&s->mutex);
     }
 
@@ -631,26 +633,24 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     char buf[256];
     struct sockaddr_storage my_addr;
     socklen_t len;
+    int ret;
 
     h->is_streamed = 1;
 
     is_output = !(flags & AVIO_FLAG_READ);
     if (s->buffer_size < 0)
-        s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
+        s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_RX_BUF_SIZE;
 
     if (s->sources) {
-        if (ff_ip_parse_sources(h, s->sources, &s->filters) < 0)
+        if ((ret = ff_ip_parse_sources(h, s->sources, &s->filters)) < 0)
             goto fail;
     }
 
     if (s->block) {
-        if (ff_ip_parse_blocks(h, s->block, &s->filters) < 0)
+        if ((ret = ff_ip_parse_blocks(h, s->block, &s->filters)) < 0)
             goto fail;
     }
 
-    if (s->pkt_size > 0)
-        h->max_packet_size = s->pkt_size;
-
     p = strchr(uri, '?');
     if (p) {
         if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
@@ -713,11 +713,11 @@ static int udp_open(URLContext *h, const char *uri, int flags)
             av_strlcpy(localaddr, buf, sizeof(localaddr));
         }
         if (av_find_info_tag(buf, sizeof(buf), "sources", p)) {
-            if (ff_ip_parse_sources(h, buf, &s->filters) < 0)
+            if ((ret = ff_ip_parse_sources(h, buf, &s->filters)) < 0)
                 goto fail;
         }
         if (av_find_info_tag(buf, sizeof(buf), "block", p)) {
-            if (ff_ip_parse_blocks(h, buf, &s->filters) < 0)
+            if ((ret = ff_ip_parse_blocks(h, buf, &s->filters)) < 0)
                 goto fail;
         }
         if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
@@ -743,7 +743,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
         if (!(flags & AVIO_FLAG_READ))
             goto fail;
     } else {
-        if (ff_udp_set_remote_url(h, uri) < 0)
+        if ((ret = ff_udp_set_remote_url(h, uri)) < 0)
             goto fail;
     }
 
@@ -764,15 +764,22 @@ static int udp_open(URLContext *h, const char *uri, int flags)
      */
     if (s->reuse_socket > 0 || (s->is_multicast && s->reuse_socket < 0)) {
         s->reuse_socket = 1;
-        if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0)
+        if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0) {
+            ret = ff_neterrno();
             goto fail;
+        }
     }
 
     if (s->is_broadcast) {
 #ifdef SO_BROADCAST
-        if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0)
+        if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0) {
+            ret = ff_neterrno();
+            goto fail;
+        }
+#else
+        ret = AVERROR(ENOSYS);
+        goto fail;
 #endif
-           goto fail;
     }
 
     /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving.
@@ -789,15 +796,17 @@ static int udp_open(URLContext *h, const char *uri, int flags)
 
     if (dscp >= 0) {
         dscp <<= 2;
-        if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0)
+        if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0) {
+            ret = ff_neterrno();
             goto fail;
+        }
     }
 
     /* If multicast, try binding the multicast address first, to avoid
      * receiving UDP packets from other sources aimed at the same UDP
      * port. This fails on windows. This makes sending to the same address
      * using sendto() fail, so only do it if we're opened in read-only mode. */
-    if (s->is_multicast && !(h->flags & AVIO_FLAG_WRITE)) {
+    if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) {
         bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
     }
     /* bind to the local address if not multicast or if the multicast
@@ -805,6 +814,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     /* the bind is needed to give a port to the socket now */
     if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
         ff_log_net_error(h, AV_LOG_ERROR, "bind failed");
+        ret = ff_neterrno();
         goto fail;
     }
 
@@ -815,28 +825,28 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     if (s->is_multicast) {
         if (h->flags & AVIO_FLAG_WRITE) {
             /* output */
-            if (udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr) < 0)
+            if ((ret = udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr)) < 0)
                 goto fail;
         }
         if (h->flags & AVIO_FLAG_READ) {
             /* input */
             if (s->filters.nb_include_addrs) {
-                if (udp_set_multicast_sources(h, udp_fd,
+                if ((ret = udp_set_multicast_sources(h, udp_fd,
                                               (struct sockaddr *)&s->dest_addr,
                                               s->dest_addr_len, &s->local_addr_storage,
                                               s->filters.include_addrs,
-                                              s->filters.nb_include_addrs, 1) < 0)
+                                              s->filters.nb_include_addrs, 1)) < 0)
                     goto fail;
             } else {
-                if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage) < 0)
+                if ((ret = udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage)) < 0)
                     goto fail;
             }
             if (s->filters.nb_exclude_addrs) {
-                if (udp_set_multicast_sources(h, udp_fd,
+                if ((ret = udp_set_multicast_sources(h, udp_fd,
                                               (struct sockaddr *)&s->dest_addr,
                                               s->dest_addr_len, &s->local_addr_storage,
                                               s->filters.exclude_addrs,
-                                              s->filters.nb_exclude_addrs, 0) < 0)
+                                              s->filters.nb_exclude_addrs, 0)) < 0)
                     goto fail;
             }
         }
@@ -847,10 +857,11 @@ static int udp_open(URLContext *h, const char *uri, int flags)
         tmp = s->buffer_size;
         if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
             ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
+            ret = ff_neterrno();
             goto fail;
         }
     } else {
-        /* set udp recv buffer size to the requested value (default 64K) */
+        /* set udp recv buffer size to the requested value (default UDP_RX_BUF_SIZE) */
         tmp = s->buffer_size;
         if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
             ff_log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
@@ -861,7 +872,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
         } else {
             av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp);
             if(tmp < s->buffer_size)
-                av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d", s->buffer_size, tmp);
+                av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d\n", s->buffer_size, tmp);
         }
 
         /* make the socket non-blocking */
@@ -870,6 +881,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     if (s->is_connected) {
         if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
             ff_log_net_error(h, AV_LOG_ERROR, "connect");
+            ret = ff_neterrno();
             goto fail;
         }
     }
@@ -889,23 +901,28 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     }
 
     if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) {
-        int ret;
-
         /* start the task going */
         s->fifo = av_fifo_alloc(s->circular_buffer_size);
+        if (!s->fifo) {
+            ret = AVERROR(ENOMEM);
+            goto fail;
+        }
         ret = pthread_mutex_init(&s->mutex, NULL);
         if (ret != 0) {
             av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
+            ret = AVERROR(ret);
             goto fail;
         }
         ret = pthread_cond_init(&s->cond, NULL);
         if (ret != 0) {
             av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
+            ret = AVERROR(ret);
             goto cond_fail;
         }
         ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
         if (ret != 0) {
             av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
+            ret = AVERROR(ret);
             goto thread_fail;
         }
         s->thread_started = 1;
@@ -924,7 +941,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
         closesocket(udp_fd);
     av_fifo_freep(&s->fifo);
     ff_ip_reset_filters(&s->filters);
-    return AVERROR(EIO);
+    return ret;
 }
 
 static int udplite_open(URLContext *h, const char *uri, int flags)
@@ -954,10 +971,10 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
                 uint8_t tmp[4];
 
                 av_fifo_generic_read(s->fifo, tmp, 4, NULL);
-                avail= AV_RL32(tmp);
+                avail = AV_RL32(tmp);
                 if(avail > size){
                     av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
-                    avail= size;
+                    avail = size;
                 }
 
                 av_fifo_generic_read(s->fifo, buf, avail, NULL);
@@ -971,20 +988,20 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
             } else if(nonblock) {
                 pthread_mutex_unlock(&s->mutex);
                 return AVERROR(EAGAIN);
-            }
-            else {
+            } else {
                 /* FIXME: using the monotonic clock would be better,
                    but it does not exist on all supported platforms. */
                 int64_t t = av_gettime() + 100000;
                 struct timespec tv = { .tv_sec  =  t / 1000000,
                                        .tv_nsec = (t % 1000000) * 1000 };
-                if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) {
+                int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv);
+                if (err) {
                     pthread_mutex_unlock(&s->mutex);
-                    return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno);
+                    return AVERROR(err == ETIMEDOUT ? EAGAIN : err);
                 }
                 nonblock = 1;
             }
-        } while( 1);
+        } while(1);
     }
 #endif
 
@@ -1017,7 +1034,7 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size)
           Here we can't know on which packet error was, but it needs to know that error exists.
         */
         if (s->circular_buffer_error<0) {
-            int err=s->circular_buffer_error;
+            int err = s->circular_buffer_error;
             pthread_mutex_unlock(&s->mutex);
             return err;
         }
@@ -1071,8 +1088,17 @@ static int udp_close(URLContext *h)
     if (s->thread_started) {
         int ret;
         // Cancel only read, as write has been signaled as success to the user
-        if (h->flags & AVIO_FLAG_READ)
+        if (h->flags & AVIO_FLAG_READ) {
+#ifdef _WIN32
+            /* recvfrom() is not a cancellation point for win32, so we shutdown
+             * the socket and abort pending IO, subsequent recvfrom() calls
+             * will fail with WSAESHUTDOWN causing the thread to exit. */
+            shutdown(s->udp_fd, SD_RECEIVE);
+            CancelIoEx((HANDLE)(SOCKET)s->udp_fd, NULL);
+#else
             pthread_cancel(s->circular_buffer_thread);
+#endif
+        }
         ret = pthread_join(s->circular_buffer_thread, NULL);
         if (ret != 0)
             av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));