]> git.sesse.net Git - casparcg/commitdiff
* refactoring of AMCPProtocolStrategy::Parse and the AMCPCommands
authorniklaspandersson <niklas.p.andersson@svt.se>
Mon, 12 Aug 2013 16:01:35 +0000 (18:01 +0200)
committerniklaspandersson <niklas.p.andersson@svt.se>
Mon, 12 Aug 2013 16:01:35 +0000 (18:01 +0200)
* implemented LOCK-command to protect a channel from accidental access

19 files changed:
protocol/amcp/AMCPCommand.h
protocol/amcp/AMCPCommandQueue.h
protocol/amcp/AMCPCommandsImpl.cpp
protocol/amcp/AMCPCommandsImpl.h
protocol/amcp/AMCPProtocolStrategy.cpp
protocol/amcp/AMCPProtocolStrategy.h
protocol/cii/CIIProtocolStrategy.cpp
protocol/cii/CIIProtocolStrategy.h
protocol/protocol.vcxproj
protocol/protocol.vcxproj.filters
protocol/util/AsyncEventServer.cpp
protocol/util/AsyncEventServer.h
protocol/util/ClientInfo.h
protocol/util/ProtocolStrategy.h
protocol/util/protocol_strategy.h
protocol/util/strategy_adapters.cpp
protocol/util/strategy_adapters.h
shell/main.cpp
shell/server.cpp

index 747bcb5efe4597d6cf82f86ac1c6b24e8bcc5ca2..b73cc85c7e46b869ffff85dbee57f00d91bea7ec 100644 (file)
@@ -22,9 +22,8 @@
 #pragma once
 
 #include "../util/clientinfo.h"
-
+#include "amcp_shared.h"
 #include <core/consumer/frame_consumer.h>
-#include <core/video_channel.h>
 
 #include <boost/algorithm/string.hpp>
 
@@ -33,72 +32,99 @@ namespace amcp {
        
        class AMCPCommand
        {
-               AMCPCommand(const AMCPCommand&);
                AMCPCommand& operator=(const AMCPCommand&);
+       protected:
+               AMCPCommand(const AMCPCommand& rhs) : client_(rhs.client_), parameters_(rhs.parameters_)
+               {}
 
        public:
                typedef std::shared_ptr<AMCPCommand> ptr_type;
 
-               AMCPCommand();
+               explicit AMCPCommand(IO::ClientInfoPtr client) : client_(client) {}
                virtual ~AMCPCommand() {}
-               virtual bool Execute() = 0;
 
-               virtual bool NeedChannel() = 0;
-               virtual int GetMinimumParameters() = 0;
+               virtual bool Execute() = 0;
+               virtual int minimum_parameters() = 0;
 
                void SendReply();
 
-               void AddParameter(const std::wstring& param){ parameters_.push_back(param);}
-
-               void SetClientInfo(IO::ClientInfoPtr& s){pClientInfo_ = s;}
-               IO::ClientInfoPtr GetClientInfo(){return pClientInfo_;}
-
-               void SetChannel(const std::shared_ptr<core::video_channel>& pChannel){pChannel_ = pChannel;}
-               std::shared_ptr<core::video_channel> GetChannel(){return pChannel_;}
-
-               void SetChannels(const std::vector<spl::shared_ptr<core::video_channel>>& channels){channels_ = channels;}
-               const std::vector<spl::shared_ptr<core::video_channel>>& GetChannels() { return channels_; }
-
-               void SetChannelIndex(unsigned int channelIndex){channelIndex_ = channelIndex;}
-               unsigned int GetChannelIndex(){return channelIndex_;}
-
-               void SetLayerIntex(int layerIndex){layerIndex_ = layerIndex;}
-               int GetLayerIndex(int defaultValue = 0) const{return layerIndex_ != -1 ? layerIndex_ : defaultValue;}
+               std::vector<std::wstring>& parameters() { return parameters_; }
 
-               virtual void Clear();
+               IO::ClientInfoPtr client() { return client_; }
 
                virtual std::wstring print() const = 0;
 
                void SetReplyString(const std::wstring& str){replyString_ = str;}
 
-       protected:
-               std::vector<std::wstring>& parameters() { return parameters_; }
-
        private:
                std::vector<std::wstring> parameters_;
-               unsigned int channelIndex_;
-               int layerIndex_;
-               IO::ClientInfoPtr pClientInfo_;
-               std::shared_ptr<core::video_channel> pChannel_;
-               std::vector<spl::shared_ptr<core::video_channel>> channels_;
+               IO::ClientInfoPtr client_;
                std::wstring replyString_;
        };
 
-       template<bool TNeedChannel,int TMinParameters>
+       template<int TMinParameters>
        class AMCPCommandBase : public AMCPCommand
        {
+       protected:
+               explicit AMCPCommandBase(IO::ClientInfoPtr client) : AMCPCommand(client) {}
+               AMCPCommandBase(const AMCPCommandBase& rhs) : AMCPCommand(rhs) {}
+               template<int T>
+               AMCPCommandBase(const AMCPCommandBase<T>& rhs) : AMCPCommand(rhs) {}
+
+               ~AMCPCommandBase(){}
        public:
                virtual bool Execute()
                {
-                       return (TNeedChannel && !GetChannel()) || parameters().size() < TMinParameters ? false : DoExecute();
+                       return (parameters().size() < TMinParameters) ? false : DoExecute();
                }
-
-               virtual bool NeedChannel(){return TNeedChannel;}                
-               virtual int GetMinimumParameters(){return TMinParameters;}
-       protected:
-               ~AMCPCommandBase(){}
+               virtual int minimum_parameters(){return TMinParameters;}
        private:
                virtual bool DoExecute() = 0;
        };      
 
+       class AMCPChannelCommand
+       {
+       protected:
+               AMCPChannelCommand(const channel_context& ctx, unsigned int channel_index, int layer_index) : ctx_(ctx), channel_index_(channel_index), layer_index_(layer_index)
+               {}
+               AMCPChannelCommand(const AMCPChannelCommand& rhs) : ctx_(rhs.ctx_), channel_index_(rhs.channel_index_), layer_index_(rhs.layer_index_)
+               {}
+
+               spl::shared_ptr<core::video_channel>& channel() { return ctx_.channel; }
+               spl::shared_ptr<IO::lock_container>& lock_container() { return ctx_.lock; }
+
+               unsigned int channel_index(){return channel_index_;}
+               int layer_index(int default = 0) const{return layer_index_ != -1 ? layer_index_ : default; }
+
+       private:
+               unsigned int channel_index_;
+               int layer_index_;
+               channel_context ctx_;
+       };
+
+       class AMCPChannelsAwareCommand
+       {
+       protected:
+               AMCPChannelsAwareCommand(const std::vector<channel_context>& c) : channels_(c) {}
+               AMCPChannelsAwareCommand(const AMCPChannelsAwareCommand& rhs) : channels_(rhs.channels_) {}
+
+               const std::vector<channel_context>& channels() { return channels_; }
+
+       private:
+               const std::vector<channel_context>& channels_;
+       };
+
+       template<int TMinParameters>
+       class AMCPChannelCommandBase : public AMCPChannelCommand, public AMCPCommandBase<TMinParameters>
+       {
+       public:
+               AMCPChannelCommandBase(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index) : AMCPChannelCommand(channel, channel_index, layer_index), AMCPCommandBase(client)
+               {}
+       protected:
+               AMCPChannelCommandBase(const AMCPChannelCommandBase& rhs) : AMCPChannelCommand(rhs), AMCPCommandBase(rhs)
+               {}
+               template<int T>
+               AMCPChannelCommandBase(const AMCPChannelCommandBase<T>& rhs) : AMCPChannelCommand(rhs), AMCPCommandBase(rhs)
+               {}
+       };
 }}}
index ced56a31cbb240ed723cbf4429f471f554faa902..e4d0eaed7adc630411ad2acc1abb72e68de733a8 100644 (file)
@@ -34,6 +34,8 @@ class AMCPCommandQueue
        AMCPCommandQueue(const AMCPCommandQueue&);
        AMCPCommandQueue& operator=(const AMCPCommandQueue&);
 public:
+       typedef std::shared_ptr<AMCPCommandQueue> ptr_type;
+
        AMCPCommandQueue();
        ~AMCPCommandQueue();
 
@@ -42,6 +44,5 @@ public:
 private:
        executor                        executor_;
 };
-typedef std::tr1::shared_ptr<AMCPCommandQueue> AMCPCommandQueuePtr;
 
 }}}
index 45575efdf9181f7ea4f1f06acd9bab53bc651b78..38f89f7f5e56e2b834fcabeda15112e4e1ca40cb 100644 (file)
@@ -96,6 +96,7 @@
 500 FAILED                             Internt configurationfel  
 501 [kommando] FAILED  Internt configurationfel  
 502 [kommando] FAILED  Oläslig mediafil  
+503 [kommando] FAILED  access denied
 
 600 [kommando] FAILED  funktion ej implementerad
 */
