-/*\r
-* copyright (c) 2010 Sveriges Television AB <info@casparcg.com>\r
-*\r
-* This file is part of CasparCG.\r
-*\r
-* CasparCG is free software: you can redistribute it and/or modify\r
-* it under the terms of the GNU General Public License as published by\r
-* the Free Software Foundation, either version 3 of the License, or\r
-* (at your option) any later version.\r
-*\r
-* CasparCG is distributed in the hope that it will be useful,\r
-* but WITHOUT ANY WARRANTY; without even the implied warranty of\r
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the\r
-* GNU General Public License for more details.\r
-\r
-* You should have received a copy of the GNU General Public License\r
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
-*\r
-*/\r
- \r
-#include "../StdAfx.h"\r
-\r
-#include "AMCPProtocolStrategy.h"\r
-\r
-#include "../util/AsyncEventServer.h"\r
-#include "AMCPCommandsImpl.h"\r
-\r
-#include <stdio.h>\r
-#include <crtdbg.h>\r
-#include <string.h>\r
-#include <algorithm>\r
-#include <cctype>\r
-\r
-#include <boost/algorithm/string/trim.hpp>\r
-#include <boost/algorithm/string/split.hpp>\r
-#include <boost/lexical_cast.hpp>\r
-\r
-#if defined(_MSC_VER)\r
-#pragma warning (push, 1) // TODO: Legacy code, just disable warnings\r
-#endif\r
-\r
-namespace caspar { namespace protocol { namespace amcp {\r
-\r
-using IO::ClientInfoPtr;\r
-\r
-const std::wstring AMCPProtocolStrategy::MessageDelimiter = TEXT("\r\n");\r
-\r
-inline std::shared_ptr<core::video_channel> GetChannelSafe(unsigned int index, const std::vector<safe_ptr<core::video_channel>>& channels)\r
-{\r
- return index < channels.size() ? std::shared_ptr<core::video_channel>(channels[index]) : nullptr;\r
-}\r
-\r
-AMCPProtocolStrategy::AMCPProtocolStrategy(const std::vector<safe_ptr<core::video_channel>>& channels) : channels_(channels) {\r
- AMCPCommandQueuePtr pGeneralCommandQueue(new AMCPCommandQueue());\r
- if(!pGeneralCommandQueue->Start()) {\r
- CASPAR_LOG(error) << "Failed to start the general command-queue";\r
-\r
- //TODO: THROW!\r
- }\r
- else\r
- commandQueues_.push_back(pGeneralCommandQueue);\r
-\r
-\r
- std::shared_ptr<core::video_channel> pChannel;\r
- unsigned int index = -1;\r
- //Create a commandpump for each video_channel\r
- while((pChannel = GetChannelSafe(++index, channels_)) != 0) {\r
- AMCPCommandQueuePtr pChannelCommandQueue(new AMCPCommandQueue());\r
- std::wstring title = TEXT("video_channel ");\r
-\r
- //HACK: Perform real conversion from int to string\r
- TCHAR num = TEXT('1')+static_cast<TCHAR>(index);\r
- title += num;\r
-\r
- if(!pChannelCommandQueue->Start()) {\r
- std::wstring logString = TEXT("Failed to start command-queue for ");\r
- logString += title;\r
- CASPAR_LOG(error) << logString;\r
-\r
- //TODO: THROW!\r
- }\r
- else\r
- commandQueues_.push_back(pChannelCommandQueue);\r
- }\r
-}\r
-\r
-AMCPProtocolStrategy::~AMCPProtocolStrategy() {\r
-}\r
-\r
-void AMCPProtocolStrategy::Parse(const TCHAR* pData, int charCount, ClientInfoPtr pClientInfo)\r
-{\r
- size_t pos;\r
- std::wstring recvData(pData, charCount);\r
- std::wstring availibleData = (pClientInfo != nullptr ? pClientInfo->currentMessage_ : L"") + recvData;\r
-\r
- while(true) {\r
- pos = availibleData.find(MessageDelimiter);\r
- if(pos != std::wstring::npos)\r
- {\r
- std::wstring message = availibleData.substr(0,pos);\r
-\r
- //This is where a complete message gets taken care of\r
- if(message.length() > 0) {\r
- ProcessMessage(message, pClientInfo);\r
- }\r
-\r
- std::size_t nextStartPos = pos + MessageDelimiter.length();\r
- if(nextStartPos < availibleData.length())\r
- availibleData = availibleData.substr(nextStartPos);\r
- else {\r
- availibleData.clear();\r
- break;\r
- }\r
- }\r
- else\r
- {\r
- break;\r
- }\r
- }\r
- if(pClientInfo)\r
- pClientInfo->currentMessage_ = availibleData;\r
-}\r
-\r
-void AMCPProtocolStrategy::ProcessMessage(const std::wstring& message, ClientInfoPtr& pClientInfo)\r
-{\r
- bool bError = true;\r
- MessageParserState state = New;\r
-\r
- AMCPCommandPtr pCommand;\r
-\r
- pCommand = InterpretCommandString(message, &state);\r
-\r
- if(pCommand != 0) {\r
- pCommand->SetClientInfo(pClientInfo); \r
- if(QueueCommand(pCommand))\r
- bError = false;\r
- else\r
- state = GetChannel;\r
- }\r
-\r
- if(bError == true) {\r
- std::wstringstream answer;\r
- switch(state)\r
- {\r
- case GetCommand:\r
- answer << TEXT("400 ERROR\r\n") + message << "\r\n";\r
- break;\r
- case GetChannel:\r
- answer << TEXT("401 ERROR\r\n");\r
- break;\r
- case GetParameters:\r
- answer << TEXT("402 ERROR\r\n");\r
- break;\r
- default:\r
- answer << TEXT("500 FAILED\r\n");\r
- break;\r
- }\r
- pClientInfo->Send(answer.str());\r
- }\r
-}\r
-\r
-AMCPCommandPtr AMCPProtocolStrategy::InterpretCommandString(const std::wstring& message, MessageParserState* pOutState)\r
-{\r
- std::vector<std::wstring> tokens;\r
- unsigned int currentToken = 0;\r
- std::wstring commandSwitch;\r
-\r
- AMCPCommandPtr pCommand;\r
- MessageParserState state = New;\r
-\r
- CASPAR_LOG(trace) << message;\r
-\r
- std::size_t tokensInMessage = TokenizeMessage(message, &tokens);\r
-\r
- //parse the message one token at the time\r
- while(currentToken < tokensInMessage)\r
- {\r
- switch(state)\r
- {\r
- case New:\r
- if(tokens[currentToken][0] == TEXT('/'))\r
- state = GetSwitch;\r
- else\r
- state = GetCommand;\r
- break;\r
-\r
- case GetSwitch:\r
- commandSwitch = tokens[currentToken];\r
- state = GetCommand;\r
- ++currentToken;\r
- break;\r
-\r
- case GetCommand:\r
- pCommand = CommandFactory(tokens[currentToken]);\r
- if(pCommand == 0) {\r
- goto ParseFinnished;\r
- }\r
- else\r
- {\r
- //Set scheduling\r
- if(commandSwitch.size() > 0) {\r
- transform(commandSwitch.begin(), commandSwitch.end(), commandSwitch.begin(), toupper);\r
-\r
- if(commandSwitch == TEXT("/APP"))\r
- pCommand->SetScheduling(AddToQueue);\r
- else if(commandSwitch == TEXT("/IMMF"))\r
- pCommand->SetScheduling(ImmediatelyAndClear);\r
- }\r
-\r
- if(pCommand->NeedChannel())\r
- state = GetChannel;\r
- else\r
- state = GetParameters;\r
- }\r
- ++currentToken;\r
- break;\r
-\r
- case GetParameters:\r
- {\r
- _ASSERTE(pCommand != 0);\r
- int parameterCount=0;\r
- while(currentToken<tokensInMessage)\r
- {\r
- pCommand->AddParameter(tokens[currentToken++]);\r
- ++parameterCount;\r
- }\r
-\r
- if(parameterCount < pCommand->GetMinimumParameters()) {\r
- goto ParseFinnished;\r
- }\r
-\r
- state = Done;\r
- break;\r
- }\r
-\r
- case GetChannel:\r
- {\r
-// assert(pCommand != 0);\r
-\r
- std::wstring str = boost::trim_copy(tokens[currentToken]);\r
- std::vector<std::wstring> split;\r
- boost::split(split, str, boost::is_any_of("-"));\r
- \r
- int channelIndex = -1;\r
- int layerIndex = -1;\r
- try\r
- {\r
- channelIndex = boost::lexical_cast<int>(split[0]) - 1;\r
-\r
- if(split.size() > 1)\r
- layerIndex = boost::lexical_cast<int>(split[1]);\r
- }\r
- catch(...)\r
- {\r
- goto ParseFinnished;\r
- }\r
-\r
- std::shared_ptr<core::video_channel> pChannel = GetChannelSafe(channelIndex, channels_);\r
- if(pChannel == 0) {\r
- goto ParseFinnished;\r
- }\r
-\r
- pCommand->SetChannel(pChannel);\r
- pCommand->SetChannels(channels_);\r
- pCommand->SetChannelIndex(channelIndex);\r
- pCommand->SetLayerIntex(layerIndex);\r
-\r
- state = GetParameters;\r
- ++currentToken;\r
- break;\r
- }\r
-\r
- default: //Done and unexpected\r
- goto ParseFinnished;\r
- }\r
- }\r
-\r
-ParseFinnished:\r
- if(state == GetParameters && pCommand->GetMinimumParameters()==0)\r
- state = Done;\r
-\r
- if(state != Done) {\r
- pCommand.reset();\r
- }\r
-\r
- if(pOutState != 0) {\r
- *pOutState = state;\r
- }\r
-\r
- return pCommand;\r
-}\r
-\r
-bool AMCPProtocolStrategy::QueueCommand(AMCPCommandPtr pCommand) {\r
- if(pCommand->NeedChannel()) {\r
- unsigned int channelIndex = pCommand->GetChannelIndex() + 1;\r
- if(commandQueues_.size() > channelIndex) {\r
- commandQueues_[channelIndex]->AddCommand(pCommand);\r
- }\r
- else\r
- return false;\r
- }\r
- else {\r
- commandQueues_[0]->AddCommand(pCommand);\r
- }\r
- return true;\r
-}\r
-\r
-AMCPCommandPtr AMCPProtocolStrategy::CommandFactory(const std::wstring& str)\r
-{\r
- std::wstring s = str;\r
- transform(s.begin(), s.end(), s.begin(), toupper);\r
- \r
- if (s == TEXT("MIXER")) return std::make_shared<MixerCommand>();\r
- else if(s == TEXT("PARAM")) return std::make_shared<ParamCommand>();\r
- else if(s == TEXT("SWAP")) return std::make_shared<SwapCommand>();\r
- else if(s == TEXT("LOAD")) return std::make_shared<LoadCommand>();\r
- else if(s == TEXT("LOADBG")) return std::make_shared<LoadbgCommand>();\r
- else if(s == TEXT("ADD")) return std::make_shared<AddCommand>();\r
- else if(s == TEXT("REMOVE")) return std::make_shared<RemoveCommand>();\r
- else if(s == TEXT("PAUSE")) return std::make_shared<PauseCommand>();\r
- else if(s == TEXT("PLAY")) return std::make_shared<PlayCommand>();\r
- else if(s == TEXT("STOP")) return std::make_shared<StopCommand>();\r
- else if(s == TEXT("CLEAR")) return std::make_shared<ClearCommand>();\r
- else if(s == TEXT("CG")) return std::make_shared<CGCommand>();\r
- else if(s == TEXT("DATA")) return std::make_shared<DataCommand>();\r
- else if(s == TEXT("CINF")) return std::make_shared<CinfCommand>();\r
- else if(s == TEXT("INFO")) return std::make_shared<InfoCommand>(channels_);\r
- else if(s == TEXT("CLS")) return std::make_shared<ClsCommand>();\r
- else if(s == TEXT("TLS")) return std::make_shared<TlsCommand>();\r
- else if(s == TEXT("VERSION")) return std::make_shared<VersionCommand>();\r
- else if(s == TEXT("BYE")) return std::make_shared<ByeCommand>();\r
- else if(s == TEXT("SET")) return std::make_shared<SetCommand>();\r
- //else if(s == TEXT("MONITOR"))\r
- //{\r
- // result = AMCPCommandPtr(new MonitorCommand());\r
- //}\r
- //else if(s == TEXT("KILL"))\r
- //{\r
- // result = AMCPCommandPtr(new KillCommand());\r
- //}\r
- return nullptr;\r
-}\r
-\r
-std::size_t AMCPProtocolStrategy::TokenizeMessage(const std::wstring& message, std::vector<std::wstring>* pTokenVector)\r
-{\r
- //split on whitespace but keep strings within quotationmarks\r
- //treat \ as the start of an escape-sequence: the following char will indicate what to actually put in the string\r
-\r
- std::wstring currentToken;\r
-\r
- char inQuote = 0;\r
- bool getSpecialCode = false;\r
-\r
- for(unsigned int charIndex=0; charIndex<message.size(); ++charIndex)\r
- {\r
- if(getSpecialCode)\r
- {\r
- //insert code-handling here\r
- switch(message[charIndex])\r
- {\r
- case TEXT('\\'):\r
- currentToken += TEXT("\\");\r
- break;\r
- case TEXT('\"'):\r
- currentToken += TEXT("\"");\r
- break;\r
- case TEXT('n'):\r
- currentToken += TEXT("\n");\r
- break;\r
- default:\r
- break;\r
- };\r
- getSpecialCode = false;\r
- continue;\r
- }\r
-\r
- if(message[charIndex]==TEXT('\\'))\r
- {\r
- getSpecialCode = true;\r
- continue;\r
- }\r
-\r
- if(message[charIndex]==' ' && inQuote==false)\r
- {\r
- if(currentToken.size()>0)\r
- {\r
- pTokenVector->push_back(currentToken);\r
- currentToken.clear();\r
- }\r
- continue;\r
- }\r
-\r
- if(message[charIndex]==TEXT('\"'))\r
- {\r
- inQuote ^= 1;\r
-\r
- if(currentToken.size()>0)\r
- {\r
- pTokenVector->push_back(currentToken);\r
- currentToken.clear();\r
- }\r
- continue;\r
- }\r
-\r
- currentToken += message[charIndex];\r
- }\r
-\r
- if(currentToken.size()>0)\r
- {\r
- pTokenVector->push_back(currentToken);\r
- currentToken.clear();\r
- }\r
-\r
- return pTokenVector->size();\r
-}\r
-\r
-} //namespace amcp\r
-}} //namespace caspar
\ No newline at end of file
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Nicklas P Andersson
+*/
+
+
+#include "../StdAfx.h"
+
+#include "AMCPProtocolStrategy.h"
+#include "amcp_shared.h"
+#include "AMCPCommand.h"
+#include "AMCPCommandQueue.h"
+#include "amcp_command_repository.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <algorithm>
+#include <cctype>
+#include <future>
+
+#include <core/help/help_repository.h>
+#include <core/help/help_sink.h>
+
+#include <boost/algorithm/string/trim.hpp>
+#include <boost/algorithm/string/split.hpp>
+#include <boost/algorithm/string/replace.hpp>
+#include <boost/lexical_cast.hpp>
+
+#if defined(_MSC_VER)
+#pragma warning (push, 1) // TODO: Legacy code, just disable warnings
+#endif
+
+namespace caspar { namespace protocol { namespace amcp {
+
+using IO::ClientInfoPtr;
+
+template <typename Out, typename In>
+bool try_lexical_cast(const In& input, Out& result)
+{
+ Out saved = result;
+ bool success = boost::conversion::detail::try_lexical_convert(input, result);
+
+ if (!success)
+ result = saved; // Needed because of how try_lexical_convert is implemented.
+
+ return success;
+}
+
+struct AMCPProtocolStrategy::impl
+{
+private:
+ std::vector<AMCPCommandQueue::ptr_type> commandQueues_;
+ spl::shared_ptr<amcp_command_repository> repo_;
+
+public:
+ impl(const std::wstring& name, const spl::shared_ptr<amcp_command_repository>& repo)
+ : repo_(repo)
+ {
+ commandQueues_.push_back(spl::make_shared<AMCPCommandQueue>(L"General Queue for " + name));
+
+ for (int i = 0; i < repo_->channels().size(); ++i)
+ {
+ commandQueues_.push_back(spl::make_shared<AMCPCommandQueue>(
+ L"Channel " + boost::lexical_cast<std::wstring>(i + 1) + L" for " + name));
+ }
+ }
+
+ ~impl() {}
+
+ enum class error_state {
+ no_error = 0,
+ command_error,
+ channel_error,
+ parameters_error,
+ unknown_error,
+ access_error
+ };
+
+ struct command_interpreter_result
+ {
+ std::shared_ptr<caspar::IO::lock_container> lock;
+ std::wstring command_name;
+ AMCPCommand::ptr_type command;
+ error_state error = error_state::no_error;
+ std::shared_ptr<AMCPCommandQueue> queue;
+ };
+
+ //The paser method expects message to be complete messages with the delimiter stripped away.
+ //Thesefore the AMCPProtocolStrategy should be decorated with a delimiter_based_chunking_strategy
+ void Parse(const std::wstring& message, ClientInfoPtr client)
+ {
+ CASPAR_LOG_COMMUNICATION(info) << L"Received message from " << client->address() << ": " << message << L"\\r\\n";
+
+ command_interpreter_result result;
+ if(interpret_command_string(message, result, client))
+ {
+ if(result.lock && !result.lock->check_access(client))
+ result.error = error_state::access_error;
+ else
+ result.queue->AddCommand(result.command);
+ }
+
+ if (result.error != error_state::no_error)
+ {
+ std::wstringstream answer;
+
+ switch(result.error)
+ {
+ case error_state::command_error:
+ answer << L"400 ERROR\r\n" << message << "\r\n";
+ break;
+ case error_state::channel_error:
+ answer << L"401 " << result.command_name << " ERROR\r\n";
+ break;
+ case error_state::parameters_error:
+ answer << L"402 " << result.command_name << " ERROR\r\n";
+ break;
+ case error_state::access_error:
+ answer << L"503 " << result.command_name << " FAILED\r\n";
+ break;
+ case error_state::unknown_error:
+ answer << L"500 FAILED\r\n";
+ break;
+ default:
+ CASPAR_THROW_EXCEPTION(programming_error()
+ << msg_info(L"Unhandled error_state enum constant " + boost::lexical_cast<std::wstring>(static_cast<int>(result.error))));
+ }
+ client->send(answer.str());
+ }
+ }
+
+private:
+ bool interpret_command_string(const std::wstring& message, command_interpreter_result& result, ClientInfoPtr client)
+ {
+ try
+ {
+ std::list<std::wstring> tokens;
+ tokenize(message, tokens);
+
+ // Discard GetSwitch
+ if (!tokens.empty() && tokens.front().at(0) == L'/')
+ tokens.pop_front();
+
+ // Fail if no more tokens.
+ if (tokens.empty())
+ {
+ result.error = error_state::command_error;
+ return false;
+ }
+
+ // Consume command name
+ result.command_name = boost::to_upper_copy(tokens.front());
+ tokens.pop_front();
+
+ // Determine whether the next parameter is a channel spec or not
+ int channel_index = -1;
+ int layer_index = -1;
+ std::wstring channel_spec;
+
+ if (!tokens.empty())
+ {
+ channel_spec = tokens.front();
+ std::wstring channelid_str = boost::trim_copy(channel_spec);
+ std::vector<std::wstring> split;
+ boost::split(split, channelid_str, boost::is_any_of("-"));
+
+ // Use non_throwing lexical cast to not hit exception break point all the time.
+ if (try_lexical_cast(split[0], channel_index))
+ {
+ --channel_index;
+
+ if (split.size() > 1)
+ try_lexical_cast(split[1], layer_index);
+
+ // Consume channel-spec
+ tokens.pop_front();
+ }
+ }
+
+ bool is_channel_command = channel_index != -1;
+
+ // Create command instance
+ if (is_channel_command)
+ {
+ result.command = repo_->create_channel_command(result.command_name, client, channel_index, layer_index, tokens);
+
+ if (result.command)
+ {
+ result.lock = repo_->channels().at(channel_index).lock;
+ result.queue = commandQueues_.at(channel_index + 1);
+ }
+ else // Might be a non channel command, although the first argument is numeric
+ {
+ // Restore backed up channel spec string.
+ tokens.push_front(channel_spec);
+ result.command = repo_->create_command(result.command_name, client, tokens);
+
+ if (result.command)
+ result.queue = commandQueues_.at(0);
+ }
+ }
+ else
+ {
+ result.command = repo_->create_command(result.command_name, client, tokens);
+
+ if (result.command)
+ result.queue = commandQueues_.at(0);
+ }
+
+ if (!result.command)
+ result.error = error_state::command_error;
+ else
+ {
+ std::vector<std::wstring> parameters(tokens.begin(), tokens.end());
+
+ result.command->parameters() = std::move(parameters);
+
+ if (result.command->parameters().size() < result.command->minimum_parameters())
+ result.error = error_state::parameters_error;
+ }
+ }
+ catch (std::out_of_range&)
+ {
+ CASPAR_LOG(error) << "Invalid channel specified.";
+ result.error = error_state::channel_error;
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ result.error = error_state::unknown_error;
+ }
+
+ return result.error == error_state::no_error;
+ }
+
+ template<typename C>
+ std::size_t tokenize(const std::wstring& message, C& pTokenVector)
+ {
+ //split on whitespace but keep strings within quotationmarks
+ //treat \ as the start of an escape-sequence: the following char will indicate what to actually put in the string
+
+ std::wstring currentToken;
+
+ bool inQuote = false;
+ bool getSpecialCode = false;
+
+ for(unsigned int charIndex=0; charIndex<message.size(); ++charIndex)
+ {
+ if(getSpecialCode)
+ {
+ //insert code-handling here
+ switch(message[charIndex])
+ {
+ case L'\\':
+ currentToken += L"\\";
+ break;
+ case L'\"':
+ currentToken += L"\"";
+ break;
+ case L'n':
+ currentToken += L"\n";
+ break;
+ default:
+ break;
+ };
+ getSpecialCode = false;
+ continue;
+ }
+
+ if(message[charIndex]==L'\\')
+ {
+ getSpecialCode = true;
+ continue;
+ }
+
+ if(message[charIndex]==L' ' && inQuote==false)
+ {
+ if(!currentToken.empty())
+ {
+ pTokenVector.push_back(currentToken);
+ currentToken.clear();
+ }
+ continue;
+ }
+
+ if(message[charIndex]==L'\"')
+ {
+ inQuote = !inQuote;
+
+ if(!currentToken.empty() || !inQuote)
+ {
+ pTokenVector.push_back(currentToken);
+ currentToken.clear();
+ }
+ continue;
+ }
+
+ currentToken += message[charIndex];
+ }
+
+ if(!currentToken.empty())
+ {
+ pTokenVector.push_back(currentToken);
+ currentToken.clear();
+ }
+
+ return pTokenVector.size();
+ }
+};
+
+AMCPProtocolStrategy::AMCPProtocolStrategy(const std::wstring& name, const spl::shared_ptr<amcp_command_repository>& repo)
+ : impl_(spl::make_unique<impl>(name, repo))
+{
+}
+AMCPProtocolStrategy::~AMCPProtocolStrategy() {}
+void AMCPProtocolStrategy::Parse(const std::wstring& msg, IO::ClientInfoPtr pClientInfo) { impl_->Parse(msg, pClientInfo); }
+
+
+} //namespace amcp
+}} //namespace caspar