#include "avio_internal.h"
#include "libavutil/parseutils.h"
#include "libavutil/fifo.h"
+#include "libavutil/intreadwrite.h"
#include <unistd.h>
#include "internal.h"
#include "network.h"
#include "os_support.h"
#include "url.h"
+
+#if HAVE_PTHREADS
#include <pthread.h>
+#endif
+
#include <sys/time.h>
#ifndef IPV6_ADD_MEMBERSHIP
#define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
#endif
+#define UDP_TX_BUF_SIZE 32768
+#define UDP_MAX_PKT_SIZE 65536
+
typedef struct {
int udp_fd;
int ttl;
int circular_buffer_size;
AVFifoBuffer *fifo;
int circular_buffer_error;
+#if HAVE_PTHREADS
pthread_t circular_buffer_thread;
+#endif
+ uint8_t tmp[UDP_MAX_PKT_SIZE+4];
+ int remaining_in_dg;
} UDPContext;
-#define UDP_TX_BUF_SIZE 32768
-#define UDP_MAX_PKT_SIZE 65536
-
static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
struct sockaddr *addr)
{
/* How much do we have left to the end of the buffer */
/* Whats the minimum we can read so that we dont comletely fill the buffer */
left = av_fifo_space(s->fifo);
- left = FFMIN(left, s->fifo->end - s->fifo->wptr);
/* No Space left, error, what do we do now */
- if( !left) {
+ if(left < UDP_MAX_PKT_SIZE + 4) {
av_log(h, AV_LOG_ERROR, "circular_buffer: OVERRUN\n");
s->circular_buffer_error = EIO;
return NULL;
}
-
- len = recv(s->udp_fd, s->fifo->wptr, left, 0);
+ left = FFMIN(left, s->fifo->end - s->fifo->wptr);
+ len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
if (len < 0) {
if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
s->circular_buffer_error = EIO;
return NULL;
}
+ continue;
}
- s->fifo->wptr += len;
- if (s->fifo->wptr >= s->fifo->end)
- s->fifo->wptr = s->fifo->buffer;
- s->fifo->wndx += len;
+ AV_WL32(s->tmp, len);
+ av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
}
return NULL;
p = strchr(uri, '?');
if (p) {
if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
- char *endptr=NULL;
+ char *endptr = NULL;
s->reuse_socket = strtol(buf, &endptr, 10);
/* assume if no digits were found it is a request to enable it */
if (buf == endptr)
if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
s->is_connected = strtol(buf, NULL, 10);
}
- if (av_find_info_tag(buf, sizeof(buf), "buf_size", p)) {
+ if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
s->circular_buffer_size = strtol(buf, NULL, 10)*188;
}
}
goto fail;
}
- if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
+ if ((s->is_multicast || !s->local_port) && (h->flags & AVIO_FLAG_READ))
s->local_port = port;
udp_fd = udp_socket_create(s, &my_addr, &len);
if (udp_fd < 0)
s->udp_fd = udp_fd;
+#if HAVE_PTHREADS
if (!is_output && s->circular_buffer_size) {
/* start the task going */
s->fifo = av_fifo_alloc(s->circular_buffer_size);
goto fail;
}
}
+#endif
return 0;
fail:
if (udp_fd >= 0)
closesocket(udp_fd);
- av_fifo_free(s->fifo);
+ av_fifo_free(s->fifo);
av_free(s);
return AVERROR(EIO);
}
do {
avail = av_fifo_size(s->fifo);
if (avail) { // >=size) {
+ uint8_t tmp[4];
+
+ av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+ avail= AV_RL32(tmp);
+ if(avail > size){
+ av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
+ avail= size;
+ }
- // Maximum amount available
- size = FFMIN( avail, size);
- av_fifo_generic_read(s->fifo, buf, size, NULL);
- return size;
+ av_fifo_generic_read(s->fifo, buf, avail, NULL);
+ return avail;
}
else {
FD_ZERO(&rfds);