@@ -262,26 +263,12 @@ std::wstring ListTemplates()
 
 namespace amcp {
        
-AMCPCommand::AMCPCommand() : channelIndex_(0), layerIndex_(-1)
-{}
-
 void AMCPCommand::SendReply()
 {
-       if(!pClientInfo_) 
-               return;
-
        if(replyString_.empty())
                return;
 
-       pClientInfo_->Send(replyString_);
-}
-
-void AMCPCommand::Clear() 
-{
-       pChannel_->stage().clear();
-       pClientInfo_.reset();
-       channelIndex_ = 0;
-       parameters_.clear();
+       client_->send(std::move(replyString_));
 }
 
 bool DiagnosticsCommand::DoExecute()
@@ -307,7 +294,7 @@ bool ChannelGridCommand::DoExecute()
        CASPAR_THROW_EXCEPTION(not_implemented());
 
        //int index = 1;
-       //auto self = GetChannels().back();
+       //auto self = channels().back();
        //
        //std::vector<std::wstring> params;
        //params.push_back(L"SCREEN");
@@ -317,7 +304,7 @@ bool ChannelGridCommand::DoExecute()
 
        //self->output().add(screen);
 
-       //BOOST_FOREACH(auto channel, GetChannels())
+       //BOOST_FOREACH(auto channel, channels())
        //{
        //      if(channel != self)
        //      {
@@ -328,7 +315,7 @@ bool ChannelGridCommand::DoExecute()
        //      }
        //}
 
-       //int n = GetChannels().size()-1;
+       //int n = channels().size()-1;
        //double delta = 1.0/static_cast<double>(n);
        //for(int x = 0; x < n; ++x)
        //{
@@ -359,7 +346,7 @@ bool CallCommand::DoExecute()
        //Perform loading of the clip
        try
        {
-               auto result = GetChannel()->stage().call(GetLayerIndex(), parameters());
+               auto result = channel()->stage().call(layer_index(), parameters());
                
                if(!result.timed_wait(boost::posix_time::seconds(2)))
                        CASPAR_THROW_EXCEPTION(timed_out());
@@ -398,7 +385,7 @@ bool MixerCommand::DoExecute()
                if(boost::iequals(parameters()[0], L"KEYER") || boost::iequals(parameters()[0], L"IS_KEY"))
                {
                        bool value = boost::lexical_cast<int>(parameters().at(1));
-                       transforms.push_back(stage::transform_tuple_t(GetLayerIndex(), [=](frame_transform transform) -> frame_transform
+                       transforms.push_back(stage::transform_tuple_t(layer_index(), [=](frame_transform transform) -> frame_transform
                        {
                                transform.image_transform.is_key = value;
                                return transform;                                       
@@ -411,7 +398,7 @@ bool MixerCommand::DoExecute()
 
                        double value = boost::lexical_cast<double>(parameters().at(1));
                        
-                       transforms.push_back(stage::transform_tuple_t(GetLayerIndex(), [=](frame_transform transform) -> frame_transform
+                       transforms.push_back(stage::transform_tuple_t(layer_index(), [=](frame_transform transform) -> frame_transform
                        {
                                transform.image_transform.opacity = value;
                                return transform;                                       
@@ -426,7 +413,7 @@ bool MixerCommand::DoExecute()
                        double x_s      = boost::lexical_cast<double>(parameters().at(3));
                        double y_s      = boost::lexical_cast<double>(parameters().at(4));
 
-                       transforms.push_back(stage::transform_tuple_t(GetLayerIndex(), [=](frame_transform transform) mutable -> frame_transform
+                       transforms.push_back(stage::transform_tuple_t(layer_index(), [=](frame_transform transform) mutable -> frame_transform
                        {
                                transform.image_transform.fill_translation[0]   = x;
                                transform.image_transform.fill_translation[1]   = y;
@@ -444,7 +431,7 @@ bool MixerCommand::DoExecute()
                        double x_s      = boost::lexical_cast<double>(parameters().at(3));
                        double y_s      = boost::lexical_cast<double>(parameters().at(4));
 
-                       transforms.push_back(stage::transform_tuple_t(GetLayerIndex(), [=](frame_transform transform) -> frame_transform
+                       transforms.push_back(stage::transform_tuple_t(layer_index(), [=](frame_transform transform) -> frame_transform
                        {
                                transform.image_transform.clip_translation[0]   = x;
                                transform.image_transform.clip_translation[1]   = y;
@@ -482,20 +469,20 @@ bool MixerCommand::DoExecute()
                else if(boost::iequals(parameters()[0], L"BLEND"))
                {
                        auto blend_str = parameters().at(1);                                                            
-                       int layer = GetLayerIndex();
-                       GetChannel()->mixer().set_blend_mode(GetLayerIndex(), get_blend_mode(blend_str));       
+                       int layer = layer_index();
+                       channel()->mixer().set_blend_mode(layer_index(), get_blend_mode(blend_str));    
                }
                else if(boost::iequals(parameters()[0], L"MASTERVOLUME"))
                {
                        float master_volume = boost::lexical_cast<float>(parameters().at(1));
-                       GetChannel()->mixer().set_master_volume(master_volume);
+                       channel()->mixer().set_master_volume(master_volume);
                }
                else if(boost::iequals(parameters()[0], L"BRIGHTNESS"))
                {
                        auto value = boost::lexical_cast<double>(parameters().at(1));
                        int duration = parameters().size() > 2 ? boost::lexical_cast<int>(parameters()[2]) : 0;
                        std::wstring tween = parameters().size() > 3 ? parameters()[3] : L"linear";
-                       transforms.push_back(stage::transform_tuple_t(GetLayerIndex(), [=](frame_transform transform) -> frame_transform
+                       transforms.push_back(stage::transform_tuple_t(layer_index(), [=](frame_transform transform) -> frame_transform
                        {
                                transform.image_transform.brightness = value;
                                return transform;
@@ -506,7 +493,7 @@ bool MixerCommand::DoExecute()
                        auto value = boost::lexical_cast<double>(parameters().at(1));
                        int duration = parameters().size() > 2 ? boost::lexical_cast<int>(parameters()[2]) : 0;
                        std::wstring tween = parameters().size() > 3 ? parameters()[3] : L"linear";
-                       transforms.push_back(stage::transform_tuple_t(GetLayerIndex(), [=](frame_transform transform) -> frame_transform
+                       transforms.push_back(stage::transform_tuple_t(layer_index(), [=](frame_transform transform) -> frame_transform
                        {
                                transform.image_transform.saturation = value;
                                return transform;
@@ -517,7 +504,7 @@ bool MixerCommand::DoExecute()
                        auto value = boost::lexical_cast<double>(parameters().at(1));
                        int duration = parameters().size() > 2 ? boost::lexical_cast<int>(parameters()[2]) : 0;
                        std::wstring tween = parameters().size() > 3 ? parameters()[3] : L"linear";
-                       transforms.push_back(stage::transform_tuple_t(GetLayerIndex(), [=](frame_transform transform) -> frame_transform
+                       transforms.push_back(stage::transform_tuple_t(layer_index(), [=](frame_transform transform) -> frame_transform
                        {
                                transform.image_transform.contrast = value;
                                return transform;
@@ -534,7 +521,7 @@ bool MixerCommand::DoExecute()
                        int duration = parameters().size() > 6 ? boost::lexical_cast<int>(parameters()[6]) : 0;
                        std::wstring tween = parameters().size() > 7 ? parameters()[7] : L"linear";
 
-                       transforms.push_back(stage::transform_tuple_t(GetLayerIndex(), [=](frame_transform transform) -> frame_transform
+                       transforms.push_back(stage::transform_tuple_t(layer_index(), [=](frame_transform transform) -> frame_transform
                        {
                                transform.image_transform.levels = value;
                                return transform;
@@ -546,7 +533,7 @@ bool MixerCommand::DoExecute()
                        std::wstring tween = parameters().size() > 3 ? parameters()[3] : L"linear";
                        double value = boost::lexical_cast<double>(parameters()[1]);
 
-                       transforms.push_back(stage::transform_tuple_t(GetLayerIndex(), [=](frame_transform transform) -> frame_transform
+                       transforms.push_back(stage::transform_tuple_t(layer_index(), [=](frame_transform transform) -> frame_transform
                        {
                                transform.audio_transform.volume = value;
                                return transform;
@@ -554,22 +541,22 @@ bool MixerCommand::DoExecute()
                }
                else if(boost::iequals(parameters()[0], L"CLEAR"))
                {
-                       int layer = GetLayerIndex(std::numeric_limits<int>::max());
+                       int layer = layer_index(std::numeric_limits<int>::max());
 
                        if (layer == std::numeric_limits<int>::max())
                        {
-                               GetChannel()->stage().clear_transforms();
-                               GetChannel()->mixer().clear_blend_modes();
+                               channel()->stage().clear_transforms();
+                               channel()->mixer().clear_blend_modes();
                        }
                        else
                        {
-                               GetChannel()->stage().clear_transforms(layer);
-                               GetChannel()->mixer().clear_blend_mode(layer);
+                               channel()->stage().clear_transforms(layer);
+                               channel()->mixer().clear_blend_mode(layer);
                        }
                }
                else if(boost::iequals(parameters()[0], L"COMMIT"))
                {
-                       transforms = std::move(deferred_transforms[GetChannelIndex()]);
+                       transforms = std::move(deferred_transforms[channel_index()]);
                }
                else
                {
@@ -579,11 +566,11 @@ bool MixerCommand::DoExecute()
 
                if(defer)
                {
-                       auto& defer_tranforms = deferred_transforms[GetChannelIndex()];
+                       auto& defer_tranforms = deferred_transforms[channel_index()];
                        defer_tranforms.insert(defer_tranforms.end(), transforms.begin(), transforms.end());
                }
                else
-                       GetChannel()->stage().apply_transforms(transforms);
+                       channel()->stage().apply_transforms(transforms);
        
                SetReplyString(TEXT("202 MIXER OK\r\n"));
 
@@ -608,24 +595,24 @@ bool SwapCommand::DoExecute()
        //Perform loading of the clip
        try
        {
-               if(GetLayerIndex(-1) != -1)
+               if(layer_index(-1) != -1)
                {
                        std::vector<std::string> strs;
                        boost::split(strs, parameters()[0], boost::is_any_of("-"));
                        
-                       auto ch1 = GetChannel();
-                       auto ch2 = GetChannels().at(boost::lexical_cast<int>(strs.at(0))-1);
+                       auto ch1 = channel();
+                       auto ch2 = channels().at(boost::lexical_cast<int>(strs.at(0))-1);
 
-                       int l1 = GetLayerIndex();
+                       int l1 = layer_index();
                        int l2 = boost::lexical_cast<int>(strs.at(1));
 
-                       ch1->stage().swap_layer(l1, l2, ch2->stage());
+                       ch1->stage().swap_layer(l1, l2, ch2.channel->stage());
                }
                else
                {
-                       auto ch1 = GetChannel();
-                       auto ch2 = GetChannels().at(boost::lexical_cast<int>(parameters()[0])-1);
-                       ch1->stage().swap_layers(ch2->stage());
+                       auto ch1 = channel();
+                       auto ch2 = channels().at(boost::lexical_cast<int>(parameters()[0])-1);
+                       ch1->stage().swap_layers(ch2.channel->stage());
                }
                
                SetReplyString(TEXT("202 SWAP OK\r\n"));
@@ -658,7 +645,7 @@ bool AddCommand::DoExecute()
                }
 
                auto consumer = create_consumer(parameters());
-               GetChannel()->output().add(GetLayerIndex(consumer->index()), consumer);
+               channel()->output().add(layer_index(consumer->index()), consumer);
        
                SetReplyString(TEXT("202 ADD OK\r\n"));
 
@@ -683,7 +670,7 @@ bool RemoveCommand::DoExecute()
        //Perform loading of the clip
        try
        {
-               auto index = GetLayerIndex(std::numeric_limits<int>::min());
+               auto index = layer_index(std::numeric_limits<int>::min());
                if(index == std::numeric_limits<int>::min())
                {
                        //create_consumer still expects all parameters to be uppercase
@@ -695,7 +682,7 @@ bool RemoveCommand::DoExecute()
                        index = create_consumer(parameters())->index();
                }
 
-               GetChannel()->output().remove(index);
+               channel()->output().remove(index);
 
                SetReplyString(TEXT("202 REMOVE OK\r\n"));
 
@@ -720,8 +707,8 @@ bool LoadCommand::DoExecute()
        //Perform loading of the clip
        try
        {
-               auto pFP = create_producer(GetChannel()->frame_factory(), GetChannel()->video_format_desc(), parameters());             
-               GetChannel()->stage().load(GetLayerIndex(), pFP, true);
+               auto pFP = create_producer(channel()->frame_factory(), channel()->video_format_desc(), parameters());           
+               channel()->stage().load(layer_index(), pFP, true);
        
                SetReplyString(TEXT("202 LOAD OK\r\n"));
 
@@ -833,11 +820,11 @@ bool LoadbgCommand::DoExecute()
                if(boost::regex_match(parameters().at(0), what, expr))
                {
                        auto channel_index = boost::lexical_cast<int>(what["CHANNEL"].str());
-                       pFP = reroute::create_producer(*GetChannels().at(channel_index-1)); 
+                       pFP = reroute::create_producer(*channels().at(channel_index-1).channel); 
                }
                else
                {
-                       pFP = create_producer(GetChannel()->frame_factory(), GetChannel()->video_format_desc(), parameters());
+                       pFP = create_producer(channel()->frame_factory(), channel()->video_format_desc(), parameters());
                }
                
                if(pFP == frame_producer::empty())
@@ -845,11 +832,11 @@ bool LoadbgCommand::DoExecute()
 
                bool auto_play = contains_param(L"AUTO", parameters());
 
-               auto pFP2 = create_transition_producer(GetChannel()->video_format_desc().field_mode, spl::make_shared_ptr(pFP), transitionInfo);
+               auto pFP2 = create_transition_producer(channel()->video_format_desc().field_mode, spl::make_shared_ptr(pFP), transitionInfo);
                if(auto_play)
-                       GetChannel()->stage().load(GetLayerIndex(), pFP2, false, transitionInfo.duration); // TODO: LOOP
+                       channel()->stage().load(layer_index(), pFP2, false, transitionInfo.duration); // TODO: LOOP
                else
-                       GetChannel()->stage().load(GetLayerIndex(), pFP2, false); // TODO: LOOP
+                       channel()->stage().load(layer_index(), pFP2, false); // TODO: LOOP
        
                
                SetReplyString(TEXT("202 LOADBG OK\r\n"));
@@ -874,7 +861,7 @@ bool PauseCommand::DoExecute()
 {
        try
        {
-               GetChannel()->stage().pause(GetLayerIndex());
+               channel()->stage().pause(layer_index());
                SetReplyString(TEXT("202 PAUSE OK\r\n"));
                return true;
        }
@@ -892,19 +879,13 @@ bool PlayCommand::DoExecute()
        {
                if(!parameters().empty())
                {
-                       LoadbgCommand lbg;
-                       lbg.SetChannel(GetChannel());
-                       lbg.SetChannels(GetChannels());
-                       lbg.SetChannelIndex(GetChannelIndex());
-                       lbg.SetLayerIntex(GetLayerIndex());
-                       lbg.SetClientInfo(GetClientInfo());
-                       for(auto it = parameters().begin(); it != parameters().end(); ++it)
-                               lbg.AddParameter(*it);
+                       LoadbgCommand lbg(*this);
+
                        if(!lbg.Execute())
                                throw std::exception();
                }
 
-               GetChannel()->stage().play(GetLayerIndex());
+               channel()->stage().play(layer_index());
                
                SetReplyString(TEXT("202 PLAY OK\r\n"));
                return true;
@@ -921,7 +902,7 @@ bool StopCommand::DoExecute()
 {
        try
        {
-               GetChannel()->stage().stop(GetLayerIndex());
+               channel()->stage().stop(layer_index());
                SetReplyString(TEXT("202 STOP OK\r\n"));
                return true;
        }
@@ -935,11 +916,11 @@ bool StopCommand::DoExecute()
 
 bool ClearCommand::DoExecute()
 {
-       int index = GetLayerIndex(std::numeric_limits<int>::min());
+       int index = layer_index(std::numeric_limits<int>::min());
        if(index != std::numeric_limits<int>::min())
-               GetChannel()->stage().clear(index);
+               channel()->stage().clear(index);
        else
-               GetChannel()->stage().clear();
+               channel()->stage().clear();
                
        SetReplyString(TEXT("202 CLEAR OK\r\n"));
 
@@ -948,7 +929,7 @@ bool ClearCommand::DoExecute()
 
 bool PrintCommand::DoExecute()
 {
-       GetChannel()->output().add(create_consumer(boost::assign::list_of(L"IMAGE")));
+       channel()->output().add(create_consumer(boost::assign::list_of(L"IMAGE")));
                
        SetReplyString(TEXT("202 PRINT OK\r\n"));
 
@@ -1083,7 +1064,7 @@ bool CGCommand::DoExecuteAdd() {
                std::wstring filename = parameters()[2];
                filename.append(extension);
 
-               flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(GetChannel()), GetLayerIndex(flash::cg_proxy::DEFAULT_LAYER)).add(layer, filename, bDoStart, label, (pDataString!=0) ? pDataString : TEXT(""));
+               flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(channel()), layer_index(flash::cg_proxy::DEFAULT_LAYER)).add(layer, filename, bDoStart, label, (pDataString!=0) ? pDataString : TEXT(""));
                SetReplyString(TEXT("202 CG OK\r\n"));
        }
        else
@@ -1104,7 +1085,7 @@ bool CGCommand::DoExecutePlay()
                        return false;
                }
                int layer = boost::lexical_cast<int>(parameters()[1]);
-               flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(GetChannel()), GetLayerIndex(flash::cg_proxy::DEFAULT_LAYER)).play(layer);
+               flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(channel()), layer_index(flash::cg_proxy::DEFAULT_LAYER)).play(layer);
        }
        else
        {
@@ -1126,7 +1107,7 @@ bool CGCommand::DoExecuteStop()
                        return false;
                }
                int layer = boost::lexical_cast<int>(parameters()[1]);
-               flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(GetChannel()), GetLayerIndex(flash::cg_proxy::DEFAULT_LAYER)).stop(layer, 0);
+               flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(channel()), layer_index(flash::cg_proxy::DEFAULT_LAYER)).stop(layer, 0);
        }
        else 
        {
@@ -1149,7 +1130,7 @@ bool CGCommand::DoExecuteNext()
                }
 
                int layer = boost::lexical_cast<int>(parameters()[1]);
-               flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(GetChannel()), GetLayerIndex(flash::cg_proxy::DEFAULT_LAYER)).next(layer);
+               flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(channel()), layer_index(flash::cg_proxy::DEFAULT_LAYER)).next(layer);
        }
        else 
        {
@@ -1172,7 +1153,7 @@ bool CGCommand::DoExecuteRemove()
                }
 
                int layer = boost::lexical_cast<int>(parameters()[1]);
-               flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(GetChannel()), GetLayerIndex(flash::cg_proxy::DEFAULT_LAYER)).remove(layer);
+               flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(channel()), layer_index(flash::cg_proxy::DEFAULT_LAYER)).remove(layer);
        }
        else 
        {
@@ -1186,7 +1167,7 @@ bool CGCommand::DoExecuteRemove()
 
 bool CGCommand::DoExecuteClear() 
 {
-       GetChannel()->stage().clear(GetLayerIndex(flash::cg_proxy::DEFAULT_LAYER));
+       channel()->stage().clear(layer_index(flash::cg_proxy::DEFAULT_LAYER));
        SetReplyString(TEXT("202 CG OK\r\n"));
        return true;
 }
@@ -1213,7 +1194,7 @@ bool CGCommand::DoExecuteUpdate()
                }               
 
                int layer = boost::lexical_cast<int>(parameters()[1]);
-               flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(GetChannel()), GetLayerIndex(flash::cg_proxy::DEFAULT_LAYER)).update(layer, dataString);
+               flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(channel()), layer_index(flash::cg_proxy::DEFAULT_LAYER)).update(layer, dataString);
        }
        catch(...)
        {
@@ -1238,7 +1219,7 @@ bool CGCommand::DoExecuteInvoke()
                        return false;
                }
                int layer = boost::lexical_cast<int>(parameters()[1]);
-               auto result = flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(GetChannel()), GetLayerIndex(flash::cg_proxy::DEFAULT_LAYER)).invoke(layer, parameters()[2]);
+               auto result = flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(channel()), layer_index(flash::cg_proxy::DEFAULT_LAYER)).invoke(layer, parameters()[2]);
                replyString << result << TEXT("\r\n"); 
        }
        else 
@@ -1265,13 +1246,13 @@ bool CGCommand::DoExecuteInfo()
                }
 
                int layer = boost::lexical_cast<int>(parameters()[1]);
-               auto desc = flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(GetChannel()), GetLayerIndex(flash::cg_proxy::DEFAULT_LAYER)).description(layer);
+               auto desc = flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(channel()), layer_index(flash::cg_proxy::DEFAULT_LAYER)).description(layer);
                
                replyString << desc << TEXT("\r\n"); 
        }
        else 
        {
-               auto info = flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(GetChannel()), GetLayerIndex(flash::cg_proxy::DEFAULT_LAYER)).template_host_info();
+               auto info = flash::create_cg_proxy(spl::shared_ptr<core::video_channel>(channel()), layer_index(flash::cg_proxy::DEFAULT_LAYER)).template_host_info();
                replyString << info << TEXT("\r\n"); 
        }       
 
@@ -1526,8 +1507,8 @@ bool InfoCommand::DoExecute()
                        boost::property_tree::wptree info;
 
                        int index = 0;
-                       BOOST_FOREACH(auto channel, channels_)
-                               info.add_child(L"channels.channel", channel->info())
+                       BOOST_FOREACH(auto channel, channels())
+                               info.add_child(L"channels.channel", channel.channel->info())
                                        .add(L"index", ++index);
                        
                        boost::property_tree::write_xml(replyString, info, w);
@@ -1550,7 +1531,7 @@ bool InfoCommand::DoExecute()
                                
                                if(layer == std::numeric_limits<int>::min())
                                {       
-                                       info.add_child(L"channel", channels_.at(channel)->info())
+                                       info.add_child(L"channel", channels().at(channel).channel->info())
                                                        .add(L"index", channel);
                                }
                                else
@@ -1558,13 +1539,13 @@ bool InfoCommand::DoExecute()
                                        if(parameters().size() >= 2)
                                        {
                                                if(boost::iequals(parameters()[1], L"B"))
-                                                       info.add_child(L"producer", channels_.at(channel)->stage().background(layer).get()->info());
+                                                       info.add_child(L"producer", channels().at(channel).channel->stage().background(layer).get()->info());
                                                else
-                                                       info.add_child(L"producer", channels_.at(channel)->stage().foreground(layer).get()->info());
+                                                       info.add_child(L"producer", channels().at(channel).channel->stage().foreground(layer).get()->info());
                                        }
                                        else
                                        {
-                                               info.add_child(L"layer", channels_.at(channel)->stage().info(layer).get())
+                                               info.add_child(L"layer", channels().at(channel).channel->stage().info(layer).get())
                                                        .add(L"index", layer);
                                        }
                                }
@@ -1574,8 +1555,8 @@ bool InfoCommand::DoExecute()
                        {
                                // This is needed for backwards compatibility with old clients
                                replyString << TEXT("200 INFO OK\r\n");
-                               for(size_t n = 0; n < channels_.size(); ++n)
-                                       GenerateChannelInfo(n, channels_[n], replyString);
+                               for(size_t n = 0; n < channels().size(); ++n)
+                                       GenerateChannelInfo(n, channels()[n].channel, replyString);
                        }
 
                }
@@ -1661,7 +1642,7 @@ bool VersionCommand::DoExecute()
 
 bool ByeCommand::DoExecute()
 {
-       GetClientInfo()->Disconnect();
+       client()->disconnect();
        return true;
 }
 
@@ -1677,7 +1658,7 @@ bool SetCommand::DoExecute()
                        auto format_desc = core::video_format_desc(value);
                        if(format_desc.format != core::video_format::invalid)
                        {
-                               GetChannel()->video_format_desc(format_desc);
+                               channel()->video_format_desc(format_desc);
                                SetReplyString(TEXT("202 SET MODE OK\r\n"));
                        }
                        else
@@ -1698,6 +1679,108 @@ bool SetCommand::DoExecute()
        return true;
 }
 
+bool LockCommand::DoExecute()
+{
+       try
+       {
+               auto it = parameters().begin();
+
+               std::shared_ptr<caspar::IO::lock_container> lock;
+               try
+               {
+                       int channel_index = boost::lexical_cast<int>(*it) - 1;
+                       lock = channels().at(channel_index).lock;
+               }
+               catch(const boost::bad_lexical_cast&) {}
+               catch(...)
+               {
+                       SetReplyString(L"401 LOCK ERROR\r\n");
+                       return false;
+               }
+
+               if(lock)
+                       ++it;
+
+               if(it == parameters().end())    //too few parameters
+               {
+                       SetReplyString(L"402 LOCK ERROR\r\n");
+                       return false;
+               }
+
+               std::wstring command = boost::to_upper_copy(*it);
+               if(command == L"ACQUIRE")
+               {
+                       ++it;
+                       if(it == parameters().end())    //too few parameters
+                       {
+                               SetReplyString(L"402 LOCK ACQUIRE ERROR\r\n");
+                               return false;
+                       }
+                       std::wstring lock_phrase = (*it);
+
+                       //TODO: read options
+
+                       if(lock)
+                       {
+                               //just lock one channel
+                               if(!lock->try_lock(lock_phrase, client()))
+                               {
+                                       SetReplyString(L"503 LOCK ACQUIRE FAILED\r\n");
+                                       return false;
+                               }
+                       }
+                       else
+                       {
+                               //TODO: lock all channels
+                               CASPAR_THROW_EXCEPTION(not_implemented());
+                       }
+                       SetReplyString(L"202 LOCK ACQUIRE OK\r\n");
+
+               }
+               else if(command == L"RELEASE")
+               {
+                       if(lock)
+                       {
+                               client()->remove_lifecycle_bound_object(lock->lifecycle_key());
+                       }
+                       else
+                       {
+                               //TODO: lock all channels
+                               CASPAR_THROW_EXCEPTION(not_implemented());
+                       }
+                       SetReplyString(L"202 LOCK RELEASE OK\r\n");
+               }
+               else if(command == L"CLEAR")
+               {
+                       ++it;
+                       if(it == parameters().end())
+                       {
+                               SetReplyString(L"402 LOCK CLEAR ERROR\r\n");
+                               return false;
+                       }
+                       std::wstring override_phrase = (*it);
+
+                       //TODO: clear locks
+
+
+                       SetReplyString(L"202 LOCK CLEAR OK\r\n");
+               }
+               else
+               {
+                       SetReplyString(L"403 LOCK ERROR\r\n");
+                       return false;
+               }
+       }
+       catch(...)
+       {
+               CASPAR_LOG_CURRENT_EXCEPTION();
+               SetReplyString(L"501 LOCK FAILED\r\n");
+               return false;
+       }
+
+       return true;
+}
+
 
 }      //namespace amcp
 }}     //namespace caspar
\ No newline at end of file
index 87b29640816307b0b62cf12be70af945ff8b6bdf..f01b8c1b3ceddd3fd195a89d72fa0ffd282fb19a 100644 (file)
@@ -32,98 +32,171 @@ std::wstring ListTemplates();
 
 namespace amcp {
        
-class ChannelGridCommand : public AMCPCommandBase<false, 0>
+class ChannelGridCommand : public AMCPCommandBase<0>
 {
+public:
+       explicit ChannelGridCommand(IO::ClientInfoPtr client) : AMCPCommandBase(client) {}
        std::wstring print() const { return L"ChannelGridCommand";}
        bool DoExecute();
 };
 
-class DiagnosticsCommand : public AMCPCommandBase<false, 0>
+class DiagnosticsCommand : public AMCPCommandBase<0>
 {
+public:
+       explicit DiagnosticsCommand(IO::ClientInfoPtr client) : AMCPCommandBase(client) {}
        std::wstring print() const { return L"DiagnosticsCommand";}
        bool DoExecute();
 };
 
-class CallCommand : public AMCPCommandBase<true, 1>
+class CallCommand : public AMCPChannelCommandBase<1>
 {
+public:
+       CallCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index) : AMCPChannelCommandBase(client, channel, channel_index, layer_index)
+       {}
+
+private:
        std::wstring print() const { return L"CallCommand";}
        bool DoExecute();
 };
 
-class MixerCommand : public AMCPCommandBase<true, 1>
+class MixerCommand : public AMCPChannelCommandBase<1>
 {
+public:
+       MixerCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index) : AMCPChannelCommandBase(client, channel, channel_index, layer_index)
+       {}
+
+private:
        std::wstring print() const { return L"MixerCommand";}
        bool DoExecute();
 };
        
-class AddCommand : public AMCPCommandBase<true, 1>
+class AddCommand : public AMCPChannelCommandBase<1>
 {
+public:
+       AddCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index) : AMCPChannelCommandBase(client, channel, channel_index, layer_index)
+       {}
+
+private:
        std::wstring print() const { return L"AddCommand";}
        bool DoExecute();
 };
 
-class RemoveCommand : public AMCPCommandBase<true, 0>
+class RemoveCommand : public AMCPChannelCommandBase<0>
 {
+public:
+       RemoveCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index) : AMCPChannelCommandBase(client, channel, channel_index, layer_index)
+       {}
+
+private:
        std::wstring print() const { return L"RemoveCommand";}
        bool DoExecute();
 };
 
-class SwapCommand : public AMCPCommandBase<true, 1>
+class SwapCommand : public AMCPChannelCommandBase<1>, AMCPChannelsAwareCommand
 {
+public:
+       SwapCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index, const std::vector<channel_context>& channels) : AMCPChannelCommandBase(client, channel, channel_index, layer_index), AMCPChannelsAwareCommand(channels)
+       {}
+
+private:
        std::wstring print() const { return L"SwapCommand";}
        bool DoExecute();
 };
 
-class LoadCommand : public AMCPCommandBase<true, 1>
+class LoadCommand : public AMCPChannelCommandBase<1>
 {
+public:
+       LoadCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index) : AMCPChannelCommandBase(client, channel, channel_index, layer_index)
+       {}
+
+private:
        std::wstring print() const { return L"LoadCommand";}
        bool DoExecute();
 };
 
-class LoadbgCommand : public AMCPCommandBase<true, 1>
+
+class PlayCommand: public AMCPChannelCommandBase<0>, public AMCPChannelsAwareCommand
 {
-       std::wstring print() const { return L"LoadbgCommand";}
+public:
+       PlayCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index, const std::vector<channel_context>& channels) : AMCPChannelCommandBase(client, channel, channel_index, layer_index), AMCPChannelsAwareCommand(channels)
+       {}
+
+private:
+       std::wstring print() const { return L"PlayCommand";}
        bool DoExecute();
 };
 
-class PlayCommand: public AMCPCommandBase<true, 0>
+class LoadbgCommand : public AMCPChannelCommandBase<1>, public AMCPChannelsAwareCommand
 {
-       std::wstring print() const { return L"PlayCommand";}
+public:
+       LoadbgCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index, const std::vector<channel_context>& channels) : AMCPChannelCommandBase(client, channel, channel_index, layer_index), AMCPChannelsAwareCommand(channels)
+       {}
+       LoadbgCommand(const PlayCommand& rhs) : AMCPChannelCommandBase<1>(rhs), AMCPChannelsAwareCommand(rhs) {}
+
+private:
+       std::wstring print() const { return L"LoadbgCommand";}
        bool DoExecute();
 };
 
-class PauseCommand: public AMCPCommandBase<true, 0>
+class PauseCommand: public AMCPChannelCommandBase<0>
 {
+public:
+       PauseCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index) : AMCPChannelCommandBase(client, channel, channel_index, layer_index)
+       {}
+
+private:
        std::wstring print() const { return L"PauseCommand";}
        bool DoExecute();
 };
 
-class StopCommand : public AMCPCommandBase<true, 0>
+class StopCommand : public AMCPChannelCommandBase<0>
 {
+public:
+       StopCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index) : AMCPChannelCommandBase(client, channel, channel_index, layer_index)
+       {}
+
+private:
        std::wstring print() const { return L"StopCommand";}
        bool DoExecute();
 };
 
-class ClearCommand : public AMCPCommandBase<true, 0>
+class ClearCommand : public AMCPChannelCommandBase<0>
 {
+public:
+       ClearCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index) : AMCPChannelCommandBase(client, channel, channel_index, layer_index)
+       {}
+
+private:
        std::wstring print() const { return L"ClearCommand";}
        bool DoExecute();
 };
 
-class PrintCommand : public AMCPCommandBase<true, 0>
+class PrintCommand : public AMCPChannelCommandBase<0>
 {
+public:
+       PrintCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index) : AMCPChannelCommandBase(client, channel, channel_index, layer_index)
+       {}
+
+private:
        std::wstring print() const { return L"PrintCommand";}
        bool DoExecute();
 };
 
-class LogCommand : public AMCPCommandBase<false, 0>
+class LogCommand : public AMCPCommandBase<2>
 {
+public:
+       explicit LogCommand(IO::ClientInfoPtr client) : AMCPCommandBase(client) {}
        std::wstring print() const { return L"LogCommand";}
        bool DoExecute();
 };
 
-class CGCommand : public AMCPCommandBase<true, 1>
+class CGCommand : public AMCPChannelCommandBase<1>
 {
+public:
+       CGCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index) : AMCPChannelCommandBase(client, channel, channel_index, layer_index)
+       {}
+
+private:
        std::wstring print() const { return L"CGCommand";}
        bool DoExecute();
        bool ValidateLayer(const std::wstring& layerstring);
@@ -139,8 +212,11 @@ class CGCommand : public AMCPCommandBase<true, 1>
        bool DoExecuteInfo();
 };
 
-class DataCommand : public AMCPCommandBase<false, 1>
+class DataCommand : public AMCPCommandBase<1>
 {
+public:
+       explicit DataCommand(IO::ClientInfoPtr client) : AMCPCommandBase(client) {}
+
        std::wstring print() const { return L"DataCommand";}
        bool DoExecute();
        bool DoExecuteStore();
@@ -149,52 +225,73 @@ class DataCommand : public AMCPCommandBase<false, 1>
        bool DoExecuteList();
 };
 
-class ClsCommand : public AMCPCommandBase<false, 0>
+class ClsCommand : public AMCPCommandBase<0>
 {
+public:
+       explicit ClsCommand(IO::ClientInfoPtr client) : AMCPCommandBase(client) {}
        std::wstring print() const { return L"ClsCommand";}
        bool DoExecute();
 };
 
-class TlsCommand : public AMCPCommandBase<false, 0>
+class TlsCommand : public AMCPCommandBase<0>
 {
+public:
+       explicit TlsCommand(IO::ClientInfoPtr client) : AMCPCommandBase(client) {}
        std::wstring print() const { return L"TlsCommand";}
        bool DoExecute();
 };
 
-class CinfCommand : public AMCPCommandBase<false, 1>
+class CinfCommand : public AMCPCommandBase<1>
 {
+public:
+       explicit CinfCommand(IO::ClientInfoPtr client) : AMCPCommandBase(client) {}
        std::wstring print() const { return L"CinfCommand";}
        bool DoExecute();
 };
 
-class InfoCommand : public AMCPCommandBase<false, 0>
+class InfoCommand : public AMCPCommandBase<0>, AMCPChannelsAwareCommand
 {
 public:
+       InfoCommand(IO::ClientInfoPtr client, const std::vector<channel_context>& channels) : AMCPChannelsAwareCommand(channels), AMCPCommandBase(client) {}
        std::wstring print() const { return L"InfoCommand";}
-       InfoCommand(const std::vector<spl::shared_ptr<core::video_channel>>& channels) : channels_(channels){}
        bool DoExecute();
-private:
-       const std::vector<spl::shared_ptr<core::video_channel>>& channels_;
 };
 
-class VersionCommand : public AMCPCommandBase<false, 0>
+class VersionCommand : public AMCPCommandBase<0>
 {
+public:
+       explicit VersionCommand(IO::ClientInfoPtr client) : AMCPCommandBase(client) {}
        std::wstring print() const { return L"VersionCommand";}
        bool DoExecute();
 };
 
-class ByeCommand : public AMCPCommandBase<false, 0>
+class ByeCommand : public AMCPCommandBase<0>
 {
+public:
+       explicit ByeCommand(IO::ClientInfoPtr client) : AMCPCommandBase(client) {}
        std::wstring print() const { return L"ByeCommand";}
        bool DoExecute();
 };
 
-class SetCommand : public AMCPCommandBase<true, 2>
+class SetCommand : public AMCPChannelCommandBase<2>
 {
+public:
+       SetCommand(IO::ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index) : AMCPChannelCommandBase(client, channel, channel_index, layer_index)
+       {}
+
+private:
        std::wstring print() const { return L"SetCommand";}
        bool DoExecute();
 };
 
+class LockCommand : public AMCPCommandBase<1>, AMCPChannelsAwareCommand
+{
+public:
+       LockCommand(IO::ClientInfoPtr client, const std::vector<channel_context>& channels) : AMCPChannelsAwareCommand(channels), AMCPCommandBase(client) {}
+       std::wstring print() const { return L"LockCommand";}
+       bool DoExecute();
+};
+
 //class KillCommand : public AMCPCommand
 //{
 //public:
index cf24372a181fab8e53c6df66df811cb2ad90c44b..524316820e9e972387fd10d2ea95075600437441 100644 (file)
 #include "../StdAfx.h"
 
 #include "AMCPProtocolStrategy.h"
-
-#include "../util/AsyncEventServer.h"
 #include "AMCPCommandsImpl.h"
+#include "amcp_shared.h"
+#include "AMCPCommand.h"
+#include "AMCPCommandQueue.h"
 
 #include <stdio.h>
 #include <crtdbg.h>
@@ -46,333 +47,346 @@ namespace caspar { namespace protocol { namespace amcp {
 
 using IO::ClientInfoPtr;
 
-const std::wstring AMCPProtocolStrategy::MessageDelimiter = TEXT("\r\n");
-
-inline std::shared_ptr<core::video_channel> GetChannelSafe(unsigned int index, const std::vector<spl::shared_ptr<core::video_channel>>& channels)
+struct AMCPProtocolStrategy::impl
 {
-       return index < channels.size() ? std::shared_ptr<core::video_channel>(channels[index]) : nullptr;
-}
-
-AMCPProtocolStrategy::AMCPProtocolStrategy(const std::vector<spl::shared_ptr<core::video_channel>>& channels) : channels_(channels) {
-       AMCPCommandQueuePtr pGeneralCommandQueue(new AMCPCommandQueue());
-       commandQueues_.push_back(pGeneralCommandQueue);
+private:
+       std::vector<channel_context> channels_;
+       std::vector<AMCPCommandQueue::ptr_type> commandQueues_;
 
+public:
+       impl(const std::vector<spl::shared_ptr<core::video_channel>>& channels)
+       {
+               commandQueues_.push_back(std::make_shared<AMCPCommandQueue>());
 
-       std::shared_ptr<core::video_channel> pChannel;
-       unsigned int index = -1;
-       //Create a commandpump for each video_channel
-       while((pChannel = GetChannelSafe(++index, channels_)) != 0) {
-               AMCPCommandQueuePtr pChannelCommandQueue(new AMCPCommandQueue());
-               std::wstring title = TEXT("video_channel ");
+               int index = 0;
+               BOOST_FOREACH(const spl::shared_ptr<core::video_channel>& channel,  channels)
+               {
+                       std::wstring lifecycle_key = L"lock" + boost::lexical_cast<std::wstring>(index);
+                       channels_.push_back(channel_context(channel, lifecycle_key));
+                       auto queue(std::make_shared<AMCPCommandQueue>());
+                       commandQueues_.push_back(queue);
+                       ++index;
+               }
+       }
 
-               //HACK: Perform real conversion from int to string
-               TCHAR num = TEXT('1')+static_cast<TCHAR>(index);
-               title += num;
+       ~impl() {}
+
+       enum parser_state {
+               New = 0,
+               GetSwitch,
+               GetCommand,
+               GetParameters
+       };
+       enum error_state {
+               no_error = 0,
+               command_error,
+               channel_error,
+               parameters_error,
+               unknown_error,
+               access_error
+       };
+
+       struct command_interpreter_result
+       {
+               command_interpreter_result() : error(no_error) {}
+
+               std::shared_ptr<caspar::IO::lock_container>     lock;
+               std::wstring                                                            command_name;
+               AMCPCommand::ptr_type                                           command;
+               error_state                                                                     error;
+               AMCPCommandQueue::ptr_type                                      queue;
+       };
+
+       //The paser method expects message to be complete messages with the delimiter stripped away.
+       //Thesefore the AMCPProtocolStrategy should be decorated with a delimiter_based_chunking_strategy
+       void Parse(const std::wstring& message, ClientInfoPtr client)
+       {
+               CASPAR_LOG(info) << L"Received message from " << client->print() << ": " << message << L"\\r\\n";
+       
+               command_interpreter_result result;
+               if(interpret_command_string(message, result, client))
+               {
+                       if(result.lock && !result.lock->check_access(client))
+                               result.error = access_error;
+                       else
+                               result.queue->AddCommand(result.command);
+               }
                
-               commandQueues_.push_back(pChannelCommandQueue);
+               if(result.error != no_error)
+               {
+                       std::wstringstream answer;
+                       boost::to_upper(result.command_name);
+
+                       switch(result.error)
+                       {
+                       case command_error:
+                               answer << L"400 ERROR\r\n" << message << "\r\n";
+                               break;
+                       case channel_error:
+                               answer << L"401 " << result.command_name << " ERROR\r\n";
+                               break;
+                       case parameters_error:
+                               answer << L"402 " << result.command_name << " ERROR\r\n";
+                               break;
+                       case access_error:
+                               answer << L"503 " << result.command_name << " FAILED\r\n";
+                               break;
+                       default:
+                               answer << L"500 FAILED\r\n";
+                               break;
+                       }
+                       client->send(answer.str());
+               }
        }
-}
 
-AMCPProtocolStrategy::~AMCPProtocolStrategy() {
-}
+private:
+       friend class AMCPCommand;
 
-//The paser method expects message to be complete messages with the delimiter stripped away.
-//Thesefore the AMCPProtocolStrategy should be decorated with a delimiter_based_chunking_strategy
-void AMCPProtocolStrategy::Parse(const std::wstring& message, ClientInfoPtr pClientInfo)
-{
-       CASPAR_LOG(info) << L"Received message from " << pClientInfo->print() << ": " << message << L"\\r\\n";
-       
-       bool bError = true;
-       MessageParserState state = New;
+       bool interpret_command_string(const std::wstring& message, command_interpreter_result& result, ClientInfoPtr client)
+       {
+               try
+               {
+                       std::vector<std::wstring> tokens;
+                       parser_state state = New;
 
-       AMCPCommand::ptr_type pCommand(InterpretCommandString(message, &state));
+                       tokenize(message, &tokens);
 
-       if(pCommand != 0) {
-               pCommand->SetClientInfo(pClientInfo);   
-               if(QueueCommand(pCommand))
-                       bError = false;
-               else
-                       state = GetChannel;
-       }
+                       //parse the message one token at the time
+                       auto end = tokens.end();
+                       auto it = tokens.begin();
+                       while(it != end && result.error == no_error)
+                       {
+                               switch(state)
+                               {
+                               case New:
+                                       if((*it)[0] == TEXT('/'))
+                                               state = GetSwitch;
+                                       else
+                                               state = GetCommand;
+                                       break;
+
+                               case GetSwitch:
+                                       //command_switch = (*it);       //we dont care for the switch anymore
+                                       state = GetCommand;
+                                       ++it;
+                                       break;
+
+                               case GetCommand:
+                                       {
+                                               result.command_name = (*it);
+                                               result.command = create_command(result.command_name, client);
+                                               if(result.command)      //the command doesn't need a channel
+                                               {
+                                                       result.queue = commandQueues_[0];
+                                                       state = GetParameters;
+                                               }
+                                               else
+                                               {
+                                                       //get channel index from next token
+                                                       int channel_index = -1;
+                                                       int layer_index = -1;
+
+                                                       ++it;
+                                                       if(it == end)
+                                                       {
+                                                               result.error = parameters_error;
+                                                               break;
+                                                       }
+
+                                                       {       //parse channel/layer token
+                                                               try
+                                                               {
+                                                                       std::wstring channelid_str = boost::trim_copy(*it);
+                                                                       std::vector<std::wstring> split;
+                                                                       boost::split(split, channelid_str, boost::is_any_of("-"));
+
+                                                                       channel_index = boost::lexical_cast<int>(split[0]) - 1;
+                                                                       if(split.size() > 1)
+                                                                               layer_index = boost::lexical_cast<int>(split[1]);
+                                                               }
+                                                               catch(...)
+                                                               {
+                                                                       result.error = channel_error;
+                                                                       break;
+                                                               }
+                                                       }
+                                               
+                                                       if(channel_index >= 0 && channel_index < channels_.size())
+                                                       {
+                                                               result.command = create_channel_command(result.command_name, client, channels_.at(channel_index), channel_index, layer_index);
+                                                               if(result.command)
+                                                               {
+                                                                       result.lock = channels_.at(channel_index).lock;
+                                                                       result.queue = commandQueues_[channel_index + 1];
+                                                               }
+                                                               else
+                                                               {
+                                                                       result.error = command_error;
+                                                                       break;
+                                                               }
+                                                       }
+                                                       else
+                                                       {
+                                                               result.error = channel_error;
+                                                               break;
+                                                       }
+                                               }
+
+                                               state = GetParameters;
+                                               ++it;
+                                       }
+                                       break;
+
+                               case GetParameters:
+                                       {
+                                               int parameterCount=0;
+                                               while(it != end)
+                                               {
+                                                       result.command->parameters().push_back((*it));
+                                                       ++it;
+                                                       ++parameterCount;
+                                               }
+                                       }
+                                       break;
+                               }
+                       }
 
-       if(bError == true) {
-               std::wstringstream answer;
-               switch(state)
+                       if(result.command && result.error == no_error && result.command->parameters().size() < result.command->minimum_parameters()) {
+                               result.error = parameters_error;
+                       }
+               }
+               catch(...)
                {
-               case GetCommand:
-                       answer << TEXT("400 ERROR\r\n") + message << "\r\n";
-                       break;
-               case GetChannel:
-                       answer << TEXT("401 ERROR\r\n");
-                       break;
-               case GetParameters:
-                       answer << TEXT("402 ERROR\r\n");
-                       break;
-               default:
-                       answer << TEXT("500 FAILED\r\n");
-                       break;
+                       CASPAR_LOG_CURRENT_EXCEPTION();
+                       result.error = unknown_error;
                }
-               pClientInfo->Send(answer.str());
+
+               return result.error == no_error;
        }
-}
 
-AMCPCommand::ptr_type AMCPProtocolStrategy::InterpretCommandString(const std::wstring& message, MessageParserState* pOutState)
-{
-       std::vector<std::wstring> tokens;
-       unsigned int currentToken = 0;
-       std::wstring commandSwitch;
+       std::size_t tokenize(const std::wstring& message, std::vector<std::wstring>* pTokenVector)
+       {
+               //split on whitespace but keep strings within quotationmarks
+               //treat \ as the start of an escape-sequence: the following char will indicate what to actually put in the string
 
-       AMCPCommand::ptr_type pCommand;
-       MessageParserState state = New;
+               std::wstring currentToken;
 
-       std::size_t tokensInMessage = TokenizeMessage(message, &tokens);
+               char inQuote = 0;
+               bool getSpecialCode = false;
 
-       //parse the message one token at the time
-       while(currentToken < tokensInMessage)
-       {
-               switch(state)
+               for(unsigned int charIndex=0; charIndex<message.size(); ++charIndex)
                {
-               case New:
-                       if(tokens[currentToken][0] == TEXT('/'))
-                               state = GetSwitch;
-                       else
-                               state = GetCommand;
-                       break;
-
-               case GetSwitch:
-                       commandSwitch = tokens[currentToken];
-                       state = GetCommand;
-                       ++currentToken;
-                       break;
-
-               case GetCommand:
-                       pCommand = CommandFactory(tokens[currentToken]);
-                       if(pCommand == 0) {
-                               goto ParseFinnished;
-                       }
-                       else
+                       if(getSpecialCode)
                        {
-                               pCommand->SetChannels(channels_);
-                               //Set scheduling
-                               if(commandSwitch.size() > 0) {
-                                       transform(commandSwitch.begin(), commandSwitch.end(), commandSwitch.begin(), toupper);
-
-                                       //if(commandSwitch == TEXT("/APP"))
-                                       //      pCommand->SetScheduling(AddToQueue);
-                                       //else if(commandSwitch  == TEXT("/IMMF"))
-                                       //      pCommand->SetScheduling(ImmediatelyAndClear);
-                               }
+                               //insert code-handling here
+                               switch(message[charIndex])
+                               {
+                               case TEXT('\\'):
+                                       currentToken += TEXT("\\");
+                                       break;
+                               case TEXT('\"'):
+                                       currentToken += TEXT("\"");
+                                       break;
+                               case TEXT('n'):
+                                       currentToken += TEXT("\n");
+                                       break;
+                               default:
+                                       break;
+                               };
+                               getSpecialCode = false;
+                               continue;
+                       }
 
-                               if(pCommand->NeedChannel())
-                                       state = GetChannel;
-                               else
-                                       state = GetParameters;
+                       if(message[charIndex]==TEXT('\\'))
+                       {
+                               getSpecialCode = true;
+                               continue;
                        }
-                       ++currentToken;
-                       break;
 
-               case GetParameters:
+                       if(message[charIndex]==' ' && inQuote==false)
                        {
-                               _ASSERTE(pCommand != 0);
-                               int parameterCount=0;
-                               while(currentToken<tokensInMessage)
+                               if(currentToken.size()>0)
                                {
-                                       pCommand->AddParameter(tokens[currentToken++]);
-                                       ++parameterCount;
-                               }
-
-                               if(parameterCount < pCommand->GetMinimumParameters()) {
-                                       goto ParseFinnished;
+                                       pTokenVector->push_back(currentToken);
+                                       currentToken.clear();
                                }
-
-                               state = Done;
-                               break;
+                               continue;
                        }
 
-               case GetChannel:
+                       if(message[charIndex]==TEXT('\"'))
                        {
-//                             assert(pCommand != 0);
-
-                               std::wstring str = boost::trim_copy(tokens[currentToken]);
-                               std::vector<std::wstring> split;
-                               boost::split(split, str, boost::is_any_of("-"));
-                                       
-                               int channelIndex = -1;
-                               int layerIndex = -1;
-                               try
-                               {
-                                       channelIndex = boost::lexical_cast<int>(split[0]) - 1;
+                               inQuote ^= 1;
 
-                                       if(split.size() > 1)
-                                               layerIndex = boost::lexical_cast<int>(split[1]);
-                               }
-                               catch(...)
+                               if(currentToken.size()>0 || !inQuote)
                                {
-                                       goto ParseFinnished;
-                               }
-
-                               std::shared_ptr<core::video_channel> pChannel = GetChannelSafe(channelIndex, channels_);
-                               if(pChannel == 0) {
-                                       goto ParseFinnished;
+                                       pTokenVector->push_back(currentToken);
+                                       currentToken.clear();
                                }
-
-                               pCommand->SetChannel(pChannel);
-                               pCommand->SetChannels(channels_);
-                               pCommand->SetChannelIndex(channelIndex);
-                               pCommand->SetLayerIntex(layerIndex);
-
-                               state = GetParameters;
-                               ++currentToken;
-                               break;
+                               continue;
                        }
 
-               default:        //Done and unexpected
-                       goto ParseFinnished;
+                       currentToken += message[charIndex];
                }
-       }
-
-ParseFinnished:
-       if(state == GetParameters && pCommand->GetMinimumParameters()==0)
-               state = Done;
-
-       if(state != Done) {
-               pCommand.reset();
-       }
-
-       if(pOutState != 0) {
-               *pOutState = state;
-       }
 
-       return pCommand;
-}
-
-bool AMCPProtocolStrategy::QueueCommand(AMCPCommand::ptr_type pCommand) {
-       if(pCommand->NeedChannel()) {
-               unsigned int channelIndex = pCommand->GetChannelIndex() + 1;
-               if(commandQueues_.size() > channelIndex) {
-                       commandQueues_[channelIndex]->AddCommand(pCommand);
-               }
-               else
-                       return false;
-       }
-       else {
-               commandQueues_[0]->AddCommand(pCommand);
-       }
-       return true;
-}
-
-AMCPCommand::ptr_type AMCPProtocolStrategy::CommandFactory(const std::wstring& str)
-{
-       std::wstring s = str;
-       transform(s.begin(), s.end(), s.begin(), toupper);
-       
-       if         (s == TEXT("MIXER"))                 return std::make_shared<MixerCommand>();
-       else if(s == TEXT("DIAG"))                      return std::make_shared<DiagnosticsCommand>();
-       else if(s == TEXT("CHANNEL_GRID"))      return std::make_shared<ChannelGridCommand>();
-       else if(s == TEXT("CALL"))                      return std::make_shared<CallCommand>();
-       else if(s == TEXT("SWAP"))                      return std::make_shared<SwapCommand>();
-       else if(s == TEXT("LOAD"))                      return std::make_shared<LoadCommand>();
-       else if(s == TEXT("LOADBG"))            return std::make_shared<LoadbgCommand>();
-       else if(s == TEXT("ADD"))                       return std::make_shared<AddCommand>();
-       else if(s == TEXT("REMOVE"))            return std::make_shared<RemoveCommand>();
-       else if(s == TEXT("PAUSE"))                     return std::make_shared<PauseCommand>();
-       else if(s == TEXT("PLAY"))                      return std::make_shared<PlayCommand>();
-       else if(s == TEXT("STOP"))                      return std::make_shared<StopCommand>();
-       else if(s == TEXT("CLEAR"))                     return std::make_shared<ClearCommand>();
-       else if(s == TEXT("PRINT"))                     return std::make_shared<PrintCommand>();
-       else if(s == TEXT("LOG"))                       return std::make_shared<LogCommand>();
-       else if(s == TEXT("CG"))                        return std::make_shared<CGCommand>();
-       else if(s == TEXT("DATA"))                      return std::make_shared<DataCommand>();
-       else if(s == TEXT("CINF"))                      return std::make_shared<CinfCommand>();
-       else if(s == TEXT("INFO"))                      return std::make_shared<InfoCommand>(channels_);
-       else if(s == TEXT("CLS"))                       return std::make_shared<ClsCommand>();
-       else if(s == TEXT("TLS"))                       return std::make_shared<TlsCommand>();
-       else if(s == TEXT("VERSION"))           return std::make_shared<VersionCommand>();
-       else if(s == TEXT("BYE"))                       return std::make_shared<ByeCommand>();
-       else if(s == TEXT("SET"))                       return std::make_shared<SetCommand>();
-       //else if(s == TEXT("MONITOR"))
-       //{
-       //      result = AMCPCommandPtr(new MonitorCommand());
-       //}
-       //else if(s == TEXT("KILL"))
-       //{
-       //      result = AMCPCommandPtr(new KillCommand());
-       //}
-       return nullptr;
-}
-
-std::size_t AMCPProtocolStrategy::TokenizeMessage(const std::wstring& message, std::vector<std::wstring>* pTokenVector)
-{
-       //split on whitespace but keep strings within quotationmarks
-       //treat \ as the start of an escape-sequence: the following char will indicate what to actually put in the string
-
-       std::wstring currentToken;
-
-       char inQuote = 0;
-       bool getSpecialCode = false;
-
-       for(unsigned int charIndex=0; charIndex<message.size(); ++charIndex)
-       {
-               if(getSpecialCode)
+               if(currentToken.size()>0)
                {
-                       //insert code-handling here
-                       switch(message[charIndex])
-                       {
-                       case TEXT('\\'):
-                               currentToken += TEXT("\\");
-                               break;
-                       case TEXT('\"'):
-                               currentToken += TEXT("\"");
-                               break;
-                       case TEXT('n'):
-                               currentToken += TEXT("\n");
-                               break;
-                       default:
-                               break;
-                       };
-                       getSpecialCode = false;
-                       continue;
-               }
-
-               if(message[charIndex]==TEXT('\\'))
-               {
-                       getSpecialCode = true;
-                       continue;
-               }
-
-               if(message[charIndex]==' ' && inQuote==false)
-               {
-                       if(currentToken.size()>0)
-                       {
-                               pTokenVector->push_back(currentToken);
-                               currentToken.clear();
-                       }
-                       continue;
+                       pTokenVector->push_back(currentToken);
+                       currentToken.clear();
                }
 
-               if(message[charIndex]==TEXT('\"'))
-               {
-                       inQuote ^= 1;
-
-                       if(currentToken.size()>0)
-                       {
-                               pTokenVector->push_back(currentToken);
-                               currentToken.clear();
-                       }
-                       continue;
-               }
+               return pTokenVector->size();
+       }
 
-               currentToken += message[charIndex];
+       AMCPCommand::ptr_type create_command(const std::wstring& str, ClientInfoPtr client)
+       {
+               std::wstring s = boost::to_upper_copy(str);
+               if(s == TEXT("DIAG"))                           return std::make_shared<DiagnosticsCommand>(client);
+               else if(s == TEXT("CHANNEL_GRID"))      return std::make_shared<ChannelGridCommand>(client);
+               else if(s == TEXT("DATA"))                      return std::make_shared<DataCommand>(client);
+               else if(s == TEXT("CINF"))                      return std::make_shared<CinfCommand>(client);
+               else if(s == TEXT("INFO"))                      return std::make_shared<InfoCommand>(client, channels_);
+               else if(s == TEXT("CLS"))                       return std::make_shared<ClsCommand>(client);
+               else if(s == TEXT("TLS"))                       return std::make_shared<TlsCommand>(client);
+               else if(s == TEXT("VERSION"))           return std::make_shared<VersionCommand>(client);
+               else if(s == TEXT("BYE"))                       return std::make_shared<ByeCommand>(client);
+               else if(s == TEXT("LOCK"))                      return std::make_shared<LockCommand>(client, channels_);
+               else if(s == TEXT("LOG"))                       return std::make_shared<LogCommand>(client);
+               //else if(s == TEXT("KILL"))
+               //{
+               //      result = AMCPCommandPtr(new KillCommand());
+               //}
+
+               return nullptr;
        }
 
-       if(currentToken.size()>0)
+       AMCPCommand::ptr_type create_channel_command(const std::wstring& str, ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index)
        {
-               pTokenVector->push_back(currentToken);
-               currentToken.clear();
+               std::wstring s = boost::to_upper_copy(str);
+       
+               if         (s == TEXT("MIXER"))                 return std::make_shared<MixerCommand>(client, channel, channel_index, layer_index);
+               else if(s == TEXT("CALL"))                      return std::make_shared<CallCommand>(client, channel, channel_index, layer_index);
+               else if(s == TEXT("SWAP"))                      return std::make_shared<SwapCommand>(client, channel, channel_index, layer_index, channels_);
+               else if(s == TEXT("LOAD"))                      return std::make_shared<LoadCommand>(client, channel, channel_index, layer_index);
+               else if(s == TEXT("LOADBG"))            return std::make_shared<LoadbgCommand>(client, channel, channel_index, layer_index, channels_);
+               else if(s == TEXT("ADD"))                       return std::make_shared<AddCommand>(client, channel, channel_index, layer_index);
+               else if(s == TEXT("REMOVE"))            return std::make_shared<RemoveCommand>(client, channel, channel_index, layer_index);
+               else if(s == TEXT("PAUSE"))                     return std::make_shared<PauseCommand>(client, channel, channel_index, layer_index);
+               else if(s == TEXT("PLAY"))                      return std::make_shared<PlayCommand>(client, channel, channel_index, layer_index, channels_);
+               else if(s == TEXT("STOP"))                      return std::make_shared<StopCommand>(client, channel, channel_index, layer_index);
+               else if(s == TEXT("CLEAR"))                     return std::make_shared<ClearCommand>(client, channel, channel_index, layer_index);
+               else if(s == TEXT("PRINT"))                     return std::make_shared<PrintCommand>(client, channel, channel_index, layer_index);
+               else if(s == TEXT("CG"))                        return std::make_shared<CGCommand>(client, channel, channel_index, layer_index);
+               else if(s == TEXT("SET"))                       return std::make_shared<SetCommand>(client, channel, channel_index, layer_index);
+
+               return nullptr;
        }
+};
+
+
+AMCPProtocolStrategy::AMCPProtocolStrategy(const std::vector<spl::shared_ptr<core::video_channel>>& channels) : impl_(spl::make_unique<impl>(channels)) {}
+AMCPProtocolStrategy::~AMCPProtocolStrategy() {}
+void AMCPProtocolStrategy::Parse(const std::wstring& msg, IO::ClientInfoPtr pClientInfo) { impl_->Parse(msg, pClientInfo); }
 
-       return pTokenVector->size();
-}
 
 }      //namespace amcp
 }}     //namespace caspar
\ No newline at end of file
index 524314d4e5e05b02b16519e600000a78bf41ef6d..cdcf1acd18361c99f175f70b338d2bdaf1fe398d 100644 (file)
 #pragma once
 
 #include "../util/protocolstrategy.h"
+
 #include <core/video_channel.h>
 
-#include "AMCPCommand.h"
-#include "AMCPCommandQueue.h"
+#include <common/memory.h>
 
 #include <boost/noncopyable.hpp>
 
@@ -35,37 +35,16 @@ namespace caspar { namespace protocol { namespace amcp {
 
 class AMCPProtocolStrategy : public IO::IProtocolStrategy, boost::noncopyable
 {
-       enum MessageParserState {
-               New = 0,
-               GetSwitch,
-               GetCommand,
-               GetParameters,
-               GetChannel,
-               Done
-       };
-
-       AMCPProtocolStrategy(const AMCPProtocolStrategy&);
-       AMCPProtocolStrategy& operator=(const AMCPProtocolStrategy&);
-
 public:
        AMCPProtocolStrategy(const std::vector<spl::shared_ptr<core::video_channel>>& channels);
        virtual ~AMCPProtocolStrategy();
 
        virtual void Parse(const std::wstring& msg, IO::ClientInfoPtr pClientInfo);
        virtual std::string GetCodepage() {     return "UTF-8"; }
-       AMCPCommand::ptr_type InterpretCommandString(const std::wstring& str, MessageParserState* pOutState=0);
 
 private:
-       friend class AMCPCommand;
-
-       std::size_t TokenizeMessage(const std::wstring& message, std::vector<std::wstring>* pTokenVector);
-       AMCPCommand::ptr_type CommandFactory(const std::wstring& str);
-
-       bool QueueCommand(AMCPCommand::ptr_type);
-
-       std::vector<spl::shared_ptr<core::video_channel>> channels_;
-       std::vector<AMCPCommandQueuePtr> commandQueues_;
-       static const std::wstring MessageDelimiter;
+       struct impl;
+       spl::unique_ptr<impl> impl_;
 };
 
 }}}
index 551cf30665a16e6e51a16517b535abf7f62370c8..5be2b7a954b128219b9392123d0bb534414126f4 100644 (file)
@@ -42,8 +42,8 @@ namespace caspar { namespace protocol { namespace cii {
        
 using namespace core;
 
-const std::wstring CIIProtocolStrategy::MessageDelimiter = TEXT("\r\n");
-const TCHAR CIIProtocolStrategy::TokenDelimiter = TEXT('\\');
+const std::wstring CIIProtocolStrategy::MessageDelimiter = L"\r\n";
+const wchar_t CIIProtocolStrategy::TokenDelimiter = L'\\';
 
 CIIProtocolStrategy::CIIProtocolStrategy(const std::vector<spl::shared_ptr<core::video_channel>>& channels) : pChannel_(channels.at(0)), executor_(L"CIIProtocolStrategy")
 {
@@ -51,13 +51,12 @@ CIIProtocolStrategy::CIIProtocolStrategy(const std::vector<spl::shared_ptr<core:
 
 //The paser method expects message to be complete messages with the delimiter stripped away.
 //Thesefore the AMCPProtocolStrategy should be decorated with a delimiter_based_chunking_strategy
-void CIIProtocolStrategy::Parse(const std::wstring& message, IO::ClientInfoPtr pClientInfo
+void CIIProtocolStrategy::Parse(const std::wstring& message, IO::ClientInfoPtr client
 {
        if(message.length() > 0)
        {
-               ProcessMessage(message, pClientInfo);
-               if(pClientInfo != 0)
-                       pClientInfo->Send(TEXT("*\r\n"));
+               ProcessMessage(message, client);
+               client->send(std::wstring(L"*\r\n"));
        }
 }
 
index 04df30b77e2188c89b19a66661de7e5d912050ae..c44ddfda39be4ae3f48c4ea4839f4a9e6057a821 100644 (file)
@@ -54,7 +54,7 @@ public:
 public:
        struct TitleHolder
        {
-               TitleHolder() : titleName(TEXT("")), pframe_producer(core::frame_producer::empty())     {}
+               TitleHolder() : titleName(L""), pframe_producer(core::frame_producer::empty())  {}
                TitleHolder(const std::wstring& name, spl::shared_ptr<core::frame_producer> pFP) : titleName(name), pframe_producer(pFP) {}
                TitleHolder(const TitleHolder& th) : titleName(th.titleName), pframe_producer(th.pframe_producer) {}
                const TitleHolder& operator=(const TitleHolder& th) 
@@ -78,7 +78,7 @@ private:
        spl::shared_ptr<core::frame_producer> GetPreparedTemplate(const std::wstring& name);
        void PutPreparedTemplate(const std::wstring& name, spl::shared_ptr<core::frame_producer>& pframe_producer);
 
-       static const TCHAR TokenDelimiter;
+       static const wchar_t TokenDelimiter;
        static const std::wstring MessageDelimiter;
 
        void ProcessMessage(const std::wstring& message, IO::ClientInfoPtr pClientInfo);
index 7d8e12a48da867dd86b98cf03fe38c6ae1f9d16b..18486013714f3947949b674b2441b63d1f7ed89e 100644 (file)
@@ -32,6 +32,7 @@
     <ClInclude Include="amcp\AMCPCommandQueue.h" />\r
     <ClInclude Include="amcp\AMCPCommandsImpl.h" />\r
     <ClInclude Include="amcp\AMCPProtocolStrategy.h" />\r
+    <ClInclude Include="amcp\amcp_shared.h" />\r
     <ClInclude Include="cii\CIICommand.h" />\r
     <ClInclude Include="cii\CIICommandsImpl.h" />\r
     <ClInclude Include="cii\CIIProtocolStrategy.h" />\r
index 168393d613a97adb451f71b146d6022b29237cf6..5ce3daa2273ab1ee0098b0f4200ca5b9e51e798a 100644 (file)
     <ClInclude Include="util\lock_container.h">\r
       <Filter>source\util</Filter>\r
     </ClInclude>\r
+    <ClInclude Include="amcp\amcp_shared.h">\r
+      <Filter>source\amcp</Filter>\r
+    </ClInclude>\r
   </ItemGroup>\r
   <ItemGroup>\r
     <ClCompile Include="amcp\AMCPCommandQueue.cpp">\r
index 82693a70ccd83278a1a0310a8867ff0db930e716..614593f7c160929fa3c64775573ca0a64da3658a 100644 (file)
@@ -45,14 +45,15 @@ typedef std::set<spl::shared_ptr<connection>> connection_set;
 
 class connection : public spl::enable_shared_from_this<connection>
 {   
-    const spl::shared_ptr<tcp::socket>                 socket_; 
-       const spl::shared_ptr<connection_set>           connection_set_;
-       const std::wstring                                                      name_;
-       protocol_strategy_factory<char>::ptr            protocol_factory_;
-       std::shared_ptr<protocol_strategy<char>>        protocol_;
+    const spl::shared_ptr<tcp::socket>                         socket_; 
+       boost::asio::io_service&                                                service_;
+       const spl::shared_ptr<connection_set>                   connection_set_;
+       const std::wstring                                                              name_;
+       protocol_strategy_factory<char>::ptr                    protocol_factory_;
+       std::shared_ptr<protocol_strategy<char>>                protocol_;
 
-       std::array<char, 32768>                                         data_;
-       std::vector<std::shared_ptr<void>>                      lifecycle_bound_items_;
+       std::array<char, 32768>                                                 data_;
+       std::map<std::wstring, std::shared_ptr<void>>   lifecycle_bound_objects_;
 
        class connection_holder : public client_connection<char>
        {
@@ -63,6 +64,7 @@ class connection : public spl::enable_shared_from_this<connection>
 
                virtual void send(std::basic_string<char>&& data)
                {
+                       //TODO: need to implement a send-queue
                        auto conn = connection_.lock();
                        conn->send(std::move(data));
                }
@@ -77,10 +79,15 @@ class connection : public spl::enable_shared_from_this<connection>
                        return conn->print();
                }
 
-               virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)
+               virtual void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound)
                {
                        auto conn = connection_.lock();
-                       return conn->bind_to_lifecycle(lifecycle_bound);
+                       return conn->add_lifecycle_bound_object(key, lifecycle_bound);
+               }
+               virtual std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key)
+               {
+                       auto conn = connection_.lock();
+                       return conn->remove_lifecycle_bound_object(key);
                }
        };
 
@@ -101,13 +108,6 @@ public:
        {
                return L"[" + name_ + L"]";
        }
-
-       const std::string ipv4_address() const
-       {
-               return socket_->is_open() ? socket_->local_endpoint().address().to_string() : "no-address";
-       }
-       
-       /* ClientInfo */
        
        virtual void send(std::string&& data)
        {
@@ -116,15 +116,31 @@ public:
 
        virtual void disconnect()
        {
-               stop();
+               service_.dispatch([=] { stop(); });
        }
-       void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)
+
+       void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void> lifecycle_bound)
        {
-               lifecycle_bound_items_.push_back(lifecycle_bound);
+               //TODO: needs protection from evil concurrent access
+               //tbb::concurrent_hash_map ?
+               lifecycle_bound_objects_.insert(std::pair<std::wstring, std::shared_ptr<void>>(key, lifecycle_bound));
+       }
+       std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key)
+       {
+               //TODO: needs protection from evil concurrent access
+               //tbb::concurrent_hash_map ?
+               auto it = lifecycle_bound_objects_.find(key);
+               if(it != lifecycle_bound_objects_.end())
+               {
+                       auto result = (*it).second;
+                       lifecycle_bound_objects_.erase(it);
+                       return result;
+               }
+               return std::shared_ptr<void>();
        }
 
        /**************/
-       
+private:
        void stop()
        {
                connection_set_->erase(shared_from_this());
@@ -140,9 +156,14 @@ public:
                CASPAR_LOG(info) << print() << L" Disconnected.";
        }
 
-private:
+       const std::string ipv4_address() const
+       {
+               return socket_->is_open() ? socket_->local_endpoint().address().to_string() : "no-address";
+       }
+
     connection(const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set) 
                : socket_(socket)
+               , service_(socket->get_io_service())
                , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
                , connection_set_(connection_set)
                , protocol_factory_(protocol_factory)
@@ -186,7 +207,7 @@ private:
                if(!error)                      
                        CASPAR_LOG(trace) << print() << L" Sent: " << (data->size() < 512 ? u16(*data) : L"more than 512 bytes.");              
                else if (error != boost::asio::error::operation_aborted)                
-                       stop();         
+                       stop();
     }
 
        void read_some()
@@ -199,6 +220,8 @@ private:
                auto str = spl::make_shared<std::string>(std::move(data));
                socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2));
        }
+
+       friend struct AsyncEventServer::implementation;
 };
 
 struct AsyncEventServer::implementation
@@ -256,23 +279,18 @@ struct AsyncEventServer::implementation
                        auto conn = connection::create(socket, protocol_factory_, connection_set_);
                        connection_set_->insert(conn);
 
+                       BOOST_FOREACH(auto& lifecycle_factory, lifecycle_factories_)
                        {
-                               tbb::mutex::scoped_lock lock(mutex_);
-
-                               BOOST_FOREACH(auto& lifecycle_factory, lifecycle_factories_)
-                               {
-                                       auto lifecycle_bound = lifecycle_factory(conn->ipv4_address());
-                                       conn->bind_to_lifecycle(lifecycle_bound);
-                               }
+                               auto lifecycle_bound = lifecycle_factory(conn->ipv4_address());
+                               conn->add_lifecycle_bound_object(lifecycle_bound.first, lifecycle_bound.second);
                        }
                }
                start_accept();
     }
 
-       void add_client_lifecycle_event_factory(const lifecycle_factory_t& factory)
+       void add_client_lifecycle_object_factory(const lifecycle_factory_t& factory)
        {
-               tbb::mutex::scoped_lock lock(mutex_);
-               lifecycle_factories_.push_back(factory);
+               service_.post([=]{ lifecycle_factories_.push_back(factory); });
        }
 };
 
@@ -281,6 +299,6 @@ AsyncEventServer::AsyncEventServer(
        : impl_(new implementation(protocol, port)) {}
 
 AsyncEventServer::~AsyncEventServer() {}
-void AsyncEventServer::add_client_lifecycle_event_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_event_factory(factory); }
+void AsyncEventServer::add_client_lifecycle_object_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_object_factory(factory); }
 
 }}
\ No newline at end of file
index 85f28cc4405868402fa41287933a416ba982da72..b01394dead72f70163e047d7777bee7e406cb4eb 100644 (file)
@@ -26,7 +26,7 @@
 
 namespace caspar { namespace IO {
 
-       typedef std::function<std::shared_ptr<void> (const std::string& ipv4_address)>
+       typedef std::function<std::pair<std::wstring, std::shared_ptr<void>> (const std::string& ipv4_address)>
                lifecycle_factory_t;
 
 class AsyncEventServer
@@ -35,10 +35,10 @@ public:
        explicit AsyncEventServer(const protocol_strategy_factory<char>::ptr& protocol, unsigned short port);
        ~AsyncEventServer();
 
-       void add_client_lifecycle_event_factory(const lifecycle_factory_t& lifecycle_factory);
+       void add_client_lifecycle_object_factory(const lifecycle_factory_t& lifecycle_factory);
 
-private:
        struct implementation;
+private:
        std::unique_ptr<implementation> impl_;
 };
 
index 6d40f601e20b70117d5e3bfe097190b63e683d0e..1810da156bb094a38e6a86da13cc1f10bef54df2 100644 (file)
 #include <iostream>
 
 #include <common/log.h>
+#include "protocol_strategy.h"
 
 namespace caspar { namespace IO {
-
+/*
 class ClientInfo 
 {
 protected:
@@ -40,20 +41,22 @@ public:
        virtual void Send(const std::wstring& data) = 0;
        virtual void Disconnect() = 0;
        virtual std::wstring print() const = 0;
-       virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound) = 0;
+       virtual void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound) = 0;
+       virtual std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key) = 0;
 };
+*/
+typedef spl::shared_ptr<client_connection<wchar_t>> ClientInfoPtr;
 
-typedef std::shared_ptr<ClientInfo> ClientInfoPtr;
-
-struct ConsoleClientInfo : public caspar::IO::ClientInfo 
+struct ConsoleClientInfo : public client_connection<wchar_t>
 {
-       void Send(const std::wstring& data)
+       virtual void send(std::wstring&& data)
        {
                std::wcout << (L"#" + caspar::log::replace_nonprintable_copy(data, L'?'));
        }
-       void Disconnect(){}
+       virtual void disconnect() {}
        virtual std::wstring print() const {return L"Console";}
-       virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound) {}
+       virtual void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound) {}
+       virtual std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key) { return std::shared_ptr<void>(); }
 };
 
 }}
index adbac7db24088074cd6410b8d3ea144739019f67..9b796d8b6e4b935978e72acfd4a02b241a436fe4 100644 (file)
@@ -33,8 +33,6 @@ public:
 
        virtual void Parse(const std::wstring& msg, ClientInfoPtr pClientInfo) = 0;
        virtual std::string GetCodepage() = 0;
-
-       virtual void on_client_disconnect(IO::ClientInfoPtr pClientInfo) {}
 };
 typedef std::shared_ptr<IProtocolStrategy> ProtocolStrategyPtr;
 
index 357d851daec54dfe844d725e5416a2de353b8087..422e8db961c224fee01547cfeef15b044cf5b3c9 100644 (file)
@@ -66,7 +66,8 @@ public:
        virtual void disconnect() = 0;\r
        virtual std::wstring print() const = 0;\r
 \r
-       virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound) = 0;\r
+       virtual void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound) = 0;\r
+       virtual std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key) = 0;\r
 };\r
 \r
 /**\r
index 1aa20c918167e85711024b4930a1a19cdf89f88f..70b637f2e007db99c00be89fbac416af8247d90c 100644 (file)
@@ -82,9 +82,14 @@ public:
                return client_->print();\r
        }\r
 \r
-       virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)\r
+\r
+       void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound)\r
+       {\r
+               client_->add_lifecycle_bound_object(key, lifecycle_bound);\r
+       }\r
+       std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key)\r
        {\r
-               client_->bind_to_lifecycle(lifecycle_bound);\r
+               return client_->remove_lifecycle_bound_object(key);\r
        }\r
 };\r
 \r
@@ -104,7 +109,7 @@ protocol_strategy<char>::ptr to_unicode_adapter_factory::create(
        return spl::make_shared<to_unicode_adapter>(codepage_, unicode_strategy_factory_->create(client));\r
 }\r
 \r
-class legacy_client_info : public ClientInfo\r
+/*class legacy_client_info : public ClientInfo\r
 {\r
        client_connection<wchar_t>::ptr client_connection_;\r
 public:\r
@@ -134,12 +139,16 @@ public:
        {\r
                return client_connection_->print();\r
        }\r
-\r
-       virtual void bind_to_lifecycle(const std::shared_ptr<void>& lifecycle_bound)\r
+       virtual void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound)\r
        {\r
-               client_connection_->bind_to_lifecycle(lifecycle_bound);\r
+               client_connection_->add_lifecycle_bound_object(key, lifecycle_bound);\r
        }\r
-};\r
+       virtual std::shared_ptr<void> remove_lifecycle_bound_object(const std::wstring& key)\r
+       {\r
+               return client_connection_->remove_lifecycle_bound_object(key);\r
+       }\r
+\r
+};*/\r
 \r
 class legacy_strategy_adapter : public protocol_strategy<wchar_t>\r
 {\r
@@ -150,7 +159,7 @@ public:
                        const ProtocolStrategyPtr& strategy, \r
                        const client_connection<wchar_t>::ptr& client_connection)\r
                : strategy_(strategy)\r
