]> git.sesse.net Git - casparcg/commitdiff
- Added support for multiple streaming_consumers on the same channel
authorHelge Norberg <helge.norberg@gmail.com>
Fri, 27 Jun 2014 12:49:37 +0000 (14:49 +0200)
committerHelge Norberg <helge.norberg@gmail.com>
Fri, 27 Jun 2014 12:49:37 +0000 (14:49 +0200)
- Added support for the placeholder <client_ip_address> for ADD and REMOVE commands resolving to the client ip address of the AMCP connection.

core/parameters/parameters.cpp
core/parameters/parameters.h
modules/ffmpeg/consumer/streaming_consumer.cpp
protocol/amcp/AMCPCommandsImpl.cpp

index 5002acc330c6bd00e1d7c34be696bbb002b72801..5b02527122b0f99998657578b319c00c1717901b 100644 (file)
@@ -50,12 +50,29 @@ void parameters::to_upper()
        }
 }
 
-bool parameters::has(std::wstring const& parameter) const
+void parameters::replace_placeholders(
+               const std::wstring& placeholder, const std::wstring& replacement)
+{
+       for (size_t i = 0; i < params_.size(); ++i)
+       {
+               auto& param = params_.at(i);
+               auto& param_original = params_original_.at(i);
+
+               if (boost::contains(param, placeholder))
+               {
+                       boost::replace_all(
+                                       param, placeholder, boost::to_upper_copy(replacement));
+                       boost::ireplace_all(param_original, placeholder, replacement);
+               }
+       }
+}
+
+bool parameters::has(const std::wstring& parameter) const
 {
        return boost::range::find(params_, parameter) != params_.end();
 }
 
-bool parameters::remove_if_exists(std::wstring const& key)
+bool parameters::remove_if_exists(const std::wstring& key)
 {
        auto iter = boost::range::find(params_, key);
 
@@ -70,7 +87,7 @@ bool parameters::remove_if_exists(std::wstring const& key)
        return true;
 }
 
-std::vector<std::wstring> parameters::protocol_split(std::wstring const& s)
+std::vector<std::wstring> parameters::protocol_split(const std::wstring& s)
 {
        std::vector<std::wstring> result;
        size_t pos;
@@ -86,7 +103,8 @@ std::vector<std::wstring> parameters::protocol_split(std::wstring const& s)
        return result;
 }
 
-std::wstring parameters::get(std::wstring const& key, std::wstring const& default_value) const
+std::wstring parameters::get(
+               const std::wstring& key, const std::wstring& default_value) const
 {      
        auto it = std::find(std::begin(params_), std::end(params_), key);
        if (it == params_.end() || ++it == params_.end())       
@@ -111,7 +129,7 @@ const std::wstring& parameters::at_original(size_t i) const
        return params_original_.at(i);
 }
 
-void parameters::set(size_t index, std::wstring const& value)
+void parameters::set(size_t index, const std::wstring& value)
 {
        if (index < params_.size())
        {
index f8c271fe186b6e6d9ef0d92e88cb4fe9209e1723..e7d05462c4b8d6bf836aa5209009cdf010c05308 100644 (file)
@@ -35,21 +35,24 @@ class parameters
 public:
        parameters() {}
 
-       explicit parameters(std::vector<std::wstring> const& params);
+       explicit parameters(const std::vector<std::wstring>& params);
 
-       std::vector<std::wstring> const& get_params() const {
+       const std::vector<std::wstring>& get_params() const {
                return params_;
        }
 
-       static std::vector<std::wstring> protocol_split(std::wstring const& s);
+       static std::vector<std::wstring> protocol_split(const std::wstring& s);
 
        void to_upper();
+
+       void replace_placeholders(
+                       const std::wstring& placeholder, const std::wstring& replacement);
        
-       bool has(std::wstring const& key) const;
-       bool remove_if_exists(std::wstring const& key);
+       bool has(const std::wstring& key) const;
+       bool remove_if_exists(const std::wstring& key);
 
        template<typename C>
-       typename std::enable_if<!std::is_convertible<C, std::wstring>::value, typename std::decay<C>::type>::type get(std::wstring const& key, C default_value = C()) const
+       typename std::enable_if<!std::is_convertible<C, std::wstring>::value, typename std::decay<C>::type>::type get(const std::wstring& key, C default_value = C()) const
        {       
                auto it = std::find(std::begin(params_), std::end(params_), key);
                if(it == params_.end() || ++it == params_.end())        
@@ -65,13 +68,13 @@ public:
                return value;
        }
 
-       std::wstring get(std::wstring const& key, std::wstring const& default_value = L"") const;
+       std::wstring get(const std::wstring& key, const std::wstring& default_value = L"") const;
 
        std::wstring get_original_string(int num_skip = 0) const;
 
        const std::wstring& at_original(size_t i) const;
 
-       void set(size_t index, std::wstring const& value);
+       void set(size_t index, const std::wstring& value);
 
        const std::vector<std::wstring>& get_original() const
        {
@@ -94,20 +97,20 @@ public:
        }
 
        // Compatibility method
-       void push_back(std::wstring const& s)
+       void push_back(const std::wstring& s)
        {
                params_.push_back(s);
                params_original_.push_back(s);
        }
 
        // Compatibility method
-       std::wstring const& at(int i) const
+       const std::wstring& at(int i) const
        {
                return params_.at(i);
        }
 
        // Compatibility method
-       std::wstring const& back() const
+       const std::wstring& back() const
        {
                return params_.back();
        }
@@ -119,7 +122,7 @@ public:
        }
 
        // Compatibility method
-       std::wstring const& operator [] (size_t i) const
+       const std::wstring& operator [] (size_t i) const
        {
                return params_[i];
        }
index 33ca9698394c9e3e3e5677b8dd12be62f00e8c69..3898f8d3871ca3a9e75f8b43ae53080269332299 100644 (file)
 #include <boost/format.hpp>
 #include <boost/algorithm/string/predicate.hpp>
 
+#pragma warning(push)
+#pragma warning(disable: 4244)
+#pragma warning(disable: 4245)
+#include <boost/crc.hpp>
+#pragma warning(pop)
+
 #include <tbb/atomic.h>
 #include <tbb/concurrent_queue.h>
 #include <tbb/parallel_invoke.h>
@@ -55,7 +61,16 @@ extern "C"
 using namespace Concurrency;
 
 namespace caspar { namespace ffmpeg {
-       
+
+int crc16(const std::string& str)
+{
+       boost::crc_16_type result;
+
+       result.process_bytes(str.data(), str.length());
+
+       return result.checksum();
+}
+
 class streaming_consumer sealed : public core::frame_consumer
 {
 public:
@@ -64,6 +79,7 @@ public:
 private:
        
        boost::filesystem::path                                         path_;
+       int                                                                                     consumer_index_offset_;
 
        std::map<std::string, std::string>                      options_;
                                                                                                
@@ -105,7 +121,8 @@ public:
        streaming_consumer(
                std::string path, 
                std::string options)
-               : path_(std::move(path))
+               : path_(path)
+               , consumer_index_offset_(crc16(path))
                , video_pts_(0)
                , audio_pts_(0)
                , executor_(print())
@@ -413,7 +430,7 @@ public:
 
        int index() const override
        {
-               return 200;
+               return 100000 + consumer_index_offset_;
        }
 
        int64_t presentation_frame_age_millis() const override
index 0c3126c9a5b9871f290254c4387955545318cb16..4d17fc232927b95926e3ca9be83817ed14cb5142 100644 (file)
@@ -839,6 +839,10 @@ bool AddCommand::DoExecute()
        //Perform loading of the clip\r
        try\r
        {\r
+               _parameters.replace_placeholders(\r
+                               L"<CLIENT_IP_ADDRESS>",\r
+                               this->GetClientInfo()->print());\r
+\r
                auto consumer = create_consumer(_parameters);\r
                GetChannel()->output()->add(GetLayerIndex(consumer->index()), consumer);\r
        \r
@@ -866,8 +870,15 @@ bool RemoveCommand::DoExecute()
        try\r
        {\r
                auto index = GetLayerIndex(std::numeric_limits<int>::min());\r
-               if(index == std::numeric_limits<int>::min())\r
+\r
+               if (index == std::numeric_limits<int>::min())\r
+               {\r
+                       _parameters.replace_placeholders(\r
+                                       L"<CLIENT_IP_ADDRESS>",\r
+                                       this->GetClientInfo()->print());\r
+\r
                        index = create_consumer(_parameters)->index();\r
+               }\r
 \r
                GetChannel()->output()->remove(index);\r
 \r