typedef enum {
STATE_START, ///< client has not done anything yet
STATE_HANDSHAKED, ///< client has performed handshake
- STATE_RELEASING, ///< client releasing stream before publish it (for output)
STATE_FCPUBLISH, ///< client FCPublishing stream (for output)
- STATE_CONNECTING, ///< client connected to server successfully
- STATE_READY, ///< client has sent all needed commands and waits for server reply
STATE_PLAYING, ///< client has started receiving multimedia data from server
STATE_PUBLISHING, ///< client has started sending multimedia data to server (for output)
STATE_STOPPED, ///< the broadcast has been stopped
} ClientState;
+typedef struct TrackedMethod {
+ char *name;
+ int id;
+} TrackedMethod;
+
/** protocol handler context */
typedef struct RTMPContext {
const AVClass *class;
uint8_t flv_header[11]; ///< partial incoming flv packet header
int flv_header_bytes; ///< number of initialized bytes in flv_header
int nb_invokes; ///< keeps track of invoke messages
- int create_stream_invoke; ///< invoke id for the create stream command
char* tcurl; ///< url of the target stream
char* flashver; ///< version of the flash plugin
char* swfurl; ///< url of the swf player
char* pageurl; ///< url of the web page
+ char* subscribe; ///< name of live stream to subscribe
int server_bw; ///< server bandwidth
int client_buffer_time; ///< client buffer time in ms
int flush_interval; ///< number of packets flushed in the same request (RTMPT only)
int encrypted; ///< use an encrypted connection (RTMPE only)
+ TrackedMethod*tracked_methods; ///< tracked methods buffer
+ int nb_tracked_methods; ///< number of tracked methods
+ int tracked_methods_size; ///< size of the tracked methods buffer
} RTMPContext;
#define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for first client digest signing
0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
};
+static int add_tracked_method(RTMPContext *rt, const char *name, int id)
+{
+ void *ptr;
+
+ if (rt->nb_tracked_methods + 1 > rt->tracked_methods_size) {
+ rt->tracked_methods_size = (rt->nb_tracked_methods + 1) * 2;
+ ptr = av_realloc(rt->tracked_methods,
+ rt->tracked_methods_size * sizeof(*rt->tracked_methods));
+ if (!ptr)
+ return AVERROR(ENOMEM);
+ rt->tracked_methods = ptr;
+ }
+
+ rt->tracked_methods[rt->nb_tracked_methods].name = av_strdup(name);
+ if (!rt->tracked_methods[rt->nb_tracked_methods].name)
+ return AVERROR(ENOMEM);
+ rt->tracked_methods[rt->nb_tracked_methods].id = id;
+ rt->nb_tracked_methods++;
+
+ return 0;
+}
+
+static void del_tracked_method(RTMPContext *rt, int index)
+{
+ memmove(&rt->tracked_methods[index], &rt->tracked_methods[index + 1],
+ sizeof(*rt->tracked_methods) * (rt->nb_tracked_methods - index - 1));
+ rt->nb_tracked_methods--;
+}
+
+static void free_tracked_methods(RTMPContext *rt)
+{
+ int i;
+
+ for (i = 0; i < rt->nb_tracked_methods; i ++)
+ av_free(rt->tracked_methods[i].name);
+ av_free(rt->tracked_methods);
+}
+
+static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
+{
+ int ret;
+
+ if (pkt->type == RTMP_PT_INVOKE && track) {
+ GetByteContext gbc;
+ char name[128];
+ double pkt_id;
+ int len;
+
+ bytestream2_init(&gbc, pkt->data, pkt->data_size);
+ if ((ret = ff_amf_read_string(&gbc, name, sizeof(name), &len)) < 0)
+ goto fail;
+
+ if ((ret = ff_amf_read_number(&gbc, &pkt_id)) < 0)
+ goto fail;
+
+ if ((ret = add_tracked_method(rt, name, pkt_id)) < 0)
+ goto fail;
+ }
+
+ ret = ff_rtmp_packet_write(rt->stream, pkt, rt->chunk_size,
+ rt->prev_pkt[1]);
+fail:
+ ff_rtmp_packet_destroy(pkt);
+ return ret;
+}
+
static int rtmp_write_amf_data(URLContext *s, char *param, uint8_t **p)
{
char *field, *value;
pkt.data_size = p - pkt.data;
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
-
- return ret;
+ return rtmp_send_packet(rt, &pkt, 1);
}
/**
ff_amf_write_null(&p);
ff_amf_write_string(&p, rt->playpath);
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
-
- return ret;
+ return rtmp_send_packet(rt, &pkt, 0);
}
/**
ff_amf_write_null(&p);
ff_amf_write_string(&p, rt->playpath);
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
-
- return ret;
+ return rtmp_send_packet(rt, &pkt, 0);
}
/**
ff_amf_write_null(&p);
ff_amf_write_string(&p, rt->playpath);
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
-
- return ret;
+ return rtmp_send_packet(rt, &pkt, 0);
}
/**
ff_amf_write_string(&p, "createStream");
ff_amf_write_number(&p, ++rt->nb_invokes);
ff_amf_write_null(&p);
- rt->create_stream_invoke = rt->nb_invokes;
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
-
- return ret;
+ return rtmp_send_packet(rt, &pkt, 1);
}
ff_amf_write_null(&p);
ff_amf_write_number(&p, rt->main_channel_id);
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
-
- return ret;
+ return rtmp_send_packet(rt, &pkt, 0);
}
/**
bytestream_put_be32(&p, rt->main_channel_id);
bytestream_put_be32(&p, rt->client_buffer_time);
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
-
- return ret;
+ return rtmp_send_packet(rt, &pkt, 0);
}
/**
ff_amf_write_string(&p, rt->playpath);
ff_amf_write_number(&p, rt->live);
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
-
- return ret;
+ return rtmp_send_packet(rt, &pkt, 1);
}
/**
ff_amf_write_string(&p, rt->playpath);
ff_amf_write_string(&p, "live");
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
-
- return ret;
+ return rtmp_send_packet(rt, &pkt, 1);
}
/**
uint8_t *p;
int ret;
+ if (ppkt->data_size < 6) {
+ av_log(s, AV_LOG_ERROR, "Too short ping packet (%d)\n",
+ ppkt->data_size);
+ return AVERROR_INVALIDDATA;
+ }
+
if ((ret = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_PING,
ppkt->timestamp + 1, 6)) < 0)
return ret;
p = pkt.data;
bytestream_put_be16(&p, 7);
bytestream_put_be32(&p, AV_RB32(ppkt->data+2));
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
- return ret;
+ return rtmp_send_packet(rt, &pkt, 0);
}
/**
p = pkt.data;
bytestream_put_be32(&p, rt->server_bw);
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
- return ret;
+ return rtmp_send_packet(rt, &pkt, 0);
}
/**
p = pkt.data;
ff_amf_write_string(&p, "_checkbw");
- ff_amf_write_number(&p, ++rt->nb_invokes);
+ ff_amf_write_number(&p, RTMP_NOTIFICATION);
ff_amf_write_null(&p);
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
-
- return ret;
+ return rtmp_send_packet(rt, &pkt, 0);
}
/**
p = pkt.data;
bytestream_put_be32(&p, rt->bytes_read);
- ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
- rt->prev_pkt[1]);
- ff_rtmp_packet_destroy(&pkt);
- return ret;
+ return rtmp_send_packet(rt, &pkt, 0);
+}
+
+static int gen_fcsubscribe_stream(URLContext *s, RTMPContext *rt,
+ const char *subscribe)
+{
+ RTMPPacket pkt;
+ uint8_t *p;
+ int ret;
+
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
+ 0, 27 + strlen(subscribe))) < 0)
+ return ret;
+
+ p = pkt.data;
+ ff_amf_write_string(&p, "FCSubscribe");
+ ff_amf_write_number(&p, ++rt->nb_invokes);
+ ff_amf_write_null(&p);
+ ff_amf_write_string(&p, subscribe);
+
+ return rtmp_send_packet(rt, &pkt, 1);
}
int ff_rtmp_calc_digest(const uint8_t *src, int len, int gap,
RTMPContext *rt = s->priv_data;
int ret;
- if (pkt->data_size != 4) {
+ if (pkt->data_size < 4) {
av_log(s, AV_LOG_ERROR,
- "Chunk size change packet is not 4 bytes long (%d)\n",
+ "Too short chunk size change packet (%d)\n",
pkt->data_size);
return AVERROR_INVALIDDATA;
}
RTMPContext *rt = s->priv_data;
int t, ret;
+ if (pkt->data_size < 2) {
+ av_log(s, AV_LOG_ERROR, "Too short ping packet (%d)\n",
+ pkt->data_size);
+ return AVERROR_INVALIDDATA;
+ }
+
t = AV_RB16(pkt->data);
if (t == 6) {
if ((ret = gen_pong(s, rt, pkt)) < 0)
pkt->data_size);
return AVERROR_INVALIDDATA;
}
- av_log(s, AV_LOG_DEBUG, "Client bandwidth = %d\n", AV_RB32(pkt->data));
- rt->client_report_size = AV_RB32(pkt->data) >> 1;
+
+ rt->client_report_size = AV_RB32(pkt->data);
+ if (rt->client_report_size <= 0) {
+ av_log(s, AV_LOG_ERROR, "Incorrect client bandwidth %d\n",
+ rt->client_report_size);
+ return AVERROR_INVALIDDATA;
+
+ }
+ av_log(s, AV_LOG_DEBUG, "Client bandwidth = %d\n", rt->client_report_size);
+ rt->client_report_size >>= 1;
return 0;
}
{
RTMPContext *rt = s->priv_data;
+ if (pkt->data_size < 4) {
+ av_log(s, AV_LOG_ERROR,
+ "Too short server bandwidth report packet (%d)\n",
+ pkt->data_size);
+ return AVERROR_INVALIDDATA;
+ }
+
rt->server_bw = AV_RB32(pkt->data);
if (rt->server_bw <= 0) {
av_log(s, AV_LOG_ERROR, "Incorrect server bandwidth %d\n",
RTMPContext *rt = s->priv_data;
int i, t;
const uint8_t *data_end = pkt->data + pkt->data_size;
- int ret;
+ char *tracked_method = NULL;
+ int ret = 0;
//TODO: check for the messages sent for wrong state?
if (!memcmp(pkt->data, "\002\000\006_error", 9)) {
av_log(s, AV_LOG_ERROR, "Server error: %s\n",tmpstr);
return -1;
} else if (!memcmp(pkt->data, "\002\000\007_result", 10)) {
- switch (rt->state) {
- case STATE_HANDSHAKED:
- if (!rt->is_input) {
- if ((ret = gen_release_stream(s, rt)) < 0)
- return ret;
- if ((ret = gen_fcpublish_stream(s, rt)) < 0)
- return ret;
- rt->state = STATE_RELEASING;
- } else {
- if ((ret = gen_server_bw(s, rt)) < 0)
- return ret;
- rt->state = STATE_CONNECTING;
- }
- if ((ret = gen_create_stream(s, rt)) < 0)
- return ret;
- break;
- case STATE_FCPUBLISH:
- rt->state = STATE_CONNECTING;
- break;
- case STATE_RELEASING:
- rt->state = STATE_FCPUBLISH;
- /* hack for Wowza Media Server, it does not send result for
- * releaseStream and FCPublish calls */
- if (!pkt->data[10]) {
- int pkt_id = av_int2double(AV_RB64(pkt->data + 11));
- if (pkt_id == rt->create_stream_invoke)
- rt->state = STATE_CONNECTING;
- }
- if (rt->state != STATE_CONNECTING)
- break;
- case STATE_CONNECTING:
- //extract a number from the result
- if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) {
- av_log(s, AV_LOG_WARNING, "Unexpected reply on connect()\n");
- } else {
- rt->main_channel_id = av_int2double(AV_RB64(pkt->data + 21));
- }
- if (rt->is_input) {
- if ((ret = gen_play(s, rt)) < 0)
- return ret;
- if ((ret = gen_buffer_time(s, rt)) < 0)
- return ret;
- } else {
- if ((ret = gen_publish(s, rt)) < 0)
- return ret;
+ GetByteContext gbc;
+ double pkt_id;
+
+ bytestream2_init(&gbc, pkt->data + 10, pkt->data_size);
+ if ((ret = ff_amf_read_number(&gbc, &pkt_id)) < 0)
+ return ret;
+
+ for (i = 0; i < rt->nb_tracked_methods; i++) {
+ if (rt->tracked_methods[i].id != pkt_id)
+ continue;
+
+ tracked_method = rt->tracked_methods[i].name;
+ del_tracked_method(rt, i);
+ break;
+ }
+
+ if (!tracked_method) {
+ /* Ignore this reply when the current method is not tracked. */
+ return 0;
+ }
+
+ if (!memcmp(tracked_method, "connect", 7)) {
+ if (!rt->is_input) {
+ if ((ret = gen_release_stream(s, rt)) < 0)
+ goto invoke_fail;
+
+ if ((ret = gen_fcpublish_stream(s, rt)) < 0)
+ goto invoke_fail;
+ } else {
+ if ((ret = gen_server_bw(s, rt)) < 0)
+ goto invoke_fail;
+ }
+
+ if ((ret = gen_create_stream(s, rt)) < 0)
+ goto invoke_fail;
+
+ if (rt->is_input) {
+ /* Send the FCSubscribe command when the name of live
+ * stream is defined by the user or if it's a live stream. */
+ if (rt->subscribe) {
+ if ((ret = gen_fcsubscribe_stream(s, rt,
+ rt->subscribe)) < 0)
+ goto invoke_fail;
+ } else if (rt->live == -1) {
+ if ((ret = gen_fcsubscribe_stream(s, rt,
+ rt->playpath)) < 0)
+ goto invoke_fail;
}
- rt->state = STATE_READY;
- break;
+ }
+ } else if (!memcmp(tracked_method, "createStream", 12)) {
+ //extract a number from the result
+ if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) {
+ av_log(s, AV_LOG_WARNING, "Unexpected reply on connect()\n");
+ } else {
+ rt->main_channel_id = av_int2double(AV_RB64(pkt->data + 21));
+ }
+
+ if (!rt->is_input) {
+ if ((ret = gen_publish(s, rt)) < 0)
+ goto invoke_fail;
+ } else {
+ if ((ret = gen_play(s, rt)) < 0)
+ goto invoke_fail;
+ if ((ret = gen_buffer_time(s, rt)) < 0)
+ goto invoke_fail;
+ }
}
} else if (!memcmp(pkt->data, "\002\000\010onStatus", 11)) {
const uint8_t* ptr = pkt->data + 11;
return ret;
}
- return 0;
+invoke_fail:
+ av_free(tracked_method);
+ return ret;
}
/**
break;
case RTMP_PT_VIDEO:
case RTMP_PT_AUDIO:
- /* Audio and Video packets are parsed in get_packet() */
+ case RTMP_PT_METADATA:
+ /* Audio, Video and Metadata packets are parsed in get_packet() */
break;
default:
av_log(s, AV_LOG_VERBOSE, "Unknown packet type received 0x%02X\n", pkt->type);
if (rt->state > STATE_HANDSHAKED)
ret = gen_delete_stream(h, rt);
+ free_tracked_methods(rt);
av_freep(&rt->flv_data);
ffurl_close(rt->stream);
return ret;
if (rt->flv_off == rt->flv_size) {
rt->skip_bytes = 4;
- if ((ret = ff_rtmp_packet_write(rt->stream, &rt->out_pkt,
- rt->chunk_size, rt->prev_pkt[1])) < 0)
+ if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0)) < 0)
return ret;
- ff_rtmp_packet_destroy(&rt->out_pkt);
rt->flv_size = 0;
rt->flv_off = 0;
rt->flv_header_bytes = 0;
{"recorded", "recorded stream", 0, AV_OPT_TYPE_CONST, {0}, 0, 0, DEC, "rtmp_live"},
{"rtmp_pageurl", "URL of the web page in which the media was embedded. By default no value will be sent.", OFFSET(pageurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC},
{"rtmp_playpath", "Stream identifier to play or to publish", OFFSET(playpath), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
+ {"rtmp_subscribe", "Name of live stream to subscribe to. Defaults to rtmp_playpath.", OFFSET(subscribe), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC},
{"rtmp_swfurl", "URL of the SWF player. By default no value will be sent", OFFSET(swfurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
{"rtmp_tcurl", "URL of the target stream. Defaults to proto://host[:port]/app.", OFFSET(tcurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
{ NULL },
};
-static const AVClass rtmp_class = {
- .class_name = "rtmp",
- .item_name = av_default_item_name,
- .option = rtmp_options,
- .version = LIBAVUTIL_VERSION_INT,
-};
-
-URLProtocol ff_rtmp_protocol = {
- .name = "rtmp",
- .url_open = rtmp_open,
- .url_read = rtmp_read,
- .url_write = rtmp_write,
- .url_close = rtmp_close,
- .priv_data_size = sizeof(RTMPContext),
- .flags = URL_PROTOCOL_FLAG_NETWORK,
- .priv_data_class= &rtmp_class,
-};
-
-static const AVClass rtmpe_class = {
- .class_name = "rtmpe",
- .item_name = av_default_item_name,
- .option = rtmp_options,
- .version = LIBAVUTIL_VERSION_INT,
-};
-
-URLProtocol ff_rtmpe_protocol = {
- .name = "rtmpe",
- .url_open = rtmp_open,
- .url_read = rtmp_read,
- .url_write = rtmp_write,
- .url_close = rtmp_close,
- .priv_data_size = sizeof(RTMPContext),
- .flags = URL_PROTOCOL_FLAG_NETWORK,
- .priv_data_class = &rtmpe_class,
+#define RTMP_PROTOCOL(flavor) \
+static const AVClass flavor##_class = { \
+ .class_name = #flavor, \
+ .item_name = av_default_item_name, \
+ .option = rtmp_options, \
+ .version = LIBAVUTIL_VERSION_INT, \
+}; \
+ \
+URLProtocol ff_##flavor##_protocol = { \
+ .name = #flavor, \
+ .url_open = rtmp_open, \
+ .url_read = rtmp_read, \
+ .url_write = rtmp_write, \
+ .url_close = rtmp_close, \
+ .priv_data_size = sizeof(RTMPContext), \
+ .flags = URL_PROTOCOL_FLAG_NETWORK, \
+ .priv_data_class= &flavor##_class, \
};
-static const AVClass rtmps_class = {
- .class_name = "rtmps",
- .item_name = av_default_item_name,
- .option = rtmp_options,
- .version = LIBAVUTIL_VERSION_INT,
-};
-
-URLProtocol ff_rtmps_protocol = {
- .name = "rtmps",
- .url_open = rtmp_open,
- .url_read = rtmp_read,
- .url_write = rtmp_write,
- .url_close = rtmp_close,
- .priv_data_size = sizeof(RTMPContext),
- .flags = URL_PROTOCOL_FLAG_NETWORK,
- .priv_data_class = &rtmps_class,
-};
-
-static const AVClass rtmpt_class = {
- .class_name = "rtmpt",
- .item_name = av_default_item_name,
- .option = rtmp_options,
- .version = LIBAVUTIL_VERSION_INT,
-};
-URLProtocol ff_rtmpt_protocol = {
- .name = "rtmpt",
- .url_open = rtmp_open,
- .url_read = rtmp_read,
- .url_write = rtmp_write,
- .url_close = rtmp_close,
- .priv_data_size = sizeof(RTMPContext),
- .flags = URL_PROTOCOL_FLAG_NETWORK,
- .priv_data_class = &rtmpt_class,
-};
-
-static const AVClass rtmpte_class = {
- .class_name = "rtmpte",
- .item_name = av_default_item_name,
- .option = rtmp_options,
- .version = LIBAVUTIL_VERSION_INT,
-};
-
-URLProtocol ff_rtmpte_protocol = {
- .name = "rtmpte",
- .url_open = rtmp_open,
- .url_read = rtmp_read,
- .url_write = rtmp_write,
- .url_close = rtmp_close,
- .priv_data_size = sizeof(RTMPContext),
- .flags = URL_PROTOCOL_FLAG_NETWORK,
- .priv_data_class = &rtmpte_class,
-};
-
-static const AVClass rtmpts_class = {
- .class_name = "rtmpts",
- .item_name = av_default_item_name,
- .option = rtmp_options,
- .version = LIBAVUTIL_VERSION_INT,
-};
-
-URLProtocol ff_rtmpts_protocol = {
- .name = "rtmpts",
- .url_open = rtmp_open,
- .url_read = rtmp_read,
- .url_write = rtmp_write,
- .url_close = rtmp_close,
- .priv_data_size = sizeof(RTMPContext),
- .flags = URL_PROTOCOL_FLAG_NETWORK,
- .priv_data_class = &rtmpts_class,
-};
+RTMP_PROTOCOL(rtmp)
+RTMP_PROTOCOL(rtmpe)
+RTMP_PROTOCOL(rtmps)
+RTMP_PROTOCOL(rtmpt)
+RTMP_PROTOCOL(rtmpte)
+RTMP_PROTOCOL(rtmpts)