#include "avformat.h"
#include "avio_internal.h"
+#include "libavutil/avassert.h"
#include "libavutil/parseutils.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
int circular_buffer_size;
AVFifoBuffer *fifo;
int circular_buffer_error;
+ int64_t bitrate; /* number of bits to send per second */
+ int64_t burst_bits;
+ int close_req;
#if HAVE_PTHREAD_CANCEL
pthread_t circular_buffer_thread;
pthread_mutex_t mutex;
#define E AV_OPT_FLAG_ENCODING_PARAM
static const AVOption options[] = {
{ "buffer_size", "System data size (in bytes)", OFFSET(buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
+ { "bitrate", "Bits to send per second", OFFSET(bitrate), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E },
+ { "burst_bits", "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E },
{ "localport", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, D|E },
{ "local_port", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
{ "localaddr", "Local address", OFFSET(localaddr), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
res = NULL;
av_log(h, AV_LOG_ERROR, "getaddrinfo(%s, %s): %s\n",
node ? node : "unknown",
- service ? service : "unknown",
+ service,
gai_strerror(error));
}
}
#if HAVE_PTHREAD_CANCEL
-static void *circular_buffer_task( void *_URLContext)
+static void *circular_buffer_task_rx( void *_URLContext)
{
URLContext *h = _URLContext;
UDPContext *s = h->priv_data;
pthread_mutex_unlock(&s->mutex);
return NULL;
}
+
+static void *circular_buffer_task_tx( void *_URLContext)
+{
+ URLContext *h = _URLContext;
+ UDPContext *s = h->priv_data;
+ int old_cancelstate;
+ int64_t target_timestamp = av_gettime_relative();
+ int64_t start_timestamp = av_gettime_relative();
+ int64_t sent_bits = 0;
+ int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
+ int64_t max_delay = s->bitrate ? ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+ pthread_mutex_lock(&s->mutex);
+
+ if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
+ av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
+ s->circular_buffer_error = AVERROR(EIO);
+ goto end;
+ }
+
+ for(;;) {
+ int len;
+ const uint8_t *p;
+ uint8_t tmp[4];
+ int64_t timestamp;
+
+ len=av_fifo_size(s->fifo);
+
+ while (len<4) {
+ if (s->close_req)
+ goto end;
+ if (pthread_cond_wait(&s->cond, &s->mutex) < 0) {
+ goto end;
+ }
+ len=av_fifo_size(s->fifo);
+ }
+
+ av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+ len=AV_RL32(tmp);
+
+ av_assert0(len >= 0);
+ av_assert0(len <= sizeof(s->tmp));
+
+ av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
+
+ pthread_mutex_unlock(&s->mutex);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
+
+ if (s->bitrate) {
+ timestamp = av_gettime_relative();
+ if (timestamp < target_timestamp) {
+ int64_t delay = target_timestamp - timestamp;
+ if (delay > max_delay) {
+ delay = max_delay;
+ start_timestamp = timestamp + delay;
+ sent_bits = 0;
+ }
+ av_usleep(delay);
+ } else {
+ if (timestamp - burst_interval > target_timestamp) {
+ start_timestamp = timestamp - burst_interval;
+ sent_bits = 0;
+ }
+ }
+ sent_bits += len * 8;
+ target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
+ }
+
+ p = s->tmp;
+ while (len) {
+ int ret;
+ av_assert0(len > 0);
+ if (!s->is_connected) {
+ ret = sendto (s->udp_fd, p, len, 0,
+ (struct sockaddr *) &s->dest_addr,
+ s->dest_addr_len);
+ } else
+ ret = send(s->udp_fd, p, len, 0);
+ if (ret >= 0) {
+ len -= ret;
+ p += ret;
+ } else {
+ ret = ff_neterrno();
+ if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
+ pthread_mutex_lock(&s->mutex);
+ s->circular_buffer_error = ret;
+ pthread_mutex_unlock(&s->mutex);
+ return NULL;
+ }
+ }
+ }
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+ pthread_mutex_lock(&s->mutex);
+ }
+
+end:
+ pthread_mutex_unlock(&s->mutex);
+ return NULL;
+}
+
+
#endif
static int parse_source_list(char *buf, char **sources, int *num_sources,
"'circular_buffer_size' option was set but it is not supported "
"on this build (pthread support is required)\n");
}
+ if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
+ s->bitrate = strtoll(buf, NULL, 10);
+ if (!HAVE_PTHREAD_CANCEL)
+ av_log(h, AV_LOG_WARNING,
+ "'bitrate' option was set but it is not supported "
+ "on this build (pthread support is required)\n");
+ }
+ if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) {
+ s->burst_bits = strtoll(buf, NULL, 10);
+ }
if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
av_strlcpy(localaddr, buf, sizeof(localaddr));
}
s->udp_fd = udp_fd;
#if HAVE_PTHREAD_CANCEL
- if (!is_output && s->circular_buffer_size) {
+ /*
+ Create thread in case of:
+ 1. Input and circular_buffer_size is set
+ 2. Output and bitrate and circular_buffer_size is set
+ */
+
+ if (is_output && s->bitrate && !s->circular_buffer_size) {
+ /* Warn user in case of 'circular_buffer_size' is not set */
+ av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
+ }
+
+ if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) {
int ret;
/* start the task going */
av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
goto cond_fail;
}
- ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
+ ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
if (ret != 0) {
av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
goto thread_fail;
UDPContext *s = h->priv_data;
int ret;
+#if HAVE_PTHREAD_CANCEL
+ if (s->fifo) {
+ uint8_t tmp[4];
+
+ pthread_mutex_lock(&s->mutex);
+
+ /*
+ Return error if last tx failed.
+ Here we can't know on which packet error was, but it needs to know that error exists.
+ */
+ if (s->circular_buffer_error<0) {
+ int err=s->circular_buffer_error;
+ pthread_mutex_unlock(&s->mutex);
+ return err;
+ }
+
+ if(av_fifo_space(s->fifo) < size + 4) {
+ /* What about a partial packet tx ? */
+ pthread_mutex_unlock(&s->mutex);
+ return AVERROR(ENOMEM);
+ }
+ AV_WL32(tmp, size);
+ av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */
+ av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
+ pthread_cond_signal(&s->cond);
+ pthread_mutex_unlock(&s->mutex);
+ return size;
+ }
+#endif
if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
ret = ff_network_wait_fd(s->udp_fd, 1);
if (ret < 0)
{
UDPContext *s = h->priv_data;
+#if HAVE_PTHREAD_CANCEL
+ // Request close once writing is finished
+ if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
+ pthread_mutex_lock(&s->mutex);
+ s->close_req = 1;
+ pthread_cond_signal(&s->cond);
+ pthread_mutex_unlock(&s->mutex);
+ }
+#endif
+
if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage);
- closesocket(s->udp_fd);
#if HAVE_PTHREAD_CANCEL
if (s->thread_started) {
int ret;
- pthread_cancel(s->circular_buffer_thread);
+ // Cancel only read, as write has been signaled as success to the user
+ if (h->flags & AVIO_FLAG_READ)
+ pthread_cancel(s->circular_buffer_thread);
ret = pthread_join(s->circular_buffer_thread, NULL);
if (ret != 0)
av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
pthread_cond_destroy(&s->cond);
}
#endif
+ closesocket(s->udp_fd);
av_fifo_freep(&s->fifo);
return 0;
}