X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=libavformat%2Fnetwork.c;h=0f5a575f773bc9c9016b8885b7ae971079a91993;hb=a04ad248a05e7b613abe09b3bb067f555108d794;hp=4bae7e2e446367a5009f7ba5c98aae8233a7e624;hpb=1a8be90adbaf86faa3053ff98118004ad7711c8c;p=ffmpeg diff --git a/libavformat/network.c b/libavformat/network.c index 4bae7e2e446..0f5a575f773 100644 --- a/libavformat/network.c +++ b/libavformat/network.c @@ -24,6 +24,7 @@ #include "url.h" #include "libavcodec/internal.h" #include "libavutil/avutil.h" +#include "libavutil/avassert.h" #include "libavutil/mem.h" #include "libavutil/time.h" @@ -165,14 +166,17 @@ static int ff_poll_interrupt(struct pollfd *p, nfds_t nfds, int timeout, if (ff_check_interrupt(cb)) return AVERROR_EXIT; ret = poll(p, nfds, POLLING_TIME); - if (ret != 0) + if (ret != 0) { + if (ret < 0) + ret = ff_neterrno(); + if (ret == AVERROR(EINTR)) + continue; break; + } } while (timeout <= 0 || runs-- > 0); if (!ret) return AVERROR(ETIMEDOUT); - if (ret < 0) - return ff_neterrno(); return ret; } @@ -234,7 +238,7 @@ int ff_accept(int fd, int timeout, URLContext *h) if (ret < 0) return ff_neterrno(); if (ff_socket_nonblock(ret, 1) < 0) - av_log(NULL, AV_LOG_DEBUG, "ff_socket_nonblock failed\n"); + av_log(h, AV_LOG_DEBUG, "ff_socket_nonblock failed\n"); return ret; } @@ -260,7 +264,7 @@ int ff_listen_connect(int fd, const struct sockaddr *addr, socklen_t optlen; if (ff_socket_nonblock(fd, 1) < 0) - av_log(NULL, AV_LOG_DEBUG, "ff_socket_nonblock failed\n"); + av_log(h, AV_LOG_DEBUG, "ff_socket_nonblock failed\n"); while ((ret = connect(fd, addr, addrlen))) { ret = ff_neterrno(); @@ -296,6 +300,230 @@ int ff_listen_connect(int fd, const struct sockaddr *addr, return ret; } +static void interleave_addrinfo(struct addrinfo *base) +{ + struct addrinfo **next = &base->ai_next; + while (*next) { + struct addrinfo *cur = *next; + // Iterate forward until we find an entry of a different family. + if (cur->ai_family == base->ai_family) { + next = &cur->ai_next; + continue; + } + if (cur == base->ai_next) { + // If the first one following base is of a different family, just + // move base forward one step and continue. + base = cur; + next = &base->ai_next; + continue; + } + // Unchain cur from the rest of the list from its current spot. + *next = cur->ai_next; + // Hook in cur directly after base. + cur->ai_next = base->ai_next; + base->ai_next = cur; + // Restart with a new base. We know that before moving the cur element, + // everything between the previous base and cur had the same family, + // different from cur->ai_family. Therefore, we can keep next pointing + // where it was, and continue from there with base at the one after + // cur. + base = cur->ai_next; + } +} + +static void print_address_list(void *ctx, const struct addrinfo *addr, + const char *title) +{ + char hostbuf[100], portbuf[20]; + av_log(ctx, AV_LOG_DEBUG, "%s:\n", title); + while (addr) { + getnameinfo(addr->ai_addr, addr->ai_addrlen, + hostbuf, sizeof(hostbuf), portbuf, sizeof(portbuf), + NI_NUMERICHOST | NI_NUMERICSERV); + av_log(ctx, AV_LOG_DEBUG, "Address %s port %s\n", hostbuf, portbuf); + addr = addr->ai_next; + } +} + +struct ConnectionAttempt { + int fd; + int64_t deadline_us; + struct addrinfo *addr; +}; + +// Returns < 0 on error, 0 on successfully started connection attempt, +// > 0 for a connection that succeeded already. +static int start_connect_attempt(struct ConnectionAttempt *attempt, + struct addrinfo **ptr, int timeout_ms, + URLContext *h, + void (*customize_fd)(void *, int), void *customize_ctx) +{ + struct addrinfo *ai = *ptr; + int ret; + + *ptr = ai->ai_next; + + attempt->fd = ff_socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (attempt->fd < 0) + return ff_neterrno(); + attempt->deadline_us = av_gettime_relative() + timeout_ms * 1000; + attempt->addr = ai; + + ff_socket_nonblock(attempt->fd, 1); + + if (customize_fd) + customize_fd(customize_ctx, attempt->fd); + + while ((ret = connect(attempt->fd, ai->ai_addr, ai->ai_addrlen))) { + ret = ff_neterrno(); + switch (ret) { + case AVERROR(EINTR): + if (ff_check_interrupt(&h->interrupt_callback)) { + closesocket(attempt->fd); + attempt->fd = -1; + return AVERROR_EXIT; + } + continue; + case AVERROR(EINPROGRESS): + case AVERROR(EAGAIN): + return 0; + default: + closesocket(attempt->fd); + attempt->fd = -1; + return ret; + } + } + return 1; +} + +// Try a new connection to another address after 200 ms, as suggested in +// RFC 8305 (or sooner if an earlier attempt fails). +#define NEXT_ATTEMPT_DELAY_MS 200 + +int ff_connect_parallel(struct addrinfo *addrs, int timeout_ms_per_address, + int parallel, URLContext *h, int *fd, + void (*customize_fd)(void *, int), void *customize_ctx) +{ + struct ConnectionAttempt attempts[3]; + struct pollfd pfd[3]; + int nb_attempts = 0, i, j; + int64_t next_attempt_us = av_gettime_relative(), next_deadline_us; + int last_err = AVERROR(EIO); + socklen_t optlen; + char errbuf[100], hostbuf[100], portbuf[20]; + + if (parallel > FF_ARRAY_ELEMS(attempts)) + parallel = FF_ARRAY_ELEMS(attempts); + + print_address_list(h, addrs, "Original list of addresses"); + // This mutates the list, but the head of the list is still the same + // element, so the caller, who owns the list, doesn't need to get + // an updated pointer. + interleave_addrinfo(addrs); + print_address_list(h, addrs, "Interleaved list of addresses"); + + while (nb_attempts > 0 || addrs) { + // Start a new connection attempt, if possible. + if (nb_attempts < parallel && addrs) { + getnameinfo(addrs->ai_addr, addrs->ai_addrlen, + hostbuf, sizeof(hostbuf), portbuf, sizeof(portbuf), + NI_NUMERICHOST | NI_NUMERICSERV); + av_log(h, AV_LOG_VERBOSE, "Starting connection attempt to %s port %s\n", + hostbuf, portbuf); + last_err = start_connect_attempt(&attempts[nb_attempts], &addrs, + timeout_ms_per_address, h, + customize_fd, customize_ctx); + if (last_err < 0) { + av_strerror(last_err, errbuf, sizeof(errbuf)); + av_log(h, AV_LOG_VERBOSE, "Connected attempt failed: %s\n", + errbuf); + continue; + } + if (last_err > 0) { + for (i = 0; i < nb_attempts; i++) + closesocket(attempts[i].fd); + *fd = attempts[nb_attempts].fd; + return 0; + } + pfd[nb_attempts].fd = attempts[nb_attempts].fd; + pfd[nb_attempts].events = POLLOUT; + next_attempt_us = av_gettime_relative() + NEXT_ATTEMPT_DELAY_MS * 1000; + nb_attempts++; + } + + av_assert0(nb_attempts > 0); + // The connection attempts are sorted from oldest to newest, so the + // first one will have the earliest deadline. + next_deadline_us = attempts[0].deadline_us; + // If we can start another attempt in parallel, wait until that time. + if (nb_attempts < parallel && addrs) + next_deadline_us = FFMIN(next_deadline_us, next_attempt_us); + last_err = ff_poll_interrupt(pfd, nb_attempts, + (next_deadline_us - av_gettime_relative())/1000, + &h->interrupt_callback); + if (last_err < 0 && last_err != AVERROR(ETIMEDOUT)) + break; + + // Check the status from the poll output. + for (i = 0; i < nb_attempts; i++) { + last_err = 0; + if (pfd[i].revents) { + // Some sort of action for this socket, check its status (either + // a successful connection or an error). + optlen = sizeof(last_err); + if (getsockopt(attempts[i].fd, SOL_SOCKET, SO_ERROR, &last_err, &optlen)) + last_err = ff_neterrno(); + else if (last_err != 0) + last_err = AVERROR(last_err); + if (last_err == 0) { + // Everything is ok, we seem to have a successful + // connection. Close other sockets and return this one. + for (j = 0; j < nb_attempts; j++) + if (j != i) + closesocket(attempts[j].fd); + *fd = attempts[i].fd; + getnameinfo(attempts[i].addr->ai_addr, attempts[i].addr->ai_addrlen, + hostbuf, sizeof(hostbuf), portbuf, sizeof(portbuf), + NI_NUMERICHOST | NI_NUMERICSERV); + av_log(h, AV_LOG_VERBOSE, "Successfully connected to %s port %s\n", + hostbuf, portbuf); + return 0; + } + } + if (attempts[i].deadline_us < av_gettime_relative() && !last_err) + last_err = AVERROR(ETIMEDOUT); + if (!last_err) + continue; + // Error (or timeout) for this socket; close the socket and remove + // it from the attempts/pfd arrays, to let a new attempt start + // directly. + getnameinfo(attempts[i].addr->ai_addr, attempts[i].addr->ai_addrlen, + hostbuf, sizeof(hostbuf), portbuf, sizeof(portbuf), + NI_NUMERICHOST | NI_NUMERICSERV); + av_strerror(last_err, errbuf, sizeof(errbuf)); + av_log(h, AV_LOG_VERBOSE, "Connection attempt to %s port %s " + "failed: %s\n", hostbuf, portbuf, errbuf); + closesocket(attempts[i].fd); + memmove(&attempts[i], &attempts[i + 1], + (nb_attempts - i - 1) * sizeof(*attempts)); + memmove(&pfd[i], &pfd[i + 1], + (nb_attempts - i - 1) * sizeof(*pfd)); + i--; + nb_attempts--; + } + } + for (i = 0; i < nb_attempts; i++) + closesocket(attempts[i].fd); + if (last_err >= 0) + last_err = AVERROR(ECONNREFUSED); + if (last_err != AVERROR_EXIT) { + av_strerror(last_err, errbuf, sizeof(errbuf)); + av_log(h, AV_LOG_ERROR, "Connection to %s failed: %s\n", + h->filename, errbuf); + } + return last_err; +} + static int match_host_pattern(const char *pattern, const char *hostname) { int len_p, len_h;