#define TCURL_MAX_LENGTH 512
#define FLASHVER_MAX_LENGTH 64
#define RTMP_PKTDATA_DEFAULT_SIZE 4096
+#define RTMP_HEADER 11
/** RTMP protocol handler state */
typedef enum {
typedef struct RTMPContext {
const AVClass *class;
URLContext* stream; ///< TCP stream used in interactions with RTMP server
- RTMPPacket prev_pkt[2][RTMP_CHANNELS]; ///< packet history used when reading and sending packets ([0] for reading, [1] for writing)
+ RTMPPacket *prev_pkt[2]; ///< packet history used when reading and sending packets ([0] for reading, [1] for writing)
+ int nb_prev_pkt[2]; ///< number of elements in prev_pkt
int in_chunk_size; ///< size of the chunks incoming RTMP packets are divided into
int out_chunk_size; ///< size of the chunks outgoing RTMP packets are divided into
int is_input; ///< input/output flag
uint32_t bytes_read; ///< number of bytes read from server
uint32_t last_bytes_read; ///< number of bytes read last reported to server
int skip_bytes; ///< number of bytes to skip from the input FLV stream in the next write call
- uint8_t flv_header[11]; ///< partial incoming flv packet header
+ uint8_t flv_header[RTMP_HEADER]; ///< partial incoming flv packet header
int flv_header_bytes; ///< number of initialized bytes in flv_header
int nb_invokes; ///< keeps track of invoke messages
char* tcurl; ///< url of the target stream
static int add_tracked_method(RTMPContext *rt, const char *name, int id)
{
- void *ptr;
+ int err;
if (rt->nb_tracked_methods + 1 > rt->tracked_methods_size) {
rt->tracked_methods_size = (rt->nb_tracked_methods + 1) * 2;
- ptr = av_realloc(rt->tracked_methods,
- rt->tracked_methods_size * sizeof(*rt->tracked_methods));
- if (!ptr)
- return AVERROR(ENOMEM);
- rt->tracked_methods = ptr;
+ if ((err = av_reallocp(&rt->tracked_methods, rt->tracked_methods_size *
+ sizeof(*rt->tracked_methods))) < 0) {
+ rt->nb_tracked_methods = 0;
+ rt->tracked_methods_size = 0;
+ return err;
+ }
}
rt->tracked_methods[rt->nb_tracked_methods].name = av_strdup(name);
}
ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size,
- rt->prev_pkt[1]);
+ &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
fail:
ff_rtmp_packet_destroy(pkt);
return ret;
GetByteContext gbc;
if ((ret = ff_rtmp_packet_read(rt->stream, &pkt, rt->in_chunk_size,
- rt->prev_pkt[0])) < 0)
+ &rt->prev_pkt[0], &rt->nb_prev_pkt[0])) < 0)
return ret;
cp = pkt.data;
bytestream2_init(&gbc, cp, pkt.size);
bytestream_put_be32(&p, rt->server_bw);
pkt.size = p - pkt.data;
ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
- rt->prev_pkt[1]);
+ &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
if (ret < 0)
return ret;
bytestream_put_byte(&p, 2); // dynamic
pkt.size = p - pkt.data;
ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
- rt->prev_pkt[1]);
+ &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
if (ret < 0)
return ret;
bytestream_put_be16(&p, 0); // 0 -> Stream Begin
bytestream_put_be32(&p, 0);
ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
- rt->prev_pkt[1]);
+ &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
if (ret < 0)
return ret;
p = pkt.data;
bytestream_put_be32(&p, rt->out_chunk_size);
ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
- rt->prev_pkt[1]);
+ &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
if (ret < 0)
return ret;
pkt.size = p - pkt.data;
ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
- rt->prev_pkt[1]);
+ &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
if (ret < 0)
return ret;
ff_amf_write_number(&p, 8192);
pkt.size = p - pkt.data;
ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
- rt->prev_pkt[1]);
+ &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
ff_rtmp_packet_destroy(&pkt);
return ret;
ff_amf_write_number(&p, ++rt->nb_invokes);
ff_amf_write_null(&p);
ff_amf_write_string(&p, rt->playpath);
- ff_amf_write_number(&p, rt->live);
+ ff_amf_write_number(&p, rt->live * 1000);
return rtmp_send_packet(rt, &pkt, 1);
}
for (i = 9; i <= RTMP_HANDSHAKE_PACKET_SIZE; i++)
tosend[i] = av_lfg_get(&rnd) >> 24;
- if (rt->encrypted && CONFIG_FFRTMPCRYPT_PROTOCOL) {
+ if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) {
/* When the client wants to use RTMPE, we have to change the command
* byte to 0x06 which means to use encrypted data and we have to set
* the flash version to at least 9.0.115.0. */
if (ret < 0)
return ret;
- if (rt->encrypted && CONFIG_FFRTMPCRYPT_PROTOCOL) {
+ if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) {
/* Compute the shared secret key sent by the server and initialize
* the RC4 encryption. */
if ((ret = ff_rtmpe_compute_secret_key(rt->stream, serverdata + 1,
if (ret < 0)
return ret;
- if (rt->encrypted && CONFIG_FFRTMPCRYPT_PROTOCOL) {
+ if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) {
/* Encrypt the signature to be send to the server. */
ff_rtmpe_encrypt_sig(rt->stream, tosend +
RTMP_HANDSHAKE_PACKET_SIZE - 32, digest,
RTMP_HANDSHAKE_PACKET_SIZE)) < 0)
return ret;
- if (rt->encrypted && CONFIG_FFRTMPCRYPT_PROTOCOL) {
+ if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) {
/* Set RC4 keys for encryption and update the keystreams. */
if ((ret = ff_rtmpe_update_keystream(rt->stream)) < 0)
return ret;
}
} else {
- if (rt->encrypted && CONFIG_FFRTMPCRYPT_PROTOCOL) {
+ if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) {
/* Compute the shared secret key sent by the server and initialize
* the RC4 encryption. */
if ((ret = ff_rtmpe_compute_secret_key(rt->stream, serverdata + 1,
RTMP_HANDSHAKE_PACKET_SIZE)) < 0)
return ret;
- if (rt->encrypted && CONFIG_FFRTMPCRYPT_PROTOCOL) {
+ if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) {
/* Set RC4 keys for encryption and update the keystreams. */
if ((ret = ff_rtmpe_update_keystream(rt->stream)) < 0)
return ret;
/* Send the same chunk size change packet back to the server,
* setting the outgoing chunk size to the same as the incoming one. */
if ((ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size,
- rt->prev_pkt[1])) < 0)
+ &rt->prev_pkt[1], &rt->nb_prev_pkt[1])) < 0)
return ret;
rt->out_chunk_size = AV_RB32(pkt->data);
}
bytestream2_put_be32(&pbc, rt->nb_streamid);
ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size,
- rt->prev_pkt[1]);
+ &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
ff_rtmp_packet_destroy(&spkt);
spkt.size = pp - spkt.data;
ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size,
- rt->prev_pkt[1]);
+ &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
ff_rtmp_packet_destroy(&spkt);
return ret;
}
spkt.size = pp - spkt.data;
ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size,
- rt->prev_pkt[1]);
+ &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
ff_rtmp_packet_destroy(&spkt);
return ret;
}
{
RTMPContext *rt = s->priv_data;
const uint8_t *data_end = pkt->data + pkt->size;
- const uint8_t *ptr = pkt->data + 11;
+ const uint8_t *ptr = pkt->data + RTMP_HEADER;
uint8_t tmpstr[256];
int i, t;
t = ff_amf_get_field_value(ptr, data_end, "level", tmpstr, sizeof(tmpstr));
if (!t && !strcmp(tmpstr, "error")) {
- if (!ff_amf_get_field_value(ptr, data_end,
- "description", tmpstr, sizeof(tmpstr)))
+ t = ff_amf_get_field_value(ptr, data_end,
+ "description", tmpstr, sizeof(tmpstr));
+ if (t || !tmpstr[0])
+ t = ff_amf_get_field_value(ptr, data_end, "code",
+ tmpstr, sizeof(tmpstr));
+ if (!t)
av_log(s, AV_LOG_ERROR, "Server error: %s\n", tmpstr);
return -1;
}
return ret;
}
-static int handle_notify(URLContext *s, RTMPPacket *pkt) {
- RTMPContext *rt = s->priv_data;
- const uint8_t *p = NULL;
- uint8_t *cp = NULL;
- uint8_t commandbuffer[64];
- char statusmsg[128];
- int stringlen;
- GetByteContext gbc;
- PutByteContext pbc;
- uint32_t ts;
+static int update_offset(RTMPContext *rt, int size)
+{
int old_flv_size;
- const uint8_t *datatowrite;
- unsigned datatowritelength;
-
- p = pkt->data;
- bytestream2_init(&gbc, p, pkt->size);
- if (ff_amf_read_string(&gbc, commandbuffer, sizeof(commandbuffer),
- &stringlen))
- return AVERROR_INVALIDDATA;
- if (!strcmp(commandbuffer, "@setDataFrame")) {
- datatowrite = gbc.buffer;
- datatowritelength = bytestream2_get_bytes_left(&gbc);
- if (ff_amf_read_string(&gbc, statusmsg,
- sizeof(statusmsg), &stringlen))
- return AVERROR_INVALIDDATA;
- } else {
- datatowrite = pkt->data;
- datatowritelength = pkt->size;
- }
-
- /* Provide ECMAArray to flv */
- ts = pkt->timestamp;
// generate packet header and put data into buffer for FLV demuxer
if (rt->flv_off < rt->flv_size) {
+ // There is old unread data in the buffer, thus append at the end
old_flv_size = rt->flv_size;
- rt->flv_size += datatowritelength + 15;
+ rt->flv_size += size;
} else {
+ // All data has been read, write the new data at the start of the buffer
old_flv_size = 0;
- rt->flv_size = datatowritelength + 15;
+ rt->flv_size = size;
rt->flv_off = 0;
}
- cp = av_realloc(rt->flv_data, rt->flv_size);
- if (!cp)
- return AVERROR(ENOMEM);
- rt->flv_data = cp;
- bytestream2_init_writer(&pbc, cp, rt->flv_size);
+ return old_flv_size;
+}
+
+static int append_flv_data(RTMPContext *rt, RTMPPacket *pkt, int skip)
+{
+ int old_flv_size, ret;
+ PutByteContext pbc;
+ const uint8_t *data = pkt->data + skip;
+ const int size = pkt->size - skip;
+ uint32_t ts = pkt->timestamp;
+
+ old_flv_size = update_offset(rt, size + 15);
+
+ if ((ret = av_reallocp(&rt->flv_data, rt->flv_size)) < 0) {
+ rt->flv_size = rt->flv_off = 0;
+ return ret;
+ }
+ bytestream2_init_writer(&pbc, rt->flv_data, rt->flv_size);
bytestream2_skip_p(&pbc, old_flv_size);
bytestream2_put_byte(&pbc, pkt->type);
- bytestream2_put_be24(&pbc, datatowritelength);
+ bytestream2_put_be24(&pbc, size);
bytestream2_put_be24(&pbc, ts);
bytestream2_put_byte(&pbc, ts >> 24);
bytestream2_put_be24(&pbc, 0);
- bytestream2_put_buffer(&pbc, datatowrite, datatowritelength);
+ bytestream2_put_buffer(&pbc, data, size);
bytestream2_put_be32(&pbc, 0);
return 0;
}
+static int handle_notify(URLContext *s, RTMPPacket *pkt)
+{
+ RTMPContext *rt = s->priv_data;
+ uint8_t commandbuffer[64];
+ char statusmsg[128];
+ int stringlen, ret, skip = 0;
+ GetByteContext gbc;
+
+ bytestream2_init(&gbc, pkt->data, pkt->size);
+ if (ff_amf_read_string(&gbc, commandbuffer, sizeof(commandbuffer),
+ &stringlen))
+ return AVERROR_INVALIDDATA;
+
+ // Skip the @setDataFrame string and validate it is a notification
+ if (!strcmp(commandbuffer, "@setDataFrame")) {
+ skip = gbc.buffer - pkt->data;
+ ret = ff_amf_read_string(&gbc, statusmsg,
+ sizeof(statusmsg), &stringlen);
+ if (ret < 0)
+ return AVERROR_INVALIDDATA;
+ }
+
+ return append_flv_data(rt, pkt, skip);
+}
+
/**
* Parse received packet and possibly perform some action depending on
* the packet contents.
return 0;
}
+static int handle_metadata(RTMPContext *rt, RTMPPacket *pkt)
+{
+ int ret, old_flv_size, type;
+ const uint8_t *next;
+ uint8_t *p;
+ uint32_t size;
+ uint32_t ts, cts, pts = 0;
+
+ old_flv_size = update_offset(rt, pkt->size);
+
+ if ((ret = av_reallocp(&rt->flv_data, rt->flv_size)) < 0) {
+ rt->flv_size = rt->flv_off = 0;
+ return ret;
+ }
+
+ next = pkt->data;
+ p = rt->flv_data + old_flv_size;
+
+ /* copy data while rewriting timestamps */
+ ts = pkt->timestamp;
+
+ while (next - pkt->data < pkt->size - RTMP_HEADER) {
+ type = bytestream_get_byte(&next);
+ size = bytestream_get_be24(&next);
+ cts = bytestream_get_be24(&next);
+ cts |= bytestream_get_byte(&next) << 24;
+ if (!pts)
+ pts = cts;
+ ts += cts - pts;
+ pts = cts;
+ if (size + 3 + 4 > pkt->data + pkt->size - next)
+ break;
+ bytestream_put_byte(&p, type);
+ bytestream_put_be24(&p, size);
+ bytestream_put_be24(&p, ts);
+ bytestream_put_byte(&p, ts >> 24);
+ memcpy(p, next, size + 3 + 4);
+ next += size + 3 + 4;
+ p += size + 3 + 4;
+ }
+ if (p != rt->flv_data + rt->flv_size) {
+ av_log(NULL, AV_LOG_WARNING, "Incomplete flv packets in "
+ "RTMP_PT_METADATA packet\n");
+ rt->flv_size = p - rt->flv_data;
+ }
+
+ return 0;
+}
+
/**
* Interact with the server by receiving and sending RTMP packets until
* there is some significant data (media data or expected status notification).
{
RTMPContext *rt = s->priv_data;
int ret;
- uint8_t *p;
- const uint8_t *next;
- uint32_t size;
- uint32_t ts, cts, pts=0;
if (rt->state == STATE_STOPPED)
return AVERROR_EOF;
for (;;) {
RTMPPacket rpkt = { 0 };
if ((ret = ff_rtmp_packet_read(rt->stream, &rpkt,
- rt->in_chunk_size, rt->prev_pkt[0])) <= 0) {
+ rt->in_chunk_size, &rt->prev_pkt[0],
+ &rt->nb_prev_pkt[0])) <= 0) {
if (ret == 0) {
return AVERROR(EAGAIN);
} else {
ff_rtmp_packet_destroy(&rpkt);
continue;
}
- if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO ||
- (rpkt.type == RTMP_PT_NOTIFY &&
- ff_amf_match_string(rpkt.data, rpkt.size, "onMetaData"))) {
- ts = rpkt.timestamp;
-
- // generate packet header and put data into buffer for FLV demuxer
- rt->flv_off = 0;
- rt->flv_size = rpkt.size + 15;
- rt->flv_data = p = av_realloc(rt->flv_data, rt->flv_size);
- bytestream_put_byte(&p, rpkt.type);
- bytestream_put_be24(&p, rpkt.size);
- bytestream_put_be24(&p, ts);
- bytestream_put_byte(&p, ts >> 24);
- bytestream_put_be24(&p, 0);
- bytestream_put_buffer(&p, rpkt.data, rpkt.size);
- bytestream_put_be32(&p, 0);
+ if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO) {
+ ret = append_flv_data(rt, &rpkt, 0);
ff_rtmp_packet_destroy(&rpkt);
- return 0;
+ return ret;
} else if (rpkt.type == RTMP_PT_NOTIFY) {
ret = handle_notify(s, &rpkt);
ff_rtmp_packet_destroy(&rpkt);
- if (ret) {
- av_log(s, AV_LOG_ERROR, "Handle notify error\n");
- return ret;
- }
- return 0;
+ return ret;
} else if (rpkt.type == RTMP_PT_METADATA) {
- // we got raw FLV data, make it available for FLV demuxer
- rt->flv_off = 0;
- rt->flv_size = rpkt.size;
- rt->flv_data = av_realloc(rt->flv_data, rt->flv_size);
- /* rewrite timestamps */
- next = rpkt.data;
- ts = rpkt.timestamp;
- while (next - rpkt.data < rpkt.size - 11) {
- next++;
- size = bytestream_get_be24(&next);
- p=next;
- cts = bytestream_get_be24(&next);
- cts |= bytestream_get_byte(&next) << 24;
- if (pts==0)
- pts=cts;
- ts += cts - pts;
- pts = cts;
- bytestream_put_be24(&p, ts);
- bytestream_put_byte(&p, ts >> 24);
- next += size + 3 + 4;
- }
- memcpy(rt->flv_data, rpkt.data, rpkt.size);
+ ret = handle_metadata(rt, &rpkt);
ff_rtmp_packet_destroy(&rpkt);
return 0;
}
}
if (rt->state > STATE_HANDSHAKED)
ret = gen_delete_stream(h, rt);
- for (i = 0; i < 2; i++)
- for (j = 0; j < RTMP_CHANNELS; j++)
+ for (i = 0; i < 2; i++) {
+ for (j = 0; j < rt->nb_prev_pkt[i]; j++)
ff_rtmp_packet_destroy(&rt->prev_pkt[i][j]);
+ av_freep(&rt->prev_pkt[i]);
+ }
free_tracked_methods(rt);
av_freep(&rt->flv_data);
(!strcmp(fname + len - 4, ".f4v") ||
!strcmp(fname + len - 4, ".mp4"))) {
memcpy(rt->playpath, "mp4:", 5);
- } else if (len >= 4 && !strcmp(fname + len - 4, ".flv")) {
- fname[len - 4] = '\0';
} else {
+ if (len >= 4 && !strcmp(fname + len - 4, ".flv"))
+ fname[len - 4] = '\0';
rt->playpath[0] = 0;
}
av_strlcat(rt->playpath, fname, PLAYPATH_MAX_LENGTH);
goto fail;
if (rt->do_reconnect) {
+ int i;
ffurl_close(rt->stream);
rt->stream = NULL;
rt->do_reconnect = 0;
rt->nb_invokes = 0;
- memset(rt->prev_pkt, 0, sizeof(rt->prev_pkt));
+ for (i = 0; i < 2; i++)
+ memset(rt->prev_pkt[i], 0,
+ sizeof(**rt->prev_pkt) * rt->nb_prev_pkt[i]);
free_tracked_methods(rt);
goto reconnect;
}
if (rt->is_input) {
+ int err;
// generate FLV header for demuxer
rt->flv_size = 13;
- rt->flv_data = av_realloc(rt->flv_data, rt->flv_size);
+ if ((err = av_reallocp(&rt->flv_data, rt->flv_size)) < 0)
+ return err;
rt->flv_off = 0;
memcpy(rt->flv_data, "FLV\1\5\0\0\0\011\0\0\0\0", rt->flv_size);
} else {
continue;
}
- if (rt->flv_header_bytes < 11) {
+ if (rt->flv_header_bytes < RTMP_HEADER) {
const uint8_t *header = rt->flv_header;
- int copy = FFMIN(11 - rt->flv_header_bytes, size_temp);
+ int copy = FFMIN(RTMP_HEADER - rt->flv_header_bytes, size_temp);
int channel = RTMP_AUDIO_CHANNEL;
bytestream_get_buffer(&buf_temp, rt->flv_header + rt->flv_header_bytes, copy);
rt->flv_header_bytes += copy;
size_temp -= copy;
- if (rt->flv_header_bytes < 11)
+ if (rt->flv_header_bytes < RTMP_HEADER)
break;
pkttype = bytestream_get_byte(&header);
pkttype == RTMP_PT_NOTIFY) {
if (pkttype == RTMP_PT_NOTIFY)
pktsize += 16;
+ if ((ret = ff_rtmp_check_alloc_array(&rt->prev_pkt[1],
+ &rt->nb_prev_pkt[1],
+ channel)) < 0)
+ return ret;
rt->prev_pkt[1][channel].channel_id = 0;
}
if ((ret = ff_rtmp_packet_read_internal(rt->stream, &rpkt,
rt->in_chunk_size,
- rt->prev_pkt[0], c)) <= 0)
+ &rt->prev_pkt[0],
+ &rt->nb_prev_pkt[0], c)) <= 0)
return ret;
if ((ret = rtmp_parse_result(s, rt, &rpkt)) < 0)
{"rtmp_swfverify", "URL to player swf file, compute hash/size automatically.", OFFSET(swfverify), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC},
{"rtmp_tcurl", "URL of the target stream. Defaults to proto://host[:port]/app.", OFFSET(tcurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
{"rtmp_listen", "Listen for incoming rtmp connections", OFFSET(listen), AV_OPT_TYPE_INT, {.i64 = 0}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
+ {"listen", "Listen for incoming rtmp connections", OFFSET(listen), AV_OPT_TYPE_INT, {.i64 = 0}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
{"timeout", "Maximum timeout (in seconds) to wait for incoming connections. -1 is infinite. Implies -rtmp_listen 1", OFFSET(listen_timeout), AV_OPT_TYPE_INT, {.i64 = -1}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
{ NULL },
};