-               , client_info_(std::make_shared<legacy_client_info>(client_connection))\r
+               , client_info_(client_connection)\r
        {\r
                CASPAR_LOG(info) << "legacy_strategy_adapter created.";\r
        }\r
index 02cb721c24b32c322ac0bcb5ed204be71c1fb95f..8e6afd9365af02f1a7fcfc83353e11f166f3f23d 100644 (file)
@@ -22,6 +22,7 @@
 #pragma once\r
 \r
 #include <boost/algorithm/string/split.hpp>\r
+#include <boost/foreach.hpp>\r
 \r
 #include "protocol_strategy.h"\r
 #include "ProtocolStrategy.h"\r
@@ -71,7 +72,7 @@ public:
        {\r
        }\r
 \r
-       virtual void parse(const std::basic_string<char>& data)\r
+       virtual void parse(const std::basic_string<CharT>& data)\r
        {\r
                input_ += data;\r
 \r
index fad67a490c81603b095acb3aad738c1903c06828..d0383409b5dcf5a2b7eadc09acc25925cb6944c5 100644 (file)
@@ -42,6 +42,7 @@
 #include <mmsystem.h>
 #include <atlbase.h>
 
+#include <protocol/util/strategy_adapters.h>
 #include <protocol/amcp/AMCPProtocolStrategy.h>
 #include <protocol/osc/server.h>
 
@@ -66,6 +67,7 @@
 #include <boost/property_tree/xml_parser.hpp>
 #include <boost/foreach.hpp>
 #include <boost/locale.hpp>
+#include <boost/algorithm/string/predicate.hpp>
 
 #include <signal.h>
 
@@ -194,12 +196,13 @@ void run()
 
        //caspar_server.subscribe(console_obs);
                                                
+       // Create a dummy client which prints amcp responses to console.
+       auto console_client = spl::make_shared<IO::ConsoleClientInfo>();
+       
        // Create a amcp parser for console commands.
-       protocol::amcp::AMCPProtocolStrategy amcp(caspar_server.channels());
+       //protocol::amcp::AMCPProtocolStrategy amcp(caspar_server.channels());
+       auto amcp = spl::make_shared<caspar::IO::delimiter_based_chunking_strategy_factory<wchar_t>>(L"\r\n", spl::make_shared<caspar::IO::legacy_strategy_adapter_factory>(spl::make_shared<protocol::amcp::AMCPProtocolStrategy>(caspar_server.channels())))->create(console_client);
 
-       // Create a dummy client which prints amcp responses to console.
-       auto console_client = std::make_shared<IO::ConsoleClientInfo>();
-                       
        std::wstring wcmd;
        while(true)
        {
@@ -284,7 +287,7 @@ void run()
                }
 
                wcmd += L"\r\n";
-               amcp.Parse(wcmd, console_client);
+               amcp->parse(wcmd);
        }       
        CASPAR_LOG(info) << "Successfully shutdown CasparCG Server.";
 }
