X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=libavformat%2Frtmpproto.c;h=aca1fd32721e6c47722088ef795318938091bbff;hb=49bd8e4b843d9a92fdb8ef4361a551a1e019c65d;hp=96367e5eb5227345d4b607819d3d9c36f8723484;hpb=e07c92e4bb25b387381b0cd9268a3105f3b6cddb;p=ffmpeg diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c index 96367e5eb52..aca1fd32721 100644 --- a/libavformat/rtmpproto.c +++ b/libavformat/rtmpproto.c @@ -20,7 +20,7 @@ */ /** - * @file libavformat/rtmpproto.c + * @file * RTMP protocol */ @@ -29,6 +29,7 @@ #include "libavutil/lfg.h" #include "libavutil/sha.h" #include "avformat.h" +#include "internal.h" #include "network.h" @@ -43,6 +44,8 @@ #define LOG_CONTEXT s #endif +//#define DEBUG + /** RTMP protocol handler state */ typedef enum { STATE_START, ///< client has not done anything yet @@ -53,6 +56,7 @@ typedef enum { STATE_READY, ///< client has sent all needed commands and waits for server reply STATE_PLAYING, ///< client has started receiving multimedia data from server STATE_PUBLISHING, ///< client has started sending multimedia data to server (for output) + STATE_STOPPED, ///< the broadcast has been stopped } ClientState; /** protocol handler context */ @@ -69,6 +73,9 @@ typedef struct RTMPContext { int flv_size; ///< current buffer size int flv_off; ///< number of bytes read from current buffer RTMPPacket out_pkt; ///< rtmp packet, created from flv a/v or metadata (for output) + uint32_t client_report_size; ///< number of bytes after which client should report to server + uint32_t bytes_read; ///< number of bytes read from server + uint32_t last_bytes_read; ///< number of bytes read last reported to server } RTMPContext; #define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for first client digest signing @@ -95,7 +102,7 @@ static const uint8_t rtmp_server_key[] = { }; /** - * Generates 'connect' call and sends it to the server. + * Generate 'connect' call and send it to the server. */ static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto, const char *host, int port) @@ -107,7 +114,7 @@ static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto, ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 4096); p = pkt.data; - snprintf(tcurl, sizeof(tcurl), "%s://%s:%d/%s", proto, host, port, rt->app); + ff_url_join(tcurl, sizeof(tcurl), proto, NULL, host, port, "/%s", rt->app); ff_amf_write_string(&p, "connect"); ff_amf_write_number(&p, 1.0); ff_amf_write_object_start(&p); @@ -143,10 +150,11 @@ static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto, pkt.data_size = p - pkt.data; ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]); + ff_rtmp_packet_destroy(&pkt); } /** - * Generates 'releaseStream' call and sends it to the server. It should make + * Generate 'releaseStream' call and send it to the server. It should make * the server release some channel for media streams. */ static void gen_release_stream(URLContext *s, RTMPContext *rt) @@ -169,7 +177,7 @@ static void gen_release_stream(URLContext *s, RTMPContext *rt) } /** - * Generates 'FCPublish' call and sends it to the server. It should make + * Generate 'FCPublish' call and send it to the server. It should make * the server preapare for receiving media streams. */ static void gen_fcpublish_stream(URLContext *s, RTMPContext *rt) @@ -192,7 +200,7 @@ static void gen_fcpublish_stream(URLContext *s, RTMPContext *rt) } /** - * Generates 'FCUnpublish' call and sends it to the server. It should make + * Generate 'FCUnpublish' call and send it to the server. It should make * the server destroy stream. */ static void gen_fcunpublish_stream(URLContext *s, RTMPContext *rt) @@ -215,7 +223,7 @@ static void gen_fcunpublish_stream(URLContext *s, RTMPContext *rt) } /** - * Generates 'createStream' call and sends it to the server. It should make + * Generate 'createStream' call and send it to the server. It should make * the server allocate some channel for media streams. */ static void gen_create_stream(URLContext *s, RTMPContext *rt) @@ -237,7 +245,7 @@ static void gen_create_stream(URLContext *s, RTMPContext *rt) /** - * Generates 'deleteStream' call and sends it to the server. It should make + * Generate 'deleteStream' call and send it to the server. It should make * the server remove some channel for media streams. */ static void gen_delete_stream(URLContext *s, RTMPContext *rt) @@ -259,7 +267,7 @@ static void gen_delete_stream(URLContext *s, RTMPContext *rt) } /** - * Generates 'play' call and sends it to the server, then pings the server + * Generate 'play' call and send it to the server, then ping the server * to start actual playing. */ static void gen_play(URLContext *s, RTMPContext *rt) @@ -294,7 +302,7 @@ static void gen_play(URLContext *s, RTMPContext *rt) } /** - * Generates 'publish' call and sends it to the server. + * Generate 'publish' call and send it to the server. */ static void gen_publish(URLContext *s, RTMPContext *rt) { @@ -318,7 +326,7 @@ static void gen_publish(URLContext *s, RTMPContext *rt) } /** - * Generates ping reply and sends it to the server. + * Generate ping reply and send it to the server. */ static void gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt) { @@ -328,7 +336,22 @@ static void gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt) ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_PING, ppkt->timestamp + 1, 6); p = pkt.data; bytestream_put_be16(&p, 7); - bytestream_put_be32(&p, AV_RB32(ppkt->data+2) + 1); + bytestream_put_be32(&p, AV_RB32(ppkt->data+2)); + ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]); + ff_rtmp_packet_destroy(&pkt); +} + +/** + * Generate report on bytes read so far and send it to the server. + */ +static void gen_bytes_read(URLContext *s, RTMPContext *rt, uint32_t ts) +{ + RTMPPacket pkt; + uint8_t *p; + + ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_BYTES_READ, ts, 4); + p = pkt.data; + bytestream_put_be32(&p, rt->bytes_read); ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); } @@ -338,7 +361,7 @@ static void gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt) #define HMAC_OPAD_VAL 0x5C /** - * Calculates HMAC-SHA2 digest for RTMP handshake packets. + * Calculate HMAC-SHA2 digest for RTMP handshake packets. * * @param src input buffer * @param len input buffer length (should be 1536) @@ -387,7 +410,7 @@ static void rtmp_calc_digest(const uint8_t *src, int len, int gap, } /** - * Puts HMAC-SHA2 digest of packet data (except for the bytes where this digest + * Put HMAC-SHA2 digest of packet data (except for the bytes where this digest * will be stored) into that packet. * * @param buf handshake data (1536 bytes) @@ -408,7 +431,7 @@ static int rtmp_handshake_imprint_with_digest(uint8_t *buf) } /** - * Verifies that the received server response has the expected digest value. + * Verify that the received server response has the expected digest value. * * @param buf handshake data received from the server (1536 bytes) * @param off position to search digest offset from @@ -432,7 +455,7 @@ static int rtmp_validate_digest(uint8_t *buf, int off) } /** - * Performs handshake with the server by means of exchanging pseudorandom data + * Perform handshake with the server by means of exchanging pseudorandom data * signed with HMAC-SHA2 digest. * * @return 0 if handshake succeeds, negative value otherwise @@ -477,7 +500,7 @@ static int rtmp_handshake(URLContext *s, RTMPContext *rt) av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Server version %d.%d.%d.%d\n", serverdata[5], serverdata[6], serverdata[7], serverdata[8]); - if (rt->is_input) { + if (rt->is_input && serverdata[5] >= 3) { server_pos = rtmp_validate_digest(serverdata + 1, 772); if (!server_pos) { server_pos = rtmp_validate_digest(serverdata + 1, 8); @@ -517,7 +540,7 @@ static int rtmp_handshake(URLContext *s, RTMPContext *rt) } /** - * Parses received packet and may perform some action depending on + * Parse received packet and possibly perform some action depending on * the packet contents. * @return 0 for no errors, negative values for serious errors which prevent * further communications, positive values for uncritical errors @@ -527,6 +550,10 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt) int i, t; const uint8_t *data_end = pkt->data + pkt->data_size; +#ifdef DEBUG + ff_rtmp_packet_dump(LOG_CONTEXT, pkt); +#endif + switch (pkt->type) { case RTMP_PT_CHUNK_SIZE: if (pkt->data_size != 4) { @@ -548,6 +575,16 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt) if (t == 6) gen_pong(s, rt, pkt); break; + case RTMP_PT_CLIENT_BW: + if (pkt->data_size < 4) { + av_log(LOG_CONTEXT, AV_LOG_ERROR, + "Client bandwidth report packet is less than 4 bytes long (%d)\n", + pkt->data_size); + return -1; + } + av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Client bandwidth = %d\n", AV_RB32(pkt->data)); + rt->client_report_size = AV_RB32(pkt->data) >> 1; + break; case RTMP_PT_INVOKE: //TODO: check for the messages sent for wrong state? if (!memcmp(pkt->data, "\002\000\006_error", 9)) { @@ -619,6 +656,8 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt) t = ff_amf_get_field_value(ptr, data_end, "code", tmpstr, sizeof(tmpstr)); if (!t && !strcmp(tmpstr, "NetStream.Play.Start")) rt->state = STATE_PLAYING; + if (!t && !strcmp(tmpstr, "NetStream.Play.Stop")) rt->state = STATE_STOPPED; + if (!t && !strcmp(tmpstr, "NetStream.Play.UnpublishNotify")) rt->state = STATE_STOPPED; if (!t && !strcmp(tmpstr, "NetStream.Publish.Start")) rt->state = STATE_PUBLISHING; } break; @@ -627,7 +666,7 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt) } /** - * Interacts with the server by receiving and sending RTMP packets until + * Interact with the server by receiving and sending RTMP packets until * there is some significant data (media data or expected status notification). * * @param s reading context @@ -641,23 +680,40 @@ static int get_packet(URLContext *s, int for_header) { RTMPContext *rt = s->priv_data; int ret; + uint8_t *p; + const uint8_t *next; + uint32_t data_size; + uint32_t ts, cts, pts=0; + + if (rt->state == STATE_STOPPED) + return AVERROR_EOF; for (;;) { RTMPPacket rpkt; if ((ret = ff_rtmp_packet_read(rt->stream, &rpkt, - rt->chunk_size, rt->prev_pkt[0])) != 0) { - if (ret > 0) { + rt->chunk_size, rt->prev_pkt[0])) <= 0) { + if (ret == 0) { return AVERROR(EAGAIN); } else { return AVERROR(EIO); } } + rt->bytes_read += ret; + if (rt->bytes_read > rt->last_bytes_read + rt->client_report_size) { + av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Sending bytes read report\n"); + gen_bytes_read(s, rt, rpkt.timestamp + 1); + rt->last_bytes_read = rt->bytes_read; + } ret = rtmp_parse_result(s, rt, &rpkt); if (ret < 0) {//serious error in current packet ff_rtmp_packet_destroy(&rpkt); return -1; } + if (rt->state == STATE_STOPPED) { + ff_rtmp_packet_destroy(&rpkt); + return AVERROR_EOF; + } if (for_header && (rt->state == STATE_PLAYING || rt->state == STATE_PUBLISHING)) { ff_rtmp_packet_destroy(&rpkt); return 0; @@ -668,8 +724,7 @@ static int get_packet(URLContext *s, int for_header) } if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO || (rpkt.type == RTMP_PT_NOTIFY && !memcmp("\002\000\012onMetaData", rpkt.data, 13))) { - uint8_t *p; - uint32_t ts = rpkt.timestamp; + ts = rpkt.timestamp; // generate packet header and put data into buffer for FLV demuxer rt->flv_off = 0; @@ -689,6 +744,23 @@ static int get_packet(URLContext *s, int for_header) rt->flv_off = 0; rt->flv_size = rpkt.data_size; rt->flv_data = av_realloc(rt->flv_data, rt->flv_size); + /* rewrite timestamps */ + next = rpkt.data; + ts = rpkt.timestamp; + while (next - rpkt.data < rpkt.data_size - 11) { + next++; + data_size = bytestream_get_be24(&next); + p=next; + cts = bytestream_get_be24(&next); + cts |= bytestream_get_byte(&next) << 24; + if (pts==0) + pts=cts; + ts += cts - pts; + pts = cts; + bytestream_put_be24(&p, ts); + bytestream_put_byte(&p, ts >> 24); + next += data_size + 3 + 4; + } memcpy(rt->flv_data, rpkt.data, rpkt.data_size); ff_rtmp_packet_destroy(&rpkt); return 0; @@ -719,7 +791,7 @@ static int rtmp_close(URLContext *h) } /** - * Opens RTMP connection and verifies that the stream can be played. + * Open RTMP connection and verify that the stream can be played. * * URL syntax: rtmp://server[:port][/app][/playpath] * where 'app' is first one or two directories in the path @@ -741,12 +813,12 @@ static int rtmp_open(URLContext *s, const char *uri, int flags) s->priv_data = rt; rt->is_input = !(flags & URL_WRONLY); - url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname), &port, - path, sizeof(path), s->filename); + av_url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname), &port, + path, sizeof(path), s->filename); if (port < 0) port = RTMP_DEFAULT_PORT; - snprintf(buf, sizeof(buf), "tcp://%s:%d", hostname, port); + ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port, NULL); if (url_open(&rt->stream, buf, URL_RDWR) < 0) { av_log(LOG_CONTEXT, AV_LOG_ERROR, "Cannot open connection %s\n", buf); @@ -789,6 +861,10 @@ static int rtmp_open(URLContext *s, const char *uri, int flags) } strncat(rt->playpath, fname, sizeof(rt->playpath) - 5); + rt->client_report_size = 1048576; + rt->bytes_read = 0; + rt->last_bytes_read = 0; + av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Proto = %s, path = %s, app = %s, fname = %s\n", proto, path, rt->app, rt->playpath); gen_connect(s, rt, proto, hostname, port); @@ -839,6 +915,7 @@ static int rtmp_read(URLContext *s, uint8_t *buf, int size) buf += data_left; size -= data_left; rt->flv_off = rt->flv_size; + return data_left; } if ((ret = get_packet(s, 0)) < 0) return ret; @@ -846,7 +923,7 @@ static int rtmp_read(URLContext *s, uint8_t *buf, int size) return orig_size; } -static int rtmp_write(URLContext *h, uint8_t *buf, int size) +static int rtmp_write(URLContext *h, const uint8_t *buf, int size) { RTMPContext *rt = h->priv_data; int size_temp = size;