X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=libavformat%2Frtmpproto.c;h=e75a5190d02459334b166098f8fa508042623a06;hb=a67b67944aa9e6e794934d15f9fd9a9cf7173e09;hp=e9814a3a28bb05998a47c9e6a166423719fb432d;hpb=0d3784396b736374a61fea26268febdabd803a59;p=ffmpeg diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c index e9814a3a28b..e75a5190d02 100644 --- a/libavformat/rtmpproto.c +++ b/libavformat/rtmpproto.c @@ -53,6 +53,7 @@ #define TCURL_MAX_LENGTH 512 #define FLASHVER_MAX_LENGTH 64 #define RTMP_PKTDATA_DEFAULT_SIZE 4096 +#define RTMP_HEADER 11 /** RTMP protocol handler state */ typedef enum { @@ -76,7 +77,8 @@ typedef struct TrackedMethod { typedef struct RTMPContext { const AVClass *class; URLContext* stream; ///< TCP stream used in interactions with RTMP server - RTMPPacket prev_pkt[2][RTMP_CHANNELS]; ///< packet history used when reading and sending packets + RTMPPacket *prev_pkt[2]; ///< packet history used when reading and sending packets ([0] for reading, [1] for writing) + int nb_prev_pkt[2]; ///< number of elements in prev_pkt int in_chunk_size; ///< size of the chunks incoming RTMP packets are divided into int out_chunk_size; ///< size of the chunks outgoing RTMP packets are divided into int is_input; ///< input/output flag @@ -94,8 +96,12 @@ typedef struct RTMPContext { uint32_t client_report_size; ///< number of bytes after which client should report to server uint32_t bytes_read; ///< number of bytes read from server uint32_t last_bytes_read; ///< number of bytes read last reported to server + uint32_t last_timestamp; ///< last timestamp received in a packet int skip_bytes; ///< number of bytes to skip from the input FLV stream in the next write call - uint8_t flv_header[11]; ///< partial incoming flv packet header + int has_audio; ///< presence of audio data + int has_video; ///< presence of video data + int received_metadata; ///< Indicates if we have received metadata about the streams + uint8_t flv_header[RTMP_HEADER]; ///< partial incoming flv packet header int flv_header_bytes; ///< number of initialized bytes in flv_header int nb_invokes; ///< keeps track of invoke messages char* tcurl; ///< url of the target stream @@ -118,6 +124,7 @@ typedef struct RTMPContext { int listen; ///< listen mode flag int listen_timeout; ///< listen timeout to wait for new connections int nb_streamid; ///< The next stream id to return on createStream calls + double duration; ///< Duration of the stream in seconds as returned by the server (only valid if non-zero) char username[50]; char password[50]; char auth_params[500]; @@ -148,17 +155,20 @@ static const uint8_t rtmp_server_key[] = { 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE }; +static int handle_chunk_size(URLContext *s, RTMPPacket *pkt); + static int add_tracked_method(RTMPContext *rt, const char *name, int id) { - void *ptr; + int err; if (rt->nb_tracked_methods + 1 > rt->tracked_methods_size) { rt->tracked_methods_size = (rt->nb_tracked_methods + 1) * 2; - ptr = av_realloc(rt->tracked_methods, - rt->tracked_methods_size * sizeof(*rt->tracked_methods)); - if (!ptr) - return AVERROR(ENOMEM); - rt->tracked_methods = ptr; + if ((err = av_reallocp(&rt->tracked_methods, rt->tracked_methods_size * + sizeof(*rt->tracked_methods))) < 0) { + rt->nb_tracked_methods = 0; + rt->tracked_methods_size = 0; + return err; + } } rt->tracked_methods[rt->nb_tracked_methods].name = av_strdup(name); @@ -236,7 +246,7 @@ static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track) } ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); fail: ff_rtmp_packet_destroy(pkt); return ret; @@ -266,9 +276,6 @@ static int rtmp_write_amf_data(URLContext *s, char *param, uint8_t **p) *value = '\0'; value++; - if (!field || !value) - goto fail; - ff_amf_write_field_name(p, field); } else { goto fail; @@ -367,7 +374,7 @@ static int gen_connect(URLContext *s, RTMPContext *rt) char *param = rt->conn; // Write arbitrary AMF data to the Connect message. - while (param != NULL) { + while (param) { char *sep; param += strspn(param, " "); if (!*param) @@ -406,8 +413,18 @@ static int read_connect(URLContext *s, RTMPContext *rt) GetByteContext gbc; if ((ret = ff_rtmp_packet_read(rt->stream, &pkt, rt->in_chunk_size, - rt->prev_pkt[1])) < 0) + &rt->prev_pkt[0], &rt->nb_prev_pkt[0])) < 0) return ret; + + if (pkt.type == RTMP_PT_CHUNK_SIZE) { + if ((ret = handle_chunk_size(s, &pkt)) < 0) + return ret; + ff_rtmp_packet_destroy(&pkt); + if ((ret = ff_rtmp_packet_read(rt->stream, &pkt, rt->in_chunk_size, + &rt->prev_pkt[0], &rt->nb_prev_pkt[0])) < 0) + return ret; + } + cp = pkt.data; bytestream2_init(&gbc, cp, pkt.size); if (ff_amf_read_string(&gbc, command, sizeof(command), &stringlen)) { @@ -442,7 +459,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) bytestream_put_be32(&p, rt->server_bw); pkt.size = p - pkt.data; ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -455,7 +472,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) bytestream_put_byte(&p, 2); // dynamic pkt.size = p - pkt.data; ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -469,7 +486,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) bytestream_put_be16(&p, 0); // 0 -> Stream Begin bytestream_put_be32(&p, 0); ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -482,12 +499,12 @@ static int read_connect(URLContext *s, RTMPContext *rt) p = pkt.data; bytestream_put_be32(&p, rt->out_chunk_size); ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; - // Send result_ NetConnection.Connect.Success to connect + // Send _result NetConnection.Connect.Success to connect if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, RTMP_PKTDATA_DEFAULT_SIZE)) < 0) @@ -517,7 +534,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) pkt.size = p - pkt.data; ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -532,7 +549,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) ff_amf_write_number(&p, 8192); pkt.size = p - pkt.data; ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); return ret; @@ -660,6 +677,30 @@ static int gen_delete_stream(URLContext *s, RTMPContext *rt) return rtmp_send_packet(rt, &pkt, 0); } +/** + * Generate 'getStreamLength' call and send it to the server. If the server + * knows the duration of the selected stream, it will reply with the duration + * in seconds. + */ +static int gen_get_stream_length(URLContext *s, RTMPContext *rt) +{ + RTMPPacket pkt; + uint8_t *p; + int ret; + + if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SOURCE_CHANNEL, RTMP_PT_INVOKE, + 0, 31 + strlen(rt->playpath))) < 0) + return ret; + + p = pkt.data; + ff_amf_write_string(&p, "getStreamLength"); + ff_amf_write_number(&p, ++rt->nb_invokes); + ff_amf_write_null(&p); + ff_amf_write_string(&p, rt->playpath); + + return rtmp_send_packet(rt, &pkt, 1); +} + /** * Generate client buffer time and send it to the server. */ @@ -704,7 +745,7 @@ static int gen_play(URLContext *s, RTMPContext *rt) ff_amf_write_number(&p, ++rt->nb_invokes); ff_amf_write_null(&p); ff_amf_write_string(&p, rt->playpath); - ff_amf_write_number(&p, rt->live); + ff_amf_write_number(&p, rt->live * 1000); return rtmp_send_packet(rt, &pkt, 1); } @@ -732,6 +773,33 @@ static int gen_seek(URLContext *s, RTMPContext *rt, int64_t timestamp) return rtmp_send_packet(rt, &pkt, 1); } +/** + * Generate a pause packet that either pauses or unpauses the current stream. + */ +static int gen_pause(URLContext *s, RTMPContext *rt, int pause, uint32_t timestamp) +{ + RTMPPacket pkt; + uint8_t *p; + int ret; + + av_log(s, AV_LOG_DEBUG, "Sending pause command for timestamp %d\n", + timestamp); + + if ((ret = ff_rtmp_packet_create(&pkt, 3, RTMP_PT_INVOKE, 0, 29)) < 0) + return ret; + + pkt.extra = rt->stream_id; + + p = pkt.data; + ff_amf_write_string(&p, "pause"); + ff_amf_write_number(&p, 0); //no tracking back responses + ff_amf_write_null(&p); //as usual, the first null param + ff_amf_write_bool(&p, pause); // pause or unpause + ff_amf_write_number(&p, timestamp); //where we pause the stream + + return rtmp_send_packet(rt, &pkt, 1); +} + /** * Generate 'publish' call and send it to the server. */ @@ -1168,7 +1236,7 @@ static int rtmp_handshake(URLContext *s, RTMPContext *rt) for (i = 9; i <= RTMP_HANDSHAKE_PACKET_SIZE; i++) tosend[i] = av_lfg_get(&rnd) >> 24; - if (rt->encrypted && CONFIG_FFRTMPCRYPT_PROTOCOL) { + if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) { /* When the client wants to use RTMPE, we have to change the command * byte to 0x06 which means to use encrypted data and we have to set * the flash version to at least 9.0.115.0. */ @@ -1246,7 +1314,7 @@ static int rtmp_handshake(URLContext *s, RTMPContext *rt) if (ret < 0) return ret; - if (rt->encrypted && CONFIG_FFRTMPCRYPT_PROTOCOL) { + if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) { /* Compute the shared secret key sent by the server and initialize * the RC4 encryption. */ if ((ret = ff_rtmpe_compute_secret_key(rt->stream, serverdata + 1, @@ -1276,7 +1344,7 @@ static int rtmp_handshake(URLContext *s, RTMPContext *rt) if (ret < 0) return ret; - if (rt->encrypted && CONFIG_FFRTMPCRYPT_PROTOCOL) { + if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) { /* Encrypt the signature to be send to the server. */ ff_rtmpe_encrypt_sig(rt->stream, tosend + RTMP_HANDSHAKE_PACKET_SIZE - 32, digest, @@ -1288,13 +1356,13 @@ static int rtmp_handshake(URLContext *s, RTMPContext *rt) RTMP_HANDSHAKE_PACKET_SIZE)) < 0) return ret; - if (rt->encrypted && CONFIG_FFRTMPCRYPT_PROTOCOL) { + if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) { /* Set RC4 keys for encryption and update the keystreams. */ if ((ret = ff_rtmpe_update_keystream(rt->stream)) < 0) return ret; } } else { - if (rt->encrypted && CONFIG_FFRTMPCRYPT_PROTOCOL) { + if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) { /* Compute the shared secret key sent by the server and initialize * the RC4 encryption. */ if ((ret = ff_rtmpe_compute_secret_key(rt->stream, serverdata + 1, @@ -1312,7 +1380,7 @@ static int rtmp_handshake(URLContext *s, RTMPContext *rt) RTMP_HANDSHAKE_PACKET_SIZE)) < 0) return ret; - if (rt->encrypted && CONFIG_FFRTMPCRYPT_PROTOCOL) { + if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) { /* Set RC4 keys for encryption and update the keystreams. */ if ((ret = ff_rtmpe_update_keystream(rt->stream)) < 0) return ret; @@ -1453,7 +1521,7 @@ static int handle_chunk_size(URLContext *s, RTMPPacket *pkt) /* Send the same chunk size change packet back to the server, * setting the outgoing chunk size to the same as the incoming one. */ if ((ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size, - rt->prev_pkt[1])) < 0) + &rt->prev_pkt[1], &rt->nb_prev_pkt[1])) < 0) return ret; rt->out_chunk_size = AV_RB32(pkt->data); } @@ -1748,6 +1816,9 @@ static int handle_invoke_error(URLContext *s, RTMPPacket *pkt) /* Gracefully ignore Adobe-specific historical artifact errors. */ level = AV_LOG_WARNING; ret = 0; + } else if (tracked_method && !strcmp(tracked_method, "getStreamLength")) { + level = rt->live ? AV_LOG_DEBUG : AV_LOG_WARNING; + ret = 0; } else if (tracked_method && !strcmp(tracked_method, "connect")) { ret = handle_connect_error(s, tmpstr); if (!ret) { @@ -1782,7 +1853,7 @@ static int write_begin(URLContext *s) bytestream2_put_be32(&pbc, rt->nb_streamid); ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&spkt); @@ -1829,7 +1900,7 @@ static int write_status(URLContext *s, RTMPPacket *pkt, spkt.size = pp - spkt.data; ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&spkt); return ret; @@ -1930,11 +2001,50 @@ static int send_invoke_response(URLContext *s, RTMPPacket *pkt) } spkt.size = pp - spkt.data; ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&spkt); return ret; } +/** + * Read the AMF_NUMBER response ("_result") to a function call + * (e.g. createStream()). This response should be made up of the AMF_STRING + * "result", a NULL object and then the response encoded as AMF_NUMBER. On a + * successful response, we will return set the value to number (otherwise number + * will not be changed). + * + * @return 0 if reading the value succeeds, negative value otherwiss + */ +static int read_number_result(RTMPPacket *pkt, double *number) +{ + // We only need to fit "_result" in this. + uint8_t strbuffer[8]; + int stringlen; + double numbuffer; + GetByteContext gbc; + + bytestream2_init(&gbc, pkt->data, pkt->size); + + // Value 1/4: "_result" as AMF_STRING + if (ff_amf_read_string(&gbc, strbuffer, sizeof(strbuffer), &stringlen)) + return AVERROR_INVALIDDATA; + if (strcmp(strbuffer, "_result")) + return AVERROR_INVALIDDATA; + // Value 2/4: The callee reference number + if (ff_amf_read_number(&gbc, &numbuffer)) + return AVERROR_INVALIDDATA; + // Value 3/4: Null + if (ff_amf_read_null(&gbc)) + return AVERROR_INVALIDDATA; + // Value 4/4: The resonse as AMF_NUMBER + if (ff_amf_read_number(&gbc, &numbuffer)) + return AVERROR_INVALIDDATA; + else + *number = numbuffer; + + return 0; +} + static int handle_invoke_result(URLContext *s, RTMPPacket *pkt) { RTMPContext *rt = s->priv_data; @@ -1976,22 +2086,30 @@ static int handle_invoke_result(URLContext *s, RTMPPacket *pkt) } } } else if (!strcmp(tracked_method, "createStream")) { - //extract a number from the result - if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) { + double stream_id; + if (read_number_result(pkt, &stream_id)) { av_log(s, AV_LOG_WARNING, "Unexpected reply on connect()\n"); } else { - rt->stream_id = av_int2double(AV_RB64(pkt->data + 21)); + rt->stream_id = stream_id; } if (!rt->is_input) { if ((ret = gen_publish(s, rt)) < 0) goto fail; } else { + if (rt->live != -1) { + if ((ret = gen_get_stream_length(s, rt)) < 0) + goto fail; + } if ((ret = gen_play(s, rt)) < 0) goto fail; if ((ret = gen_buffer_time(s, rt)) < 0) goto fail; } + } else if (!strcmp(tracked_method, "getStreamLength")) { + if (read_number_result(pkt, &rt->duration)) { + av_log(s, AV_LOG_WARNING, "Unexpected reply on getStreamLength()\n"); + } } fail: @@ -2003,7 +2121,7 @@ static int handle_invoke_status(URLContext *s, RTMPPacket *pkt) { RTMPContext *rt = s->priv_data; const uint8_t *data_end = pkt->data + pkt->size; - const uint8_t *ptr = pkt->data + 11; + const uint8_t *ptr = pkt->data + RTMP_HEADER; uint8_t tmpstr[256]; int i, t; @@ -2016,8 +2134,12 @@ static int handle_invoke_status(URLContext *s, RTMPPacket *pkt) t = ff_amf_get_field_value(ptr, data_end, "level", tmpstr, sizeof(tmpstr)); if (!t && !strcmp(tmpstr, "error")) { - if (!ff_amf_get_field_value(ptr, data_end, - "description", tmpstr, sizeof(tmpstr))) + t = ff_amf_get_field_value(ptr, data_end, + "description", tmpstr, sizeof(tmpstr)); + if (t || !tmpstr[0]) + t = ff_amf_get_field_value(ptr, data_end, "code", + tmpstr, sizeof(tmpstr)); + if (!t) av_log(s, AV_LOG_ERROR, "Server error: %s\n", tmpstr); return -1; } @@ -2063,66 +2185,115 @@ static int handle_invoke(URLContext *s, RTMPPacket *pkt) return ret; } -static int handle_notify(URLContext *s, RTMPPacket *pkt) { - RTMPContext *rt = s->priv_data; - const uint8_t *p = NULL; - uint8_t *cp = NULL; - uint8_t commandbuffer[64]; - char statusmsg[128]; - int stringlen; - GetByteContext gbc; - PutByteContext pbc; - uint32_t ts; +static int update_offset(RTMPContext *rt, int size) +{ int old_flv_size; - const uint8_t *datatowrite; - unsigned datatowritelength; - - p = pkt->data; - bytestream2_init(&gbc, p, pkt->size); - if (ff_amf_read_string(&gbc, commandbuffer, sizeof(commandbuffer), - &stringlen)) - return AVERROR_INVALIDDATA; - if (!strcmp(commandbuffer, "@setDataFrame")) { - datatowrite = gbc.buffer; - datatowritelength = bytestream2_get_bytes_left(&gbc); - if (ff_amf_read_string(&gbc, statusmsg, - sizeof(statusmsg), &stringlen)) - return AVERROR_INVALIDDATA; - } else { - datatowrite = pkt->data; - datatowritelength = pkt->size; - } - - /* Provide ECMAArray to flv */ - ts = pkt->timestamp; // generate packet header and put data into buffer for FLV demuxer if (rt->flv_off < rt->flv_size) { + // There is old unread data in the buffer, thus append at the end old_flv_size = rt->flv_size; - rt->flv_size += datatowritelength + 15; + rt->flv_size += size; } else { + // All data has been read, write the new data at the start of the buffer old_flv_size = 0; - rt->flv_size = datatowritelength + 15; + rt->flv_size = size; rt->flv_off = 0; } - cp = av_realloc(rt->flv_data, rt->flv_size); - if (!cp) - return AVERROR(ENOMEM); - rt->flv_data = cp; - bytestream2_init_writer(&pbc, cp, rt->flv_size); + return old_flv_size; +} + +static int append_flv_data(RTMPContext *rt, RTMPPacket *pkt, int skip) +{ + int old_flv_size, ret; + PutByteContext pbc; + const uint8_t *data = pkt->data + skip; + const int size = pkt->size - skip; + uint32_t ts = pkt->timestamp; + + if (pkt->type == RTMP_PT_AUDIO) { + rt->has_audio = 1; + } else if (pkt->type == RTMP_PT_VIDEO) { + rt->has_video = 1; + } + + old_flv_size = update_offset(rt, size + 15); + + if ((ret = av_reallocp(&rt->flv_data, rt->flv_size)) < 0) { + rt->flv_size = rt->flv_off = 0; + return ret; + } + bytestream2_init_writer(&pbc, rt->flv_data, rt->flv_size); bytestream2_skip_p(&pbc, old_flv_size); bytestream2_put_byte(&pbc, pkt->type); - bytestream2_put_be24(&pbc, datatowritelength); + bytestream2_put_be24(&pbc, size); bytestream2_put_be24(&pbc, ts); bytestream2_put_byte(&pbc, ts >> 24); bytestream2_put_be24(&pbc, 0); - bytestream2_put_buffer(&pbc, datatowrite, datatowritelength); + bytestream2_put_buffer(&pbc, data, size); bytestream2_put_be32(&pbc, 0); return 0; } +static int handle_notify(URLContext *s, RTMPPacket *pkt) +{ + RTMPContext *rt = s->priv_data; + uint8_t commandbuffer[64]; + char statusmsg[128]; + int stringlen, ret, skip = 0; + GetByteContext gbc; + + bytestream2_init(&gbc, pkt->data, pkt->size); + if (ff_amf_read_string(&gbc, commandbuffer, sizeof(commandbuffer), + &stringlen)) + return AVERROR_INVALIDDATA; + + if (!strcmp(commandbuffer, "onMetaData")) { + // metadata properties should be stored in a mixed array + if (bytestream2_get_byte(&gbc) == AMF_DATA_TYPE_MIXEDARRAY) { + // We have found a metaData Array so flv can determine the streams + // from this. + rt->received_metadata = 1; + // skip 32-bit max array index + bytestream2_skip(&gbc, 4); + while (bytestream2_get_bytes_left(&gbc) > 3) { + if (ff_amf_get_string(&gbc, statusmsg, sizeof(statusmsg), + &stringlen)) + return AVERROR_INVALIDDATA; + // We do not care about the content of the property (yet). + stringlen = ff_amf_tag_size(gbc.buffer, gbc.buffer_end); + if (stringlen < 0) + return AVERROR_INVALIDDATA; + bytestream2_skip(&gbc, stringlen); + + // The presence of the following properties indicates that the + // respective streams are present. + if (!strcmp(statusmsg, "videocodecid")) { + rt->has_video = 1; + } + if (!strcmp(statusmsg, "audiocodecid")) { + rt->has_audio = 1; + } + } + if (bytestream2_get_be24(&gbc) != AMF_END_OF_OBJECT) + return AVERROR_INVALIDDATA; + } + } + + // Skip the @setDataFrame string and validate it is a notification + if (!strcmp(commandbuffer, "@setDataFrame")) { + skip = gbc.buffer - pkt->data; + ret = ff_amf_read_string(&gbc, statusmsg, + sizeof(statusmsg), &stringlen); + if (ret < 0) + return AVERROR_INVALIDDATA; + } + + return append_flv_data(rt, pkt, skip); +} + /** * Parse received packet and possibly perform some action depending on * the packet contents. @@ -2139,7 +2310,7 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt) switch (pkt->type) { case RTMP_PT_BYTES_READ: - av_dlog(s, "received bytes read report\n"); + av_log(s, AV_LOG_TRACE, "received bytes read report\n"); break; case RTMP_PT_CHUNK_SIZE: if ((ret = handle_chunk_size(s, pkt)) < 0) @@ -2174,6 +2345,55 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt) return 0; } +static int handle_metadata(RTMPContext *rt, RTMPPacket *pkt) +{ + int ret, old_flv_size, type; + const uint8_t *next; + uint8_t *p; + uint32_t size; + uint32_t ts, cts, pts = 0; + + old_flv_size = update_offset(rt, pkt->size); + + if ((ret = av_reallocp(&rt->flv_data, rt->flv_size)) < 0) { + rt->flv_size = rt->flv_off = 0; + return ret; + } + + next = pkt->data; + p = rt->flv_data + old_flv_size; + + /* copy data while rewriting timestamps */ + ts = pkt->timestamp; + + while (next - pkt->data < pkt->size - RTMP_HEADER) { + type = bytestream_get_byte(&next); + size = bytestream_get_be24(&next); + cts = bytestream_get_be24(&next); + cts |= bytestream_get_byte(&next) << 24; + if (!pts) + pts = cts; + ts += cts - pts; + pts = cts; + if (size + 3 + 4 > pkt->data + pkt->size - next) + break; + bytestream_put_byte(&p, type); + bytestream_put_be24(&p, size); + bytestream_put_be24(&p, ts); + bytestream_put_byte(&p, ts >> 24); + memcpy(p, next, size + 3 + 4); + next += size + 3 + 4; + p += size + 3 + 4; + } + if (p != rt->flv_data + rt->flv_size) { + av_log(NULL, AV_LOG_WARNING, "Incomplete flv packets in " + "RTMP_PT_METADATA packet\n"); + rt->flv_size = p - rt->flv_data; + } + + return 0; +} + /** * Interact with the server by receiving and sending RTMP packets until * there is some significant data (media data or expected status notification). @@ -2189,10 +2409,6 @@ static int get_packet(URLContext *s, int for_header) { RTMPContext *rt = s->priv_data; int ret; - uint8_t *p; - const uint8_t *next; - uint32_t size; - uint32_t ts, cts, pts=0; if (rt->state == STATE_STOPPED) return AVERROR_EOF; @@ -2200,13 +2416,18 @@ static int get_packet(URLContext *s, int for_header) for (;;) { RTMPPacket rpkt = { 0 }; if ((ret = ff_rtmp_packet_read(rt->stream, &rpkt, - rt->in_chunk_size, rt->prev_pkt[0])) <= 0) { + rt->in_chunk_size, &rt->prev_pkt[0], + &rt->nb_prev_pkt[0])) <= 0) { if (ret == 0) { return AVERROR(EAGAIN); } else { return AVERROR(EIO); } } + + // Track timestamp for later use + rt->last_timestamp = rpkt.timestamp; + rt->bytes_read += ret; if (rt->bytes_read > rt->last_bytes_read + rt->client_report_size) { av_log(s, AV_LOG_DEBUG, "Sending bytes read report\n"); @@ -2250,55 +2471,16 @@ static int get_packet(URLContext *s, int for_header) ff_rtmp_packet_destroy(&rpkt); continue; } - if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO || - (rpkt.type == RTMP_PT_NOTIFY && - ff_amf_match_string(rpkt.data, rpkt.size, "onMetaData"))) { - ts = rpkt.timestamp; - - // generate packet header and put data into buffer for FLV demuxer - rt->flv_off = 0; - rt->flv_size = rpkt.size + 15; - rt->flv_data = p = av_realloc(rt->flv_data, rt->flv_size); - bytestream_put_byte(&p, rpkt.type); - bytestream_put_be24(&p, rpkt.size); - bytestream_put_be24(&p, ts); - bytestream_put_byte(&p, ts >> 24); - bytestream_put_be24(&p, 0); - bytestream_put_buffer(&p, rpkt.data, rpkt.size); - bytestream_put_be32(&p, 0); + if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO) { + ret = append_flv_data(rt, &rpkt, 0); ff_rtmp_packet_destroy(&rpkt); - return 0; + return ret; } else if (rpkt.type == RTMP_PT_NOTIFY) { ret = handle_notify(s, &rpkt); ff_rtmp_packet_destroy(&rpkt); - if (ret) { - av_log(s, AV_LOG_ERROR, "Handle notify error\n"); - return ret; - } - return 0; + return ret; } else if (rpkt.type == RTMP_PT_METADATA) { - // we got raw FLV data, make it available for FLV demuxer - rt->flv_off = 0; - rt->flv_size = rpkt.size; - rt->flv_data = av_realloc(rt->flv_data, rt->flv_size); - /* rewrite timestamps */ - next = rpkt.data; - ts = rpkt.timestamp; - while (next - rpkt.data < rpkt.size - 11) { - next++; - size = bytestream_get_be24(&next); - p=next; - cts = bytestream_get_be24(&next); - cts |= bytestream_get_byte(&next) << 24; - if (pts==0) - pts=cts; - ts += cts - pts; - pts = cts; - bytestream_put_be24(&p, ts); - bytestream_put_byte(&p, ts >> 24); - next += size + 3 + 4; - } - memcpy(rt->flv_data, rpkt.data, rpkt.size); + ret = handle_metadata(rt, &rpkt); ff_rtmp_packet_destroy(&rpkt); return 0; } @@ -2309,7 +2491,7 @@ static int get_packet(URLContext *s, int for_header) static int rtmp_close(URLContext *h) { RTMPContext *rt = h->priv_data; - int ret = 0; + int ret = 0, i, j; if (!rt->is_input) { rt->flv_data = NULL; @@ -2320,6 +2502,11 @@ static int rtmp_close(URLContext *h) } if (rt->state > STATE_HANDSHAKED) ret = gen_delete_stream(h, rt); + for (i = 0; i < 2; i++) { + for (j = 0; j < rt->nb_prev_pkt[i]; j++) + ff_rtmp_packet_destroy(&rt->prev_pkt[i][j]); + av_freep(&rt->prev_pkt[i]); + } free_tracked_methods(rt); av_freep(&rt->flv_data); @@ -2327,6 +2514,70 @@ static int rtmp_close(URLContext *h) return ret; } +/** + * Insert a fake onMetadata packet into the FLV stream to notify the FLV + * demuxer about the duration of the stream. + * + * This should only be done if there was no real onMetadata packet sent by the + * server at the start of the stream and if we were able to retrieve a valid + * duration via a getStreamLength call. + * + * @return 0 for successful operation, negative value in case of error + */ +static int inject_fake_duration_metadata(RTMPContext *rt) +{ + // We need to insert the metdata packet directly after the FLV + // header, i.e. we need to move all other already read data by the + // size of our fake metadata packet. + + uint8_t* p; + // Keep old flv_data pointer + uint8_t* old_flv_data = rt->flv_data; + // Allocate a new flv_data pointer with enough space for the additional package + if (!(rt->flv_data = av_malloc(rt->flv_size + 55))) { + rt->flv_data = old_flv_data; + return AVERROR(ENOMEM); + } + + // Copy FLV header + memcpy(rt->flv_data, old_flv_data, 13); + // Copy remaining packets + memcpy(rt->flv_data + 13 + 55, old_flv_data + 13, rt->flv_size - 13); + // Increase the size by the injected packet + rt->flv_size += 55; + // Delete the old FLV data + av_free(old_flv_data); + + p = rt->flv_data + 13; + bytestream_put_byte(&p, FLV_TAG_TYPE_META); + bytestream_put_be24(&p, 40); // size of data part (sum of all parts below) + bytestream_put_be24(&p, 0); // timestamp + bytestream_put_be32(&p, 0); // reserved + + // first event name as a string + bytestream_put_byte(&p, AMF_DATA_TYPE_STRING); + // "onMetaData" as AMF string + bytestream_put_be16(&p, 10); + bytestream_put_buffer(&p, "onMetaData", 10); + + // mixed array (hash) with size and string/type/data tuples + bytestream_put_byte(&p, AMF_DATA_TYPE_MIXEDARRAY); + bytestream_put_be32(&p, 1); // metadata_count + + // "duration" as AMF string + bytestream_put_be16(&p, 8); + bytestream_put_buffer(&p, "duration", 8); + bytestream_put_byte(&p, AMF_DATA_TYPE_NUMBER); + bytestream_put_be64(&p, av_double2int(rt->duration)); + + // Finalise object + bytestream_put_be16(&p, 0); // Empty string + bytestream_put_byte(&p, AMF_END_OF_OBJECT); + bytestream_put_be32(&p, 40); // size of data part (sum of all parts below) + + return 0; +} + /** * Open RTMP connection and verify that the stream can be played. * @@ -2340,7 +2591,7 @@ static int rtmp_open(URLContext *s, const char *uri, int flags) { RTMPContext *rt = s->priv_data; char proto[8], hostname[256], path[1024], auth[100], *fname; - char *old_app; + char *old_app, *qmark, fname_buffer[1024]; uint8_t buf[2048]; int port; AVDictionary *opts = NULL; @@ -2438,7 +2689,20 @@ reconnect: } //extract "app" part from path - if (!strncmp(path, "/ondemand/", 10)) { + qmark = strchr(path, '?'); + if (qmark && strstr(qmark, "slist=")) { + char* amp; + // After slist we have the playpath, before the params, the app + av_strlcpy(rt->app, path + 1, FFMIN(qmark - path, APP_MAX_LENGTH)); + fname = strstr(path, "slist=") + 6; + // Strip any further query parameters from fname + amp = strchr(fname, '&'); + if (amp) { + av_strlcpy(fname_buffer, fname, FFMIN(amp - fname + 1, + sizeof(fname_buffer))); + fname = fname_buffer; + } + } else if (!strncmp(path, "/ondemand/", 10)) { fname = path + 10; memcpy(rt->app, "ondemand", 9); } else { @@ -2453,10 +2717,10 @@ reconnect: fname = strchr(p + 1, '/'); if (!fname || (c && c < fname)) { fname = p + 1; - av_strlcpy(rt->app, path + 1, p - path); + av_strlcpy(rt->app, path + 1, FFMIN(p - path, APP_MAX_LENGTH)); } else { fname++; - av_strlcpy(rt->app, path + 1, fname - path - 1); + av_strlcpy(rt->app, path + 1, FFMIN(fname - path - 1, APP_MAX_LENGTH)); } } } @@ -2480,9 +2744,9 @@ reconnect: (!strcmp(fname + len - 4, ".f4v") || !strcmp(fname + len - 4, ".mp4"))) { memcpy(rt->playpath, "mp4:", 5); - } else if (len >= 4 && !strcmp(fname + len - 4, ".flv")) { - fname[len - 4] = '\0'; } else { + if (len >= 4 && !strcmp(fname + len - 4, ".flv")) + fname[len - 4] = '\0'; rt->playpath[0] = 0; } av_strlcat(rt->playpath, fname, PLAYPATH_MAX_LENGTH); @@ -2516,8 +2780,12 @@ reconnect: rt->client_report_size = 1048576; rt->bytes_read = 0; + rt->has_audio = 0; + rt->has_video = 0; + rt->received_metadata = 0; rt->last_bytes_read = 0; rt->server_bw = 2500000; + rt->duration = 0; av_log(s, AV_LOG_DEBUG, "Proto = %s, path = %s, app = %s, fname = %s\n", proto, path, rt->app, rt->playpath); @@ -2525,7 +2793,7 @@ reconnect: if ((ret = gen_connect(s, rt)) < 0) goto fail; } else { - if (read_connect(s, s->priv_data) < 0) + if ((ret = read_connect(s, s->priv_data)) < 0) goto fail; } @@ -2536,11 +2804,14 @@ reconnect: goto fail; if (rt->do_reconnect) { + int i; ffurl_close(rt->stream); rt->stream = NULL; rt->do_reconnect = 0; rt->nb_invokes = 0; - memset(rt->prev_pkt, 0, sizeof(rt->prev_pkt)); + for (i = 0; i < 2; i++) + memset(rt->prev_pkt[i], 0, + sizeof(**rt->prev_pkt) * rt->nb_prev_pkt[i]); free_tracked_methods(rt); goto reconnect; } @@ -2548,9 +2819,38 @@ reconnect: if (rt->is_input) { // generate FLV header for demuxer rt->flv_size = 13; - rt->flv_data = av_realloc(rt->flv_data, rt->flv_size); + if ((ret = av_reallocp(&rt->flv_data, rt->flv_size)) < 0) + goto fail; rt->flv_off = 0; - memcpy(rt->flv_data, "FLV\1\5\0\0\0\011\0\0\0\0", rt->flv_size); + memcpy(rt->flv_data, "FLV\1\0\0\0\0\011\0\0\0\0", rt->flv_size); + + // Read packets until we reach the first A/V packet or read metadata. + // If there was a metadata package in front of the A/V packets, we can + // build the FLV header from this. If we do not receive any metadata, + // the FLV decoder will allocate the needed streams when their first + // audio or video packet arrives. + while (!rt->has_audio && !rt->has_video && !rt->received_metadata) { + if ((ret = get_packet(s, 0)) < 0) + goto fail; + } + + // Either after we have read the metadata or (if there is none) the + // first packet of an A/V stream, we have a better knowledge about the + // streams, so set the FLV header accordingly. + if (rt->has_audio) { + rt->flv_data[4] |= FLV_HEADER_FLAG_HASAUDIO; + } + if (rt->has_video) { + rt->flv_data[4] |= FLV_HEADER_FLAG_HASVIDEO; + } + + // If we received the first packet of an A/V stream and no metadata but + // the server returned a valid duration, create a fake metadata packet + // to inform the FLV decoder about the duration. + if (!rt->received_metadata && rt->duration > 0) { + if ((ret = inject_fake_duration_metadata(rt)) < 0) + goto fail; + } } else { rt->flv_size = 0; rt->flv_data = NULL; @@ -2615,11 +2915,25 @@ static int64_t rtmp_seek(URLContext *s, int stream_index, int64_t timestamp, return timestamp; } +static int rtmp_pause(URLContext *s, int pause) +{ + RTMPContext *rt = s->priv_data; + int ret; + av_log(s, AV_LOG_DEBUG, "Pause at timestamp %d\n", + rt->last_timestamp); + if ((ret = gen_pause(s, rt, pause, rt->last_timestamp)) < 0) { + av_log(s, AV_LOG_ERROR, "Unable to send pause command at timestamp %d\n", + rt->last_timestamp); + return ret; + } + return 0; +} + static int rtmp_write(URLContext *s, const uint8_t *buf, int size) { RTMPContext *rt = s->priv_data; int size_temp = size; - int pktsize, pkttype; + int pktsize, pkttype, copy; uint32_t ts; const uint8_t *buf_temp = buf; uint8_t c; @@ -2634,14 +2948,14 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size) continue; } - if (rt->flv_header_bytes < 11) { + if (rt->flv_header_bytes < RTMP_HEADER) { const uint8_t *header = rt->flv_header; - int copy = FFMIN(11 - rt->flv_header_bytes, size_temp); int channel = RTMP_AUDIO_CHANNEL; + copy = FFMIN(RTMP_HEADER - rt->flv_header_bytes, size_temp); bytestream_get_buffer(&buf_temp, rt->flv_header + rt->flv_header_bytes, copy); rt->flv_header_bytes += copy; size_temp -= copy; - if (rt->flv_header_bytes < 11) + if (rt->flv_header_bytes < RTMP_HEADER) break; pkttype = bytestream_get_byte(&header); @@ -2654,11 +2968,15 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size) if (pkttype == RTMP_PT_VIDEO) channel = RTMP_VIDEO_CHANNEL; - //force 12bytes header if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO) && ts == 0) || pkttype == RTMP_PT_NOTIFY) { - if (pkttype == RTMP_PT_NOTIFY) - pktsize += 16; + if ((ret = ff_rtmp_check_alloc_array(&rt->prev_pkt[1], + &rt->nb_prev_pkt[1], + channel)) < 0) + return ret; + // Force sending a full 12 bytes header by clearing the + // channel id, to make it not match a potential earlier + // packet in the same channel. rt->prev_pkt[1][channel].channel_id = 0; } @@ -2669,24 +2987,43 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size) rt->out_pkt.extra = rt->stream_id; rt->flv_data = rt->out_pkt.data; - - if (pkttype == RTMP_PT_NOTIFY) - ff_amf_write_string(&rt->flv_data, "@setDataFrame"); } - if (rt->flv_size - rt->flv_off > size_temp) { - bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, size_temp); - rt->flv_off += size_temp; - size_temp = 0; - } else { - bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, rt->flv_size - rt->flv_off); - size_temp -= rt->flv_size - rt->flv_off; - rt->flv_off += rt->flv_size - rt->flv_off; - } + copy = FFMIN(rt->flv_size - rt->flv_off, size_temp); + bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, copy); + rt->flv_off += copy; + size_temp -= copy; if (rt->flv_off == rt->flv_size) { rt->skip_bytes = 4; + if (rt->out_pkt.type == RTMP_PT_NOTIFY) { + // For onMetaData and |RtmpSampleAccess packets, we want + // @setDataFrame prepended to the packet before it gets sent. + // However, not all RTMP_PT_NOTIFY packets (e.g., onTextData + // and onCuePoint). + uint8_t commandbuffer[64]; + int stringlen = 0; + GetByteContext gbc; + + bytestream2_init(&gbc, rt->flv_data, rt->flv_size); + if (!ff_amf_read_string(&gbc, commandbuffer, sizeof(commandbuffer), + &stringlen)) { + if (!strcmp(commandbuffer, "onMetaData") || + !strcmp(commandbuffer, "|RtmpSampleAccess")) { + uint8_t *ptr; + if ((ret = av_reallocp(&rt->out_pkt.data, rt->out_pkt.size + 16)) < 0) { + rt->flv_size = rt->flv_off = rt->flv_header_bytes = 0; + return ret; + } + memmove(rt->out_pkt.data + 16, rt->out_pkt.data, rt->out_pkt.size); + rt->out_pkt.size += 16; + ptr = rt->out_pkt.data; + ff_amf_write_string(&ptr, "@setDataFrame"); + } + } + } + if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0)) < 0) return ret; rt->flv_size = 0; @@ -2719,7 +3056,8 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size) if ((ret = ff_rtmp_packet_read_internal(rt->stream, &rpkt, rt->in_chunk_size, - rt->prev_pkt[0], c)) <= 0) + &rt->prev_pkt[0], + &rt->nb_prev_pkt[0], c)) <= 0) return ret; if ((ret = rtmp_parse_result(s, rt, &rpkt)) < 0) @@ -2754,6 +3092,7 @@ static const AVOption rtmp_options[] = { {"rtmp_swfverify", "URL to player swf file, compute hash/size automatically.", OFFSET(swfverify), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC}, {"rtmp_tcurl", "URL of the target stream. Defaults to proto://host[:port]/app.", OFFSET(tcurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC}, {"rtmp_listen", "Listen for incoming rtmp connections", OFFSET(listen), AV_OPT_TYPE_INT, {.i64 = 0}, INT_MIN, INT_MAX, DEC, "rtmp_listen" }, + {"listen", "Listen for incoming rtmp connections", OFFSET(listen), AV_OPT_TYPE_INT, {.i64 = 0}, INT_MIN, INT_MAX, DEC, "rtmp_listen" }, {"timeout", "Maximum timeout (in seconds) to wait for incoming connections. -1 is infinite. Implies -rtmp_listen 1", OFFSET(listen_timeout), AV_OPT_TYPE_INT, {.i64 = -1}, INT_MIN, INT_MAX, DEC, "rtmp_listen" }, { NULL }, }; @@ -2771,6 +3110,7 @@ URLProtocol ff_##flavor##_protocol = { \ .url_open = rtmp_open, \ .url_read = rtmp_read, \ .url_read_seek = rtmp_seek, \ + .url_read_pause = rtmp_pause, \ .url_write = rtmp_write, \ .url_close = rtmp_close, \ .priv_data_size = sizeof(RTMPContext), \