]> git.sesse.net Git - casparcg/commitdiff
Merge branch '2.1.0' of https://github.com/CasparCG/Server into 2.1.0
authorniklaspandersson <niklas.p.andersson@svt.se>
Mon, 12 Aug 2013 16:16:57 +0000 (18:16 +0200)
committerniklaspandersson <niklas.p.andersson@svt.se>
Tue, 13 Aug 2013 07:45:24 +0000 (09:45 +0200)
1  2 
protocol/amcp/AMCPProtocolStrategy.cpp
protocol/amcp/amcp_shared.h
protocol/util/lock_container.cpp
protocol/util/lock_container.h

index 524316820e9e972387fd10d2ea95075600437441,f18008cf089662b636b72473f2539faf592cb1fe..efa3e92b4f931ef634728a0d19b53c7a5f6267c5
@@@ -47,346 -46,365 +47,346 @@@ namespace caspar { namespace protocol 
  
  using IO::ClientInfoPtr;
  
 -const std::wstring AMCPProtocolStrategy::MessageDelimiter = TEXT("\r\n");
 -
 -inline std::shared_ptr<core::video_channel> GetChannelSafe(unsigned int index, const std::vector<spl::shared_ptr<core::video_channel>>& channels)
 +struct AMCPProtocolStrategy::impl
  {
 -      return index < channels.size() ? std::shared_ptr<core::video_channel>(channels[index]) : nullptr;
 -}
 -
 -AMCPProtocolStrategy::AMCPProtocolStrategy(const std::vector<spl::shared_ptr<core::video_channel>>& channels) : channels_(channels) {
 -      AMCPCommandQueuePtr pGeneralCommandQueue(new AMCPCommandQueue());
 -      commandQueues_.push_back(pGeneralCommandQueue);
 -
 +private:
 +      std::vector<channel_context> channels_;
 +      std::vector<AMCPCommandQueue::ptr_type> commandQueues_;
  
 -      std::shared_ptr<core::video_channel> pChannel;
 -      unsigned int index = -1;
 -      //Create a commandpump for each video_channel
 -      while((pChannel = GetChannelSafe(++index, channels_)) != 0) {
 -              AMCPCommandQueuePtr pChannelCommandQueue(new AMCPCommandQueue());
 -              std::wstring title = TEXT("video_channel ");
 +public:
 +      impl(const std::vector<spl::shared_ptr<core::video_channel>>& channels)
 +      {
 +              commandQueues_.push_back(std::make_shared<AMCPCommandQueue>());
  
 -              //HACK: Perform real conversion from int to string
 -              TCHAR num = TEXT('1')+static_cast<TCHAR>(index);
 -              title += num;
 -              
 -              commandQueues_.push_back(pChannelCommandQueue);
 +              int index = 0;
 +              BOOST_FOREACH(const spl::shared_ptr<core::video_channel>& channel,  channels)
 +              {
 +                      std::wstring lifecycle_key = L"lock" + boost::lexical_cast<std::wstring>(index);
 +                      channels_.push_back(channel_context(channel, lifecycle_key));
 +                      auto queue(std::make_shared<AMCPCommandQueue>());
 +                      commandQueues_.push_back(queue);
 +                      ++index;
 +              }
        }
 -}
 -
 -AMCPProtocolStrategy::~AMCPProtocolStrategy() {
 -}
 -
 -void AMCPProtocolStrategy::Parse(const TCHAR* pData, int charCount, ClientInfoPtr pClientInfo)
 -{
 -      size_t pos;
 -      std::wstring recvData(pData, charCount);
 -      std::wstring availibleData = (pClientInfo != nullptr ? pClientInfo->currentMessage_ : L"") + recvData;
  
 -      while(true) {
 -              pos = availibleData.find(MessageDelimiter);
 -              if(pos != std::wstring::npos)
 +      ~impl() {}
 +
 +      enum parser_state {
 +              New = 0,
 +              GetSwitch,
 +              GetCommand,
 +              GetParameters
 +      };
 +      enum error_state {
 +              no_error = 0,
 +              command_error,
 +              channel_error,
 +              parameters_error,
 +              unknown_error,
 +              access_error
 +      };
 +
 +      struct command_interpreter_result
 +      {
 +              command_interpreter_result() : error(no_error) {}
 +
 +              std::shared_ptr<caspar::IO::lock_container>     lock;
 +              std::wstring                                                            command_name;
 +              AMCPCommand::ptr_type                                           command;
 +              error_state                                                                     error;
 +              AMCPCommandQueue::ptr_type                                      queue;
 +      };
 +
 +      //The paser method expects message to be complete messages with the delimiter stripped away.
 +      //Thesefore the AMCPProtocolStrategy should be decorated with a delimiter_based_chunking_strategy
 +      void Parse(const std::wstring& message, ClientInfoPtr client)
 +      {
 +              CASPAR_LOG(info) << L"Received message from " << client->print() << ": " << message << L"\\r\\n";
 +      
 +              command_interpreter_result result;
 +              if(interpret_command_string(message, result, client))
                {
 -                      std::wstring message = availibleData.substr(0,pos);
 -
 -                      //This is where a complete message gets taken care of
 -                      if(message.length() > 0) {
 -                              ProcessMessage(message, pClientInfo);
 -                      }
 +                      if(result.lock && !result.lock->check_access(client))
 +                              result.error = access_error;
 +                      else
 +                              result.queue->AddCommand(result.command);
 +              }
 +              
 +              if(result.error != no_error)
 +              {
 +                      std::wstringstream answer;
 +                      boost::to_upper(result.command_name);
  
 -                      std::size_t nextStartPos = pos + MessageDelimiter.length();
 -                      if(nextStartPos < availibleData.length())
 -                              availibleData = availibleData.substr(nextStartPos);
 -                      else {
 -                              availibleData.clear();
 +                      switch(result.error)
 +                      {
 +                      case command_error:
 +                              answer << L"400 ERROR\r\n" << message << "\r\n";
 +                              break;
 +                      case channel_error:
 +                              answer << L"401 " << result.command_name << " ERROR\r\n";
 +                              break;
 +                      case parameters_error:
 +                              answer << L"402 " << result.command_name << " ERROR\r\n";
 +                              break;
 +                      case access_error:
 +                              answer << L"503 " << result.command_name << " FAILED\r\n";
 +                              break;
 +                      default:
 +                              answer << L"500 FAILED\r\n";
                                break;
                        }
 -              }
 -              else
 -              {
 -                      break;
 +                      client->send(answer.str());
                }
        }
 -      if(pClientInfo)
 -              pClientInfo->currentMessage_ = availibleData;
 -}
  
 -void AMCPProtocolStrategy::ProcessMessage(const std::wstring& message, ClientInfoPtr& pClientInfo)
 -{     
 -      CASPAR_LOG(info) << L"Received message from " << pClientInfo->print() << ": " << message << L"\\r\\n";
 -      
 -      bool bError = true;
 -      MessageParserState state = New;
 +private:
 +      friend class AMCPCommand;
  
 -      AMCPCommand::ptr_type pCommand(InterpretCommandString(message, &state));
 +      bool interpret_command_string(const std::wstring& message, command_interpreter_result& result, ClientInfoPtr client)
 +      {
 +              try
 +              {
 +                      std::vector<std::wstring> tokens;
 +                      parser_state state = New;
  
 -      if(pCommand != 0) {
 -              pCommand->SetClientInfo(pClientInfo);   
 -              if(QueueCommand(pCommand))
 -                      bError = false;
 -              else
 -                      state = GetChannel;
 -      }
 +                      tokenize(message, &tokens);
  
 -      if(bError == true) {
 -              std::wstringstream answer;
 -              switch(state)
 +                      //parse the message one token at the time
 +                      auto end = tokens.end();
 +                      auto it = tokens.begin();
 +                      while(it != end && result.error == no_error)
 +                      {
 +                              switch(state)
 +                              {
 +                              case New:
 +                                      if((*it)[0] == TEXT('/'))
 +                                              state = GetSwitch;
 +                                      else
 +                                              state = GetCommand;
 +                                      break;
 +
 +                              case GetSwitch:
 +                                      //command_switch = (*it);       //we dont care for the switch anymore
 +                                      state = GetCommand;
 +                                      ++it;
 +                                      break;
 +
 +                              case GetCommand:
 +                                      {
 +                                              result.command_name = (*it);
 +                                              result.command = create_command(result.command_name, client);
 +                                              if(result.command)      //the command doesn't need a channel
 +                                              {
 +                                                      result.queue = commandQueues_[0];
 +                                                      state = GetParameters;
 +                                              }
 +                                              else
 +                                              {
 +                                                      //get channel index from next token
 +                                                      int channel_index = -1;
 +                                                      int layer_index = -1;
 +
 +                                                      ++it;
 +                                                      if(it == end)
 +                                                      {
 +                                                              result.error = parameters_error;
 +                                                              break;
 +                                                      }
 +
 +                                                      {       //parse channel/layer token
 +                                                              try
 +                                                              {
 +                                                                      std::wstring channelid_str = boost::trim_copy(*it);
 +                                                                      std::vector<std::wstring> split;
 +                                                                      boost::split(split, channelid_str, boost::is_any_of("-"));
 +
 +                                                                      channel_index = boost::lexical_cast<int>(split[0]) - 1;
 +                                                                      if(split.size() > 1)
 +                                                                              layer_index = boost::lexical_cast<int>(split[1]);
 +                                                              }
 +                                                              catch(...)
 +                                                              {
 +                                                                      result.error = channel_error;
 +                                                                      break;
 +                                                              }
 +                                                      }
 +                                              
 +                                                      if(channel_index >= 0 && channel_index < channels_.size())
 +                                                      {
 +                                                              result.command = create_channel_command(result.command_name, client, channels_.at(channel_index), channel_index, layer_index);
 +                                                              if(result.command)
 +                                                              {
 +                                                                      result.lock = channels_.at(channel_index).lock;
 +                                                                      result.queue = commandQueues_[channel_index + 1];
 +                                                              }
 +                                                              else
 +                                                              {
 +                                                                      result.error = command_error;
 +                                                                      break;
 +                                                              }
 +                                                      }
 +                                                      else
 +                                                      {
 +                                                              result.error = channel_error;
 +                                                              break;
 +                                                      }
 +                                              }
 +
 +                                              state = GetParameters;
 +                                              ++it;
 +                                      }
 +                                      break;
 +
 +                              case GetParameters:
 +                                      {
 +                                              int parameterCount=0;
 +                                              while(it != end)
 +                                              {
 +                                                      result.command->parameters().push_back((*it));
 +                                                      ++it;
 +                                                      ++parameterCount;
 +                                              }
 +                                      }
 +                                      break;
 +                              }
 +                      }
 +
 +                      if(result.command && result.error == no_error && result.command->parameters().size() < result.command->minimum_parameters()) {
 +                              result.error = parameters_error;
 +                      }
 +              }
 +              catch(...)
                {
 -              case GetCommand:
 -                      answer << TEXT("400 ERROR\r\n") + message << "\r\n";
 -                      break;
 -              case GetChannel:
 -                      answer << TEXT("401 ERROR\r\n");
 -                      break;
 -              case GetParameters:
 -                      answer << TEXT("402 ERROR\r\n");
 -                      break;
 -              default:
 -                      answer << TEXT("500 FAILED\r\n");
 -                      break;
 +                      CASPAR_LOG_CURRENT_EXCEPTION();
 +                      result.error = unknown_error;
                }
 -              pClientInfo->Send(answer.str());
 +
 +              return result.error == no_error;
        }
 -}
  
 -AMCPCommand::ptr_type AMCPProtocolStrategy::InterpretCommandString(const std::wstring& message, MessageParserState* pOutState)
 -{
 -      std::vector<std::wstring> tokens;
 -      unsigned int currentToken = 0;
 -      std::wstring commandSwitch;
 +      std::size_t tokenize(const std::wstring& message, std::vector<std::wstring>* pTokenVector)
 +      {
 +              //split on whitespace but keep strings within quotationmarks
 +              //treat \ as the start of an escape-sequence: the following char will indicate what to actually put in the string
  
 -      AMCPCommand::ptr_type pCommand;
 -      MessageParserState state = New;
 +              std::wstring currentToken;
  
-               char inQuote = 0;
 -      std::size_t tokensInMessage = TokenizeMessage(message, &tokens);
++              bool inQuote = false;
 +              bool getSpecialCode = false;
  
 -      //parse the message one token at the time
 -      while(currentToken < tokensInMessage)
 -      {
 -              switch(state)
 +              for(unsigned int charIndex=0; charIndex<message.size(); ++charIndex)
                {
 -              case New:
 -                      if(tokens[currentToken][0] == TEXT('/'))
 -                              state = GetSwitch;
 -                      else
 -                              state = GetCommand;
 -                      break;
 -
 -              case GetSwitch:
 -                      commandSwitch = tokens[currentToken];
 -                      state = GetCommand;
 -                      ++currentToken;
 -                      break;
 -
 -              case GetCommand:
 -                      pCommand = CommandFactory(tokens[currentToken]);
 -                      if(pCommand == 0) {
 -                              goto ParseFinnished;
 -                      }
 -                      else
 +                      if(getSpecialCode)
                        {
 -                              pCommand->SetChannels(channels_);
 -                              //Set scheduling
 -                              if(commandSwitch.size() > 0) {
 -                                      transform(commandSwitch.begin(), commandSwitch.end(), commandSwitch.begin(), toupper);
 -
 -                                      //if(commandSwitch == TEXT("/APP"))
 -                                      //      pCommand->SetScheduling(AddToQueue);
 -                                      //else if(commandSwitch  == TEXT("/IMMF"))
 -                                      //      pCommand->SetScheduling(ImmediatelyAndClear);
 -                              }
 +                              //insert code-handling here
 +                              switch(message[charIndex])
 +                              {
 +                              case TEXT('\\'):
 +                                      currentToken += TEXT("\\");
 +                                      break;
 +                              case TEXT('\"'):
 +                                      currentToken += TEXT("\"");
 +                                      break;
 +                              case TEXT('n'):
 +                                      currentToken += TEXT("\n");
 +                                      break;
 +                              default:
 +                                      break;
 +                              };
 +                              getSpecialCode = false;
 +                              continue;
 +                      }
  
 -                              if(pCommand->NeedChannel())
 -                                      state = GetChannel;
 -                              else
 -                                      state = GetParameters;
 +                      if(message[charIndex]==TEXT('\\'))
 +                      {
 +                              getSpecialCode = true;
 +                              continue;
                        }
 -                      ++currentToken;
 -                      break;
  
 -              case GetParameters:
 +                      if(message[charIndex]==' ' && inQuote==false)
                        {
 -                              _ASSERTE(pCommand != 0);
 -                              int parameterCount=0;
 -                              while(currentToken<tokensInMessage)
 +                              if(currentToken.size()>0)
                                {
 -                                      pCommand->AddParameter(tokens[currentToken++]);
 -                                      ++parameterCount;
 -                              }
 -
 -                              if(parameterCount < pCommand->GetMinimumParameters()) {
 -                                      goto ParseFinnished;
 +                                      pTokenVector->push_back(currentToken);
 +                                      currentToken.clear();
                                }
 -
 -                              state = Done;
 -                              break;
 +                              continue;
                        }
  
 -              case GetChannel:
 +                      if(message[charIndex]==TEXT('\"'))
                        {
-                               inQuote ^= 1;
 -//                            assert(pCommand != 0);
 -
 -                              std::wstring str = boost::trim_copy(tokens[currentToken]);
 -                              std::vector<std::wstring> split;
 -                              boost::split(split, str, boost::is_any_of("-"));
 -                                      
 -                              int channelIndex = -1;
 -                              int layerIndex = -1;
 -                              try
 -                              {
 -                                      channelIndex = boost::lexical_cast<int>(split[0]) - 1;
++                              inQuote = !inQuote;
  
 -                                      if(split.size() > 1)
 -                                              layerIndex = boost::lexical_cast<int>(split[1]);
 -                              }
 -                              catch(...)
 +                              if(currentToken.size()>0 || !inQuote)
                                {
 -                                      goto ParseFinnished;
 +                                      pTokenVector->push_back(currentToken);
 +                                      currentToken.clear();
                                }
 -
 -                              std::shared_ptr<core::video_channel> pChannel = GetChannelSafe(channelIndex, channels_);
 -                              if(pChannel == 0) {
 -                                      goto ParseFinnished;
 -                              }
 -
 -                              pCommand->SetChannel(pChannel);
 -                              pCommand->SetChannels(channels_);
 -                              pCommand->SetChannelIndex(channelIndex);
 -                              pCommand->SetLayerIntex(layerIndex);
 -
 -                              state = GetParameters;
 -                              ++currentToken;
 -                              break;
 +                              continue;
                        }
  
 -              default:        //Done and unexpected
 -                      goto ParseFinnished;
 -              }
 -      }
 -
 -ParseFinnished:
 -      if(state == GetParameters && pCommand->GetMinimumParameters()==0)
 -              state = Done;
 -
 -      if(state != Done) {
 -              pCommand.reset();
 -      }
 -
 -      if(pOutState != 0) {
 -              *pOutState = state;
 -      }
 -
 -      return pCommand;
 -}
 -
 -bool AMCPProtocolStrategy::QueueCommand(AMCPCommand::ptr_type pCommand) {
 -      if(pCommand->NeedChannel()) {
 -              unsigned int channelIndex = pCommand->GetChannelIndex() + 1;
 -              if(commandQueues_.size() > channelIndex) {
 -                      commandQueues_[channelIndex]->AddCommand(pCommand);
 +                      currentToken += message[charIndex];
                }
 -              else
 -                      return false;
 -      }
 -      else {
 -              commandQueues_[0]->AddCommand(pCommand);
 -      }
 -      return true;
 -}
 -
 -AMCPCommand::ptr_type AMCPProtocolStrategy::CommandFactory(const std::wstring& str)
 -{
 -      std::wstring s = str;
 -      transform(s.begin(), s.end(), s.begin(), toupper);
 -      
 -      if         (s == TEXT("MIXER"))                 return std::make_shared<MixerCommand>();
 -      else if(s == TEXT("DIAG"))                      return std::make_shared<DiagnosticsCommand>();
 -      else if(s == TEXT("CHANNEL_GRID"))      return std::make_shared<ChannelGridCommand>();
 -      else if(s == TEXT("CALL"))                      return std::make_shared<CallCommand>();
 -      else if(s == TEXT("SWAP"))                      return std::make_shared<SwapCommand>();
 -      else if(s == TEXT("LOAD"))                      return std::make_shared<LoadCommand>();
 -      else if(s == TEXT("LOADBG"))            return std::make_shared<LoadbgCommand>();
 -      else if(s == TEXT("ADD"))                       return std::make_shared<AddCommand>();
 -      else if(s == TEXT("REMOVE"))            return std::make_shared<RemoveCommand>();
 -      else if(s == TEXT("PAUSE"))                     return std::make_shared<PauseCommand>();
 -      else if(s == TEXT("PLAY"))                      return std::make_shared<PlayCommand>();
 -      else if(s == TEXT("STOP"))                      return std::make_shared<StopCommand>();
 -      else if(s == TEXT("CLEAR"))                     return std::make_shared<ClearCommand>();
 -      else if(s == TEXT("PRINT"))                     return std::make_shared<PrintCommand>();
 -      else if(s == TEXT("LOG"))                       return std::make_shared<LogCommand>();
 -      else if(s == TEXT("CG"))                        return std::make_shared<CGCommand>();
 -      else if(s == TEXT("DATA"))                      return std::make_shared<DataCommand>();
 -      else if(s == TEXT("CINF"))                      return std::make_shared<CinfCommand>();
 -      else if(s == TEXT("INFO"))                      return std::make_shared<InfoCommand>(channels_);
 -      else if(s == TEXT("CLS"))                       return std::make_shared<ClsCommand>();
 -      else if(s == TEXT("TLS"))                       return std::make_shared<TlsCommand>();
 -      else if(s == TEXT("VERSION"))           return std::make_shared<VersionCommand>();
 -      else if(s == TEXT("BYE"))                       return std::make_shared<ByeCommand>();
 -      else if(s == TEXT("SET"))                       return std::make_shared<SetCommand>();
 -      //else if(s == TEXT("MONITOR"))
 -      //{
 -      //      result = AMCPCommandPtr(new MonitorCommand());
 -      //}
 -      //else if(s == TEXT("KILL"))
 -      //{
 -      //      result = AMCPCommandPtr(new KillCommand());
 -      //}
 -      return nullptr;
 -}
 -
 -std::size_t AMCPProtocolStrategy::TokenizeMessage(const std::wstring& message, std::vector<std::wstring>* pTokenVector)
 -{
 -      //split on whitespace but keep strings within quotationmarks
 -      //treat \ as the start of an escape-sequence: the following char will indicate what to actually put in the string
 -
 -      std::wstring currentToken;
 -
 -      bool inQuote = false;
 -      bool getSpecialCode = false;
  
 -      for(unsigned int charIndex=0; charIndex<message.size(); ++charIndex)
 -      {
 -              if(getSpecialCode)
 +              if(currentToken.size()>0)
                {
 -                      //insert code-handling here
 -                      switch(message[charIndex])
 -                      {
 -                      case TEXT('\\'):
 -                              currentToken += TEXT("\\");
 -                              break;
 -                      case TEXT('\"'):
 -                              currentToken += TEXT("\"");
 -                              break;
 -                      case TEXT('n'):
 -                              currentToken += TEXT("\n");
 -                              break;
 -                      default:
 -                              break;
 -                      };
 -                      getSpecialCode = false;
 -                      continue;
 +                      pTokenVector->push_back(currentToken);
 +                      currentToken.clear();
                }
  
 -              if(message[charIndex]==TEXT('\\'))
 -              {
 -                      getSpecialCode = true;
 -                      continue;
 -              }
 -
 -              if(message[charIndex]==' ' && inQuote==false)
 -              {
 -                      if(currentToken.size()>0)
 -                      {
 -                              pTokenVector->push_back(currentToken);
 -                              currentToken.clear();
 -                      }
 -                      continue;
 -              }
 -
 -              if(message[charIndex]==TEXT('\"'))
 -              {
 -                      inQuote = !inQuote;
 -
 -                      if(currentToken.size() > 0 || !inQuote)
 -                      {
 -                              pTokenVector->push_back(currentToken);
 -                              currentToken.clear();
 -                      }
 -                      continue;
 -              }
 +              return pTokenVector->size();
 +      }
  
 -              currentToken += message[charIndex];
 +      AMCPCommand::ptr_type create_command(const std::wstring& str, ClientInfoPtr client)
 +      {
 +              std::wstring s = boost::to_upper_copy(str);
 +              if(s == TEXT("DIAG"))                           return std::make_shared<DiagnosticsCommand>(client);
 +              else if(s == TEXT("CHANNEL_GRID"))      return std::make_shared<ChannelGridCommand>(client);
 +              else if(s == TEXT("DATA"))                      return std::make_shared<DataCommand>(client);
 +              else if(s == TEXT("CINF"))                      return std::make_shared<CinfCommand>(client);
 +              else if(s == TEXT("INFO"))                      return std::make_shared<InfoCommand>(client, channels_);
 +              else if(s == TEXT("CLS"))                       return std::make_shared<ClsCommand>(client);
 +              else if(s == TEXT("TLS"))                       return std::make_shared<TlsCommand>(client);
 +              else if(s == TEXT("VERSION"))           return std::make_shared<VersionCommand>(client);
 +              else if(s == TEXT("BYE"))                       return std::make_shared<ByeCommand>(client);
 +              else if(s == TEXT("LOCK"))                      return std::make_shared<LockCommand>(client, channels_);
 +              else if(s == TEXT("LOG"))                       return std::make_shared<LogCommand>(client);
 +              //else if(s == TEXT("KILL"))
 +              //{
 +              //      result = AMCPCommandPtr(new KillCommand());
 +              //}
 +
 +              return nullptr;
        }
  
 -      if(currentToken.size()>0)
 +      AMCPCommand::ptr_type create_channel_command(const std::wstring& str, ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index)
        {
 -              pTokenVector->push_back(currentToken);
 -              currentToken.clear();
 +              std::wstring s = boost::to_upper_copy(str);
 +      
 +              if         (s == TEXT("MIXER"))                 return std::make_shared<MixerCommand>(client, channel, channel_index, layer_index);
 +              else if(s == TEXT("CALL"))                      return std::make_shared<CallCommand>(client, channel, channel_index, layer_index);
 +              else if(s == TEXT("SWAP"))                      return std::make_shared<SwapCommand>(client, channel, channel_index, layer_index, channels_);
 +              else if(s == TEXT("LOAD"))                      return std::make_shared<LoadCommand>(client, channel, channel_index, layer_index);
 +              else if(s == TEXT("LOADBG"))            return std::make_shared<LoadbgCommand>(client, channel, channel_index, layer_index, channels_);
 +              else if(s == TEXT("ADD"))                       return std::make_shared<AddCommand>(client, channel, channel_index, layer_index);
 +              else if(s == TEXT("REMOVE"))            return std::make_shared<RemoveCommand>(client, channel, channel_index, layer_index);
 +              else if(s == TEXT("PAUSE"))                     return std::make_shared<PauseCommand>(client, channel, channel_index, layer_index);
 +              else if(s == TEXT("PLAY"))                      return std::make_shared<PlayCommand>(client, channel, channel_index, layer_index, channels_);
 +              else if(s == TEXT("STOP"))                      return std::make_shared<StopCommand>(client, channel, channel_index, layer_index);
 +              else if(s == TEXT("CLEAR"))                     return std::make_shared<ClearCommand>(client, channel, channel_index, layer_index);
 +              else if(s == TEXT("PRINT"))                     return std::make_shared<PrintCommand>(client, channel, channel_index, layer_index);
 +              else if(s == TEXT("CG"))                        return std::make_shared<CGCommand>(client, channel, channel_index, layer_index);
 +              else if(s == TEXT("SET"))                       return std::make_shared<SetCommand>(client, channel, channel_index, layer_index);
 +
 +              return nullptr;
        }
 +};
 +
 +
 +AMCPProtocolStrategy::AMCPProtocolStrategy(const std::vector<spl::shared_ptr<core::video_channel>>& channels) : impl_(spl::make_unique<impl>(channels)) {}
 +AMCPProtocolStrategy::~AMCPProtocolStrategy() {}
 +void AMCPProtocolStrategy::Parse(const std::wstring& msg, IO::ClientInfoPtr pClientInfo) { impl_->Parse(msg, pClientInfo); }
  
 -      return pTokenVector->size();
 -}
  
  }     //namespace amcp
  }}    //namespace caspar
index 0000000000000000000000000000000000000000,0000000000000000000000000000000000000000..8e64d4db9b7f54de0b41ba74d4021767e3c87477
new file mode 100644 (file)
--- /dev/null
--- /dev/null
@@@ -1,0 -1,0 +1,18 @@@
++#pragma once
++
++#include "../util/lock_container.h"
++#include <core/video_channel.h>
++#include <common/memory.h>
++
++namespace caspar { namespace protocol { namespace amcp {
++
++class channel_context
++{
++      channel_context();
++public:
++      explicit channel_context(const spl::shared_ptr<core::video_channel>& c, const std::wstring& lifecycle_key) : channel(c), lock(spl::make_shared<caspar::IO::lock_container>(lifecycle_key)) {}
++      spl::shared_ptr<core::video_channel>            channel;
++      caspar::IO::lock_container::ptr_type            lock;
++};
++
++}}}
index 0000000000000000000000000000000000000000,0000000000000000000000000000000000000000..14ee3d7f3d1428b4645a5e9bb5baf9a4dbaf8470
new file mode 100644 (file)
--- /dev/null
--- /dev/null
@@@ -1,0 -1,0 +1,77 @@@
++#include "..\StdAfx.h"
++
++#include <tbb/spin_rw_mutex.h>
++#include "lock_container.h"
++
++namespace caspar { namespace IO {
++
++      struct lock_container::impl
++      {
++              std::wstring                                                                            lifecycle_key_;
++      private:
++              std::set<std::weak_ptr<client_connection<wchar_t>>, std::owner_less<std::weak_ptr<client_connection<wchar_t>>>> locks_;
++              std::wstring                                                                            lock_phrase_;
++              mutable tbb::spin_rw_mutex                                                      mutex_;
++
++      public:
++              impl(const std::wstring& lifecycle_key) : lifecycle_key_(lifecycle_key) {}
++
++              bool check_access(client_connection<wchar_t>::ptr conn)
++              {
++                      tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
++                      std::weak_ptr<client_connection<wchar_t>> weak_ptr(conn);
++                      return locks_.empty() ? true : locks_.find(weak_ptr) != locks_.end();
++              }
++
++              bool try_lock(const std::wstring& lock_phrase, client_connection<wchar_t>::ptr conn)
++              {
++                      tbb::spin_rw_mutex::scoped_lock lock(mutex_, false);
++                      if(lock_phrase_.empty() || lock_phrase == lock_phrase_)
++                      {
++                              std::weak_ptr<client_connection<wchar_t>> weak_ptr(conn);
++                              if(locks_.find(weak_ptr) == locks_.end())
++                              {
++                                      {
++                                              lock.upgrade_to_writer();
++                                              lock_phrase_ = lock_phrase;
++                                              locks_.insert(weak_ptr);
++                                      }
++                                      CASPAR_LOG(info) << lifecycle_key_ << " acquired";
++
++                                      lock.release(); //risk of reentrancy-deadlock if we don't release prior to trying to attach lifecycle-bound object to connection
++                                      
++                                      {
++                                              std::shared_ptr<void> obj(nullptr, [=](void*) { release_lock(weak_ptr); });
++                                              conn->add_lifecycle_bound_object(lifecycle_key_, obj);
++                                      }
++                              }
++
++                              return true;
++                      }
++                      return false;
++              }
++
++              bool release_lock(std::weak_ptr<client_connection<wchar_t>> conn)
++              {
++                      {
++                              tbb::spin_rw_mutex::scoped_lock lock(mutex_, true);
++                              locks_.erase(conn);
++                              if(locks_.empty())
++                                      lock_phrase_.clear();
++                      }
++
++                      CASPAR_LOG(info) << lifecycle_key_ << " released";
++
++                      return true;
++              }
++      };
++
++      lock_container::lock_container(const std::wstring& lifecycle_key) : impl_(spl::make_unique<impl>(lifecycle_key)) {}
++      lock_container::~lock_container() {}
++
++      bool lock_container::check_access(client_connection<wchar_t>::ptr conn) { return impl_->check_access(conn); }
++      bool lock_container::try_lock(const std::wstring& lock_phrase, client_connection<wchar_t>::ptr conn) { return impl_->try_lock(lock_phrase, conn); }
++      bool lock_container::release_lock(std::weak_ptr<client_connection<wchar_t>> conn) { return impl_->release_lock(conn); }
++
++      const std::wstring& lock_container::lifecycle_key() const { return impl_->lifecycle_key_; }
++}}
index 0000000000000000000000000000000000000000,0000000000000000000000000000000000000000..4664b257ac2417219bbec0d5258e58eb99de9e76
new file mode 100644 (file)
--- /dev/null
--- /dev/null
@@@ -1,0 -1,0 +1,29 @@@
++#pragma once
++
++#include <common/memory.h>
++
++#include <boost/noncopyable.hpp>
++
++#include "protocol_strategy.h"
++
++namespace caspar { namespace IO {
++
++      class lock_container : public boost::noncopyable
++      {
++      public:
++              typedef spl::shared_ptr<lock_container> ptr_type;
++
++              lock_container(const std::wstring& lifecycle_key);
++              ~lock_container();
++
++              bool check_access(client_connection<wchar_t>::ptr conn);
++              bool try_lock(const std::wstring& lock_phrase, client_connection<wchar_t>::ptr conn);
++              bool release_lock(std::weak_ptr<client_connection<wchar_t>> conn);
++
++              const std::wstring& lifecycle_key() const;
++
++      private:
++              struct impl;
++              spl::unique_ptr<impl> impl_;
++      };
++}}