index 8a33080e7ce423ead16184c1aa3b066523341d0e..3b3e9b50c346dcd0236cfe43654d1714bbfd47ea 100644 (file)
@@ -59,6 +59,7 @@
 #include <protocol/util/strategy_adapters.h>
 
 #include <boost/algorithm/string.hpp>
+#include <boost/thread.hpp>
 #include <boost/lexical_cast.hpp>
 #include <boost/foreach.hpp>
 #include <boost/property_tree/ptree.hpp>
@@ -121,7 +122,8 @@ struct server::impl : boost::noncopyable
                async_servers_.clear();
                channels_.clear();
 
-               Sleep(500); // HACK: Wait for asynchronous destruction of producers and consumers.
+               boost::this_thread::sleep(boost::posix_time::milliseconds(500));
+               //Sleep(500); // HACK: Wait for asynchronous destruction of producers and consumers.
 
                image::uninit();
                ffmpeg::uninit();
@@ -188,9 +190,9 @@ struct server::impl : boost::noncopyable
                                        async_servers_.push_back(asyncbootstrapper);
 
                                        //TODO: remove - test
-                                       asyncbootstrapper->add_client_lifecycle_event_factory([=] (const std::string& ipv4_address) {
-                                                                                                                                                                       return std::shared_ptr<void>(nullptr, [] (void*) 
-                                                                                                                                                                       { CASPAR_LOG(info) << "Client disconnect (lifecycle)"; });
+                                       asyncbootstrapper->add_client_lifecycle_object_factory([=] (const std::string& ipv4_address) {
+                                                                                                                                                                       return std::pair<std::wstring, std::shared_ptr<void>>(L"log", std::shared_ptr<void>(nullptr, [] (void*) 
+                                                                                                                                                                       { CASPAR_LOG(info) << "Client disconnect (lifecycle)"; }));
                                                                                                                                                                });
                                }
                                else