using IO::ClientInfoPtr;
-const std::wstring AMCPProtocolStrategy::MessageDelimiter = TEXT("\r\n");
-
-inline std::shared_ptr<core::video_channel> GetChannelSafe(unsigned int index, const std::vector<spl::shared_ptr<core::video_channel>>& channels)
+struct AMCPProtocolStrategy::impl
{
- return index < channels.size() ? std::shared_ptr<core::video_channel>(channels[index]) : nullptr;
-}
-
-AMCPProtocolStrategy::AMCPProtocolStrategy(const std::vector<spl::shared_ptr<core::video_channel>>& channels) : channels_(channels) {
- AMCPCommandQueuePtr pGeneralCommandQueue(new AMCPCommandQueue());
- commandQueues_.push_back(pGeneralCommandQueue);
-
+private:
+ std::vector<channel_context> channels_;
+ std::vector<AMCPCommandQueue::ptr_type> commandQueues_;
- std::shared_ptr<core::video_channel> pChannel;
- unsigned int index = -1;
- //Create a commandpump for each video_channel
- while((pChannel = GetChannelSafe(++index, channels_)) != 0) {
- AMCPCommandQueuePtr pChannelCommandQueue(new AMCPCommandQueue());
- std::wstring title = TEXT("video_channel ");
+public:
+ impl(const std::vector<spl::shared_ptr<core::video_channel>>& channels)
+ {
+ commandQueues_.push_back(std::make_shared<AMCPCommandQueue>());
- //HACK: Perform real conversion from int to string
- TCHAR num = TEXT('1')+static_cast<TCHAR>(index);
- title += num;
-
- commandQueues_.push_back(pChannelCommandQueue);
+ int index = 0;
+ BOOST_FOREACH(const spl::shared_ptr<core::video_channel>& channel, channels)
+ {
+ std::wstring lifecycle_key = L"lock" + boost::lexical_cast<std::wstring>(index);
+ channels_.push_back(channel_context(channel, lifecycle_key));
+ auto queue(std::make_shared<AMCPCommandQueue>());
+ commandQueues_.push_back(queue);
+ ++index;
+ }
}
-}
-
-AMCPProtocolStrategy::~AMCPProtocolStrategy() {
-}
-
-void AMCPProtocolStrategy::Parse(const TCHAR* pData, int charCount, ClientInfoPtr pClientInfo)
-{
- size_t pos;
- std::wstring recvData(pData, charCount);
- std::wstring availibleData = (pClientInfo != nullptr ? pClientInfo->currentMessage_ : L"") + recvData;
- while(true) {
- pos = availibleData.find(MessageDelimiter);
- if(pos != std::wstring::npos)
+ ~impl() {}
+
+ enum parser_state {
+ New = 0,
+ GetSwitch,
+ GetCommand,
+ GetParameters
+ };
+ enum error_state {
+ no_error = 0,
+ command_error,
+ channel_error,
+ parameters_error,
+ unknown_error,
+ access_error
+ };
+
+ struct command_interpreter_result
+ {
+ command_interpreter_result() : error(no_error) {}
+
+ std::shared_ptr<caspar::IO::lock_container> lock;
+ std::wstring command_name;
+ AMCPCommand::ptr_type command;
+ error_state error;
+ AMCPCommandQueue::ptr_type queue;
+ };
+
+ //The paser method expects message to be complete messages with the delimiter stripped away.
+ //Thesefore the AMCPProtocolStrategy should be decorated with a delimiter_based_chunking_strategy
+ void Parse(const std::wstring& message, ClientInfoPtr client)
+ {
+ CASPAR_LOG(info) << L"Received message from " << client->print() << ": " << message << L"\\r\\n";
+
+ command_interpreter_result result;
+ if(interpret_command_string(message, result, client))
{
- std::wstring message = availibleData.substr(0,pos);
-
- //This is where a complete message gets taken care of
- if(message.length() > 0) {
- ProcessMessage(message, pClientInfo);
- }
+ if(result.lock && !result.lock->check_access(client))
+ result.error = access_error;
+ else
+ result.queue->AddCommand(result.command);
+ }
+
+ if(result.error != no_error)
+ {
+ std::wstringstream answer;
+ boost::to_upper(result.command_name);
- std::size_t nextStartPos = pos + MessageDelimiter.length();
- if(nextStartPos < availibleData.length())
- availibleData = availibleData.substr(nextStartPos);
- else {
- availibleData.clear();
+ switch(result.error)
+ {
+ case command_error:
+ answer << L"400 ERROR\r\n" << message << "\r\n";
+ break;
+ case channel_error:
+ answer << L"401 " << result.command_name << " ERROR\r\n";
+ break;
+ case parameters_error:
+ answer << L"402 " << result.command_name << " ERROR\r\n";
+ break;
+ case access_error:
+ answer << L"503 " << result.command_name << " FAILED\r\n";
+ break;
+ default:
+ answer << L"500 FAILED\r\n";
break;
}
- }
- else
- {
- break;
+ client->send(answer.str());
}
}
- if(pClientInfo)
- pClientInfo->currentMessage_ = availibleData;
-}
-void AMCPProtocolStrategy::ProcessMessage(const std::wstring& message, ClientInfoPtr& pClientInfo)
-{
- CASPAR_LOG(info) << L"Received message from " << pClientInfo->print() << ": " << message << L"\\r\\n";
-
- bool bError = true;
- MessageParserState state = New;
+private:
+ friend class AMCPCommand;
- AMCPCommand::ptr_type pCommand(InterpretCommandString(message, &state));
+ bool interpret_command_string(const std::wstring& message, command_interpreter_result& result, ClientInfoPtr client)
+ {
+ try
+ {
+ std::vector<std::wstring> tokens;
+ parser_state state = New;
- if(pCommand != 0) {
- pCommand->SetClientInfo(pClientInfo);
- if(QueueCommand(pCommand))
- bError = false;
- else
- state = GetChannel;
- }
+ tokenize(message, &tokens);
- if(bError == true) {
- std::wstringstream answer;
- switch(state)
+ //parse the message one token at the time
+ auto end = tokens.end();
+ auto it = tokens.begin();
+ while(it != end && result.error == no_error)
+ {
+ switch(state)
+ {
+ case New:
+ if((*it)[0] == TEXT('/'))
+ state = GetSwitch;
+ else
+ state = GetCommand;
+ break;
+
+ case GetSwitch:
+ //command_switch = (*it); //we dont care for the switch anymore
+ state = GetCommand;
+ ++it;
+ break;
+
+ case GetCommand:
+ {
+ result.command_name = (*it);
+ result.command = create_command(result.command_name, client);
+ if(result.command) //the command doesn't need a channel
+ {
+ result.queue = commandQueues_[0];
+ state = GetParameters;
+ }
+ else
+ {
+ //get channel index from next token
+ int channel_index = -1;
+ int layer_index = -1;
+
+ ++it;
+ if(it == end)
+ {
+ result.error = parameters_error;
+ break;
+ }
+
+ { //parse channel/layer token
+ try
+ {
+ std::wstring channelid_str = boost::trim_copy(*it);
+ std::vector<std::wstring> split;
+ boost::split(split, channelid_str, boost::is_any_of("-"));
+
+ channel_index = boost::lexical_cast<int>(split[0]) - 1;
+ if(split.size() > 1)
+ layer_index = boost::lexical_cast<int>(split[1]);
+ }
+ catch(...)
+ {
+ result.error = channel_error;
+ break;
+ }
+ }
+
+ if(channel_index >= 0 && channel_index < channels_.size())
+ {
+ result.command = create_channel_command(result.command_name, client, channels_.at(channel_index), channel_index, layer_index);
+ if(result.command)
+ {
+ result.lock = channels_.at(channel_index).lock;
+ result.queue = commandQueues_[channel_index + 1];
+ }
+ else
+ {
+ result.error = command_error;
+ break;
+ }
+ }
+ else
+ {
+ result.error = channel_error;
+ break;
+ }
+ }
+
+ state = GetParameters;
+ ++it;
+ }
+ break;
+
+ case GetParameters:
+ {
+ int parameterCount=0;
+ while(it != end)
+ {
+ result.command->parameters().push_back((*it));
+ ++it;
+ ++parameterCount;
+ }
+ }
+ break;
+ }
+ }
+
+ if(result.command && result.error == no_error && result.command->parameters().size() < result.command->minimum_parameters()) {
+ result.error = parameters_error;
+ }
+ }
+ catch(...)
{
- case GetCommand:
- answer << TEXT("400 ERROR\r\n") + message << "\r\n";
- break;
- case GetChannel:
- answer << TEXT("401 ERROR\r\n");
- break;
- case GetParameters:
- answer << TEXT("402 ERROR\r\n");
- break;
- default:
- answer << TEXT("500 FAILED\r\n");
- break;
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ result.error = unknown_error;
}
- pClientInfo->Send(answer.str());
+
+ return result.error == no_error;
}
-}
-AMCPCommand::ptr_type AMCPProtocolStrategy::InterpretCommandString(const std::wstring& message, MessageParserState* pOutState)
-{
- std::vector<std::wstring> tokens;
- unsigned int currentToken = 0;
- std::wstring commandSwitch;
+ std::size_t tokenize(const std::wstring& message, std::vector<std::wstring>* pTokenVector)
+ {
+ //split on whitespace but keep strings within quotationmarks
+ //treat \ as the start of an escape-sequence: the following char will indicate what to actually put in the string
- AMCPCommand::ptr_type pCommand;
- MessageParserState state = New;
+ std::wstring currentToken;
- char inQuote = 0;
- std::size_t tokensInMessage = TokenizeMessage(message, &tokens);
++ bool inQuote = false;
+ bool getSpecialCode = false;
- //parse the message one token at the time
- while(currentToken < tokensInMessage)
- {
- switch(state)
+ for(unsigned int charIndex=0; charIndex<message.size(); ++charIndex)
{
- case New:
- if(tokens[currentToken][0] == TEXT('/'))
- state = GetSwitch;
- else
- state = GetCommand;
- break;
-
- case GetSwitch:
- commandSwitch = tokens[currentToken];
- state = GetCommand;
- ++currentToken;
- break;
-
- case GetCommand:
- pCommand = CommandFactory(tokens[currentToken]);
- if(pCommand == 0) {
- goto ParseFinnished;
- }
- else
+ if(getSpecialCode)
{
- pCommand->SetChannels(channels_);
- //Set scheduling
- if(commandSwitch.size() > 0) {
- transform(commandSwitch.begin(), commandSwitch.end(), commandSwitch.begin(), toupper);
-
- //if(commandSwitch == TEXT("/APP"))
- // pCommand->SetScheduling(AddToQueue);
- //else if(commandSwitch == TEXT("/IMMF"))
- // pCommand->SetScheduling(ImmediatelyAndClear);
- }
+ //insert code-handling here
+ switch(message[charIndex])
+ {
+ case TEXT('\\'):
+ currentToken += TEXT("\\");
+ break;
+ case TEXT('\"'):
+ currentToken += TEXT("\"");
+ break;
+ case TEXT('n'):
+ currentToken += TEXT("\n");
+ break;
+ default:
+ break;
+ };
+ getSpecialCode = false;
+ continue;
+ }
- if(pCommand->NeedChannel())
- state = GetChannel;
- else
- state = GetParameters;
+ if(message[charIndex]==TEXT('\\'))
+ {
+ getSpecialCode = true;
+ continue;
}
- ++currentToken;
- break;
- case GetParameters:
+ if(message[charIndex]==' ' && inQuote==false)
{
- _ASSERTE(pCommand != 0);
- int parameterCount=0;
- while(currentToken<tokensInMessage)
+ if(currentToken.size()>0)
{
- pCommand->AddParameter(tokens[currentToken++]);
- ++parameterCount;
- }
-
- if(parameterCount < pCommand->GetMinimumParameters()) {
- goto ParseFinnished;
+ pTokenVector->push_back(currentToken);
+ currentToken.clear();
}
-
- state = Done;
- break;
+ continue;
}
- case GetChannel:
+ if(message[charIndex]==TEXT('\"'))
{
- inQuote ^= 1;
-// assert(pCommand != 0);
-
- std::wstring str = boost::trim_copy(tokens[currentToken]);
- std::vector<std::wstring> split;
- boost::split(split, str, boost::is_any_of("-"));
-
- int channelIndex = -1;
- int layerIndex = -1;
- try
- {
- channelIndex = boost::lexical_cast<int>(split[0]) - 1;
++ inQuote = !inQuote;
- if(split.size() > 1)
- layerIndex = boost::lexical_cast<int>(split[1]);
- }
- catch(...)
+ if(currentToken.size()>0 || !inQuote)
{
- goto ParseFinnished;
+ pTokenVector->push_back(currentToken);
+ currentToken.clear();
}
-
- std::shared_ptr<core::video_channel> pChannel = GetChannelSafe(channelIndex, channels_);
- if(pChannel == 0) {
- goto ParseFinnished;
- }
-
- pCommand->SetChannel(pChannel);
- pCommand->SetChannels(channels_);
- pCommand->SetChannelIndex(channelIndex);
- pCommand->SetLayerIntex(layerIndex);
-
- state = GetParameters;
- ++currentToken;
- break;
+ continue;
}
- default: //Done and unexpected
- goto ParseFinnished;
- }
- }
-
-ParseFinnished:
- if(state == GetParameters && pCommand->GetMinimumParameters()==0)
- state = Done;
-
- if(state != Done) {
- pCommand.reset();
- }
-
- if(pOutState != 0) {
- *pOutState = state;
- }
-
- return pCommand;
-}
-
-bool AMCPProtocolStrategy::QueueCommand(AMCPCommand::ptr_type pCommand) {
- if(pCommand->NeedChannel()) {
- unsigned int channelIndex = pCommand->GetChannelIndex() + 1;
- if(commandQueues_.size() > channelIndex) {
- commandQueues_[channelIndex]->AddCommand(pCommand);
+ currentToken += message[charIndex];
}
- else
- return false;
- }
- else {
- commandQueues_[0]->AddCommand(pCommand);
- }
- return true;
-}
-
-AMCPCommand::ptr_type AMCPProtocolStrategy::CommandFactory(const std::wstring& str)
-{
- std::wstring s = str;
- transform(s.begin(), s.end(), s.begin(), toupper);
-
- if (s == TEXT("MIXER")) return std::make_shared<MixerCommand>();
- else if(s == TEXT("DIAG")) return std::make_shared<DiagnosticsCommand>();
- else if(s == TEXT("CHANNEL_GRID")) return std::make_shared<ChannelGridCommand>();
- else if(s == TEXT("CALL")) return std::make_shared<CallCommand>();
- else if(s == TEXT("SWAP")) return std::make_shared<SwapCommand>();
- else if(s == TEXT("LOAD")) return std::make_shared<LoadCommand>();
- else if(s == TEXT("LOADBG")) return std::make_shared<LoadbgCommand>();
- else if(s == TEXT("ADD")) return std::make_shared<AddCommand>();
- else if(s == TEXT("REMOVE")) return std::make_shared<RemoveCommand>();
- else if(s == TEXT("PAUSE")) return std::make_shared<PauseCommand>();
- else if(s == TEXT("PLAY")) return std::make_shared<PlayCommand>();
- else if(s == TEXT("STOP")) return std::make_shared<StopCommand>();
- else if(s == TEXT("CLEAR")) return std::make_shared<ClearCommand>();
- else if(s == TEXT("PRINT")) return std::make_shared<PrintCommand>();
- else if(s == TEXT("LOG")) return std::make_shared<LogCommand>();
- else if(s == TEXT("CG")) return std::make_shared<CGCommand>();
- else if(s == TEXT("DATA")) return std::make_shared<DataCommand>();
- else if(s == TEXT("CINF")) return std::make_shared<CinfCommand>();
- else if(s == TEXT("INFO")) return std::make_shared<InfoCommand>(channels_);
- else if(s == TEXT("CLS")) return std::make_shared<ClsCommand>();
- else if(s == TEXT("TLS")) return std::make_shared<TlsCommand>();
- else if(s == TEXT("VERSION")) return std::make_shared<VersionCommand>();
- else if(s == TEXT("BYE")) return std::make_shared<ByeCommand>();
- else if(s == TEXT("SET")) return std::make_shared<SetCommand>();
- //else if(s == TEXT("MONITOR"))
- //{
- // result = AMCPCommandPtr(new MonitorCommand());
- //}
- //else if(s == TEXT("KILL"))
- //{
- // result = AMCPCommandPtr(new KillCommand());
- //}
- return nullptr;
-}
-
-std::size_t AMCPProtocolStrategy::TokenizeMessage(const std::wstring& message, std::vector<std::wstring>* pTokenVector)
-{
- //split on whitespace but keep strings within quotationmarks
- //treat \ as the start of an escape-sequence: the following char will indicate what to actually put in the string
-
- std::wstring currentToken;
-
- bool inQuote = false;
- bool getSpecialCode = false;
- for(unsigned int charIndex=0; charIndex<message.size(); ++charIndex)
- {
- if(getSpecialCode)
+ if(currentToken.size()>0)
{
- //insert code-handling here
- switch(message[charIndex])
- {
- case TEXT('\\'):
- currentToken += TEXT("\\");
- break;
- case TEXT('\"'):
- currentToken += TEXT("\"");
- break;
- case TEXT('n'):
- currentToken += TEXT("\n");
- break;
- default:
- break;
- };
- getSpecialCode = false;
- continue;
+ pTokenVector->push_back(currentToken);
+ currentToken.clear();
}
- if(message[charIndex]==TEXT('\\'))
- {
- getSpecialCode = true;
- continue;
- }
-
- if(message[charIndex]==' ' && inQuote==false)
- {
- if(currentToken.size()>0)
- {
- pTokenVector->push_back(currentToken);
- currentToken.clear();
- }
- continue;
- }
-
- if(message[charIndex]==TEXT('\"'))
- {
- inQuote = !inQuote;
-
- if(currentToken.size() > 0 || !inQuote)
- {
- pTokenVector->push_back(currentToken);
- currentToken.clear();
- }
- continue;
- }
+ return pTokenVector->size();
+ }
- currentToken += message[charIndex];
+ AMCPCommand::ptr_type create_command(const std::wstring& str, ClientInfoPtr client)
+ {
+ std::wstring s = boost::to_upper_copy(str);
+ if(s == TEXT("DIAG")) return std::make_shared<DiagnosticsCommand>(client);
+ else if(s == TEXT("CHANNEL_GRID")) return std::make_shared<ChannelGridCommand>(client);
+ else if(s == TEXT("DATA")) return std::make_shared<DataCommand>(client);
+ else if(s == TEXT("CINF")) return std::make_shared<CinfCommand>(client);
+ else if(s == TEXT("INFO")) return std::make_shared<InfoCommand>(client, channels_);
+ else if(s == TEXT("CLS")) return std::make_shared<ClsCommand>(client);
+ else if(s == TEXT("TLS")) return std::make_shared<TlsCommand>(client);
+ else if(s == TEXT("VERSION")) return std::make_shared<VersionCommand>(client);
+ else if(s == TEXT("BYE")) return std::make_shared<ByeCommand>(client);
+ else if(s == TEXT("LOCK")) return std::make_shared<LockCommand>(client, channels_);
+ else if(s == TEXT("LOG")) return std::make_shared<LogCommand>(client);
+ //else if(s == TEXT("KILL"))
+ //{
+ // result = AMCPCommandPtr(new KillCommand());
+ //}
+
+ return nullptr;
}
- if(currentToken.size()>0)
+ AMCPCommand::ptr_type create_channel_command(const std::wstring& str, ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index)
{
- pTokenVector->push_back(currentToken);
- currentToken.clear();
+ std::wstring s = boost::to_upper_copy(str);
+
+ if (s == TEXT("MIXER")) return std::make_shared<MixerCommand>(client, channel, channel_index, layer_index);
+ else if(s == TEXT("CALL")) return std::make_shared<CallCommand>(client, channel, channel_index, layer_index);
+ else if(s == TEXT("SWAP")) return std::make_shared<SwapCommand>(client, channel, channel_index, layer_index, channels_);
+ else if(s == TEXT("LOAD")) return std::make_shared<LoadCommand>(client, channel, channel_index, layer_index);
+ else if(s == TEXT("LOADBG")) return std::make_shared<LoadbgCommand>(client, channel, channel_index, layer_index, channels_);
+ else if(s == TEXT("ADD")) return std::make_shared<AddCommand>(client, channel, channel_index, layer_index);
+ else if(s == TEXT("REMOVE")) return std::make_shared<RemoveCommand>(client, channel, channel_index, layer_index);
+ else if(s == TEXT("PAUSE")) return std::make_shared<PauseCommand>(client, channel, channel_index, layer_index);
+ else if(s == TEXT("PLAY")) return std::make_shared<PlayCommand>(client, channel, channel_index, layer_index, channels_);
+ else if(s == TEXT("STOP")) return std::make_shared<StopCommand>(client, channel, channel_index, layer_index);
+ else if(s == TEXT("CLEAR")) return std::make_shared<ClearCommand>(client, channel, channel_index, layer_index);
+ else if(s == TEXT("PRINT")) return std::make_shared<PrintCommand>(client, channel, channel_index, layer_index);
+ else if(s == TEXT("CG")) return std::make_shared<CGCommand>(client, channel, channel_index, layer_index);
+ else if(s == TEXT("SET")) return std::make_shared<SetCommand>(client, channel, channel_index, layer_index);
+
+ return nullptr;
}
+};
+
+
+AMCPProtocolStrategy::AMCPProtocolStrategy(const std::vector<spl::shared_ptr<core::video_channel>>& channels) : impl_(spl::make_unique<impl>(channels)) {}
+AMCPProtocolStrategy::~AMCPProtocolStrategy() {}
+void AMCPProtocolStrategy::Parse(const std::wstring& msg, IO::ClientInfoPtr pClientInfo) { impl_->Parse(msg, pClientInfo); }
- return pTokenVector->size();
-}
} //namespace amcp
}} //namespace caspar