]> git.sesse.net Git - ffmpeg/blobdiff - libavformat/libamqp.c
libswscale: Remove unused deprecated functions, make used ones static
[ffmpeg] / libavformat / libamqp.c
index aaf0e511522c8542fd50ce0eaedd383ab72652ab..c3b9c484ea1a25275292c750a662394400c2c476 100644 (file)
@@ -39,6 +39,7 @@ typedef struct AMQPContext {
     int pkt_size;
     int64_t connection_timeout;
     int pkt_size_overflow;
+    int delivery_mode;
 } AMQPContext;
 
 #define STR_LEN           1024
@@ -52,16 +53,19 @@ static const AVOption options[] = {
     { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
     { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
     { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags = D | E},
+    { "delivery_mode",  "Delivery mode", OFFSET(delivery_mode), AV_OPT_TYPE_INT, { .i64 = AMQP_DELIVERY_PERSISTENT }, 1, 2, .flags = E, "delivery_mode"},
+    { "persistent",     "Persistent delivery mode",     0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_PERSISTENT }, 0, 0, E, "delivery_mode" },
+    { "non-persistent", "Non-persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_NONPERSISTENT }, 0, 0, E, "delivery_mode" },
     { NULL }
 };
 
 static int amqp_proto_open(URLContext *h, const char *uri, int flags)
 {
     int ret, server_msg;
-    char hostname[STR_LEN], credentials[STR_LEN];
+    char hostname[STR_LEN], credentials[STR_LEN], path[STR_LEN];
     int port;
-    const char *user, *password = NULL;
-    const char *user_decoded, *password_decoded;
+    const char *user, *password = NULL, *vhost;
+    const char *user_decoded, *password_decoded, *vhost_decoded;
     char *p;
     amqp_rpc_reply_t broker_reply;
     struct timeval tval = { 0 };
@@ -72,7 +76,7 @@ static int amqp_proto_open(URLContext *h, const char *uri, int flags)
     h->max_packet_size = s->pkt_size;
 
     av_url_split(NULL, 0, credentials, sizeof(credentials),
-                 hostname, sizeof(hostname), &port, NULL, 0, uri);
+                 hostname, sizeof(hostname), &port, path, sizeof(path), uri);
 
     if (port < 0)
         port = 5672;
@@ -105,8 +109,27 @@ static int amqp_proto_open(URLContext *h, const char *uri, int flags)
         return AVERROR(ENOMEM);
     }
 
+    /* skip query for now */
+    p = strchr(path, '?');
+    if (p)
+        *p = '\0';
+
+    vhost = path;
+    if (*vhost == '\0')
+        vhost = "/";
+    else
+        vhost++; /* skip leading '/' */
+
+    vhost_decoded = ff_urldecode(vhost, 0);
+    if (!vhost_decoded) {
+        av_freep(&user_decoded);
+        av_freep(&password_decoded);
+        return AVERROR(ENOMEM);
+    }
+
     s->conn = amqp_new_connection();
     if (!s->conn) {
+        av_freep(&vhost_decoded);
         av_freep(&user_decoded);
         av_freep(&password_decoded);
         av_log(h, AV_LOG_ERROR, "Error creating connection\n");
@@ -132,7 +155,7 @@ static int amqp_proto_open(URLContext *h, const char *uri, int flags)
         goto destroy_connection;
     }
 
-    broker_reply = amqp_login(s->conn, "/", 0, s->pkt_size, 0,
+    broker_reply = amqp_login(s->conn, vhost_decoded, 0, s->pkt_size, 0,
                               AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
 
     if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
@@ -191,6 +214,7 @@ static int amqp_proto_open(URLContext *h, const char *uri, int flags)
         }
     }
 
+    av_freep(&vhost_decoded);
     av_freep(&user_decoded);
     av_freep(&password_decoded);
     return 0;
@@ -202,6 +226,7 @@ close_connection:
 destroy_connection:
     amqp_destroy_connection(s->conn);
 
+    av_freep(&vhost_decoded);
     av_freep(&user_decoded);
     av_freep(&password_decoded);
     return AVERROR_EXTERNAL;
@@ -222,7 +247,7 @@ static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
 
     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
     props.content_type = amqp_cstring_bytes("octet/stream");
-    props.delivery_mode = 2; /* persistent delivery mode */
+    props.delivery_mode = s->delivery_mode;
 
     ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
                              amqp_cstring_bytes(s->routing_key), 0, 0,