#include "../StdAfx.h"
#include "AMCPProtocolStrategy.h"
-#include "AMCPCommandsImpl.h"
#include "amcp_shared.h"
#include "AMCPCommand.h"
#include "AMCPCommandQueue.h"
+#include "amcp_command_repository.h"
#include <stdio.h>
-#include <crtdbg.h>
#include <string.h>
#include <algorithm>
#include <cctype>
+#include <future>
+
+#include <core/help/help_repository.h>
+#include <core/help/help_sink.h>
#include <boost/algorithm/string/trim.hpp>
#include <boost/algorithm/string/split.hpp>
using IO::ClientInfoPtr;
+template <typename Out, typename In>
+bool try_lexical_cast(const In& input, Out& result)
+{
+ Out saved = result;
+ bool success = boost::conversion::detail::try_lexical_convert(input, result);
+
+ if (!success)
+ result = saved; // Needed because of how try_lexical_convert is implemented.
+
+ return success;
+}
+
struct AMCPProtocolStrategy::impl
{
private:
- std::vector<channel_context> channels_;
- std::vector<AMCPCommandQueue::ptr_type> commandQueues_;
+ std::vector<AMCPCommandQueue::ptr_type> commandQueues_;
+ spl::shared_ptr<amcp_command_repository> repo_;
public:
- impl(const std::vector<spl::shared_ptr<core::video_channel>>& channels)
+ impl(const std::wstring& name, const spl::shared_ptr<amcp_command_repository>& repo)
+ : repo_(repo)
{
- commandQueues_.push_back(std::make_shared<AMCPCommandQueue>());
+ commandQueues_.push_back(spl::make_shared<AMCPCommandQueue>(L"General Queue for " + name));
- int index = 0;
- BOOST_FOREACH(const spl::shared_ptr<core::video_channel>& channel, channels)
+ for (int i = 0; i < repo_->channels().size(); ++i)
{
- std::wstring lifecycle_key = L"lock" + boost::lexical_cast<std::wstring>(index);
- channels_.push_back(channel_context(channel, lifecycle_key));
- auto queue(std::make_shared<AMCPCommandQueue>());
- commandQueues_.push_back(queue);
- ++index;
+ commandQueues_.push_back(spl::make_shared<AMCPCommandQueue>(
+ L"Channel " + boost::lexical_cast<std::wstring>(i + 1) + L" for " + name));
}
}
~impl() {}
- enum parser_state {
- New = 0,
- GetSwitch,
- GetCommand,
- GetParameters
- };
- enum error_state {
+ enum class error_state {
no_error = 0,
command_error,
channel_error,
struct command_interpreter_result
{
- command_interpreter_result() : error(no_error) {}
-
std::shared_ptr<caspar::IO::lock_container> lock;
std::wstring command_name;
AMCPCommand::ptr_type command;
- error_state error;
- AMCPCommandQueue::ptr_type queue;
+ error_state error = error_state::no_error;
+ std::shared_ptr<AMCPCommandQueue> queue;
};
//The paser method expects message to be complete messages with the delimiter stripped away.
//Thesefore the AMCPProtocolStrategy should be decorated with a delimiter_based_chunking_strategy
void Parse(const std::wstring& message, ClientInfoPtr client)
{
- CASPAR_LOG(info) << L"Received message from " << client->print() << ": " << message << L"\\r\\n";
+ CASPAR_LOG_COMMUNICATION(info) << L"Received message from " << client->address() << ": " << message << L"\\r\\n";
command_interpreter_result result;
if(interpret_command_string(message, result, client))
{
if(result.lock && !result.lock->check_access(client))
- result.error = access_error;
+ result.error = error_state::access_error;
else
result.queue->AddCommand(result.command);
}
- if(result.error != no_error)
+ if (result.error != error_state::no_error)
{
std::wstringstream answer;
- boost::to_upper(result.command_name);
switch(result.error)
{
- case command_error:
+ case error_state::command_error:
answer << L"400 ERROR\r\n" << message << "\r\n";
break;
- case channel_error:
+ case error_state::channel_error:
answer << L"401 " << result.command_name << " ERROR\r\n";
break;
- case parameters_error:
+ case error_state::parameters_error:
answer << L"402 " << result.command_name << " ERROR\r\n";
break;
- case access_error:
+ case error_state::access_error:
answer << L"503 " << result.command_name << " FAILED\r\n";
break;
- default:
+ case error_state::unknown_error:
answer << L"500 FAILED\r\n";
break;
+ default:
+ CASPAR_THROW_EXCEPTION(programming_error()
+ << msg_info(L"Unhandled error_state enum constant " + boost::lexical_cast<std::wstring>(static_cast<int>(result.error))));
}
client->send(answer.str());
}
}
private:
- friend class AMCPCommand;
-
bool interpret_command_string(const std::wstring& message, command_interpreter_result& result, ClientInfoPtr client)
{
try
{
- std::vector<std::wstring> tokens;
- parser_state state = New;
+ std::list<std::wstring> tokens;
+ tokenize(message, tokens);
+
+ // Discard GetSwitch
+ if (!tokens.empty() && tokens.front().at(0) == L'/')
+ tokens.pop_front();
+
+ // Fail if no more tokens.
+ if (tokens.empty())
+ {
+ result.error = error_state::command_error;
+ return false;
+ }
+
+ // Consume command name
+ result.command_name = boost::to_upper_copy(tokens.front());
+ tokens.pop_front();
- tokenize(message, &tokens);
+ // Determine whether the next parameter is a channel spec or not
+ int channel_index = -1;
+ int layer_index = -1;
+ std::wstring channel_spec;
- //parse the message one token at the time
- auto end = tokens.end();
- auto it = tokens.begin();
- while(it != end && result.error == no_error)
+ if (!tokens.empty())
{
- switch(state)
+ channel_spec = tokens.front();
+ std::wstring channelid_str = boost::trim_copy(channel_spec);
+ std::vector<std::wstring> split;
+ boost::split(split, channelid_str, boost::is_any_of("-"));
+
+ // Use non_throwing lexical cast to not hit exception break point all the time.
+ if (try_lexical_cast(split[0], channel_index))
{
- case New:
- if((*it)[0] == TEXT('/'))
- state = GetSwitch;
- else
- state = GetCommand;
- break;
+ --channel_index;
- case GetSwitch:
- //command_switch = (*it); //we dont care for the switch anymore
- state = GetCommand;
- ++it;
- break;
+ if (split.size() > 1)
+ try_lexical_cast(split[1], layer_index);
- case GetCommand:
- {
- result.command_name = (*it);
- result.command = create_command(result.command_name, client);
- if(result.command) //the command doesn't need a channel
- {
- result.queue = commandQueues_[0];
- state = GetParameters;
- }
- else
- {
- //get channel index from next token
- int channel_index = -1;
- int layer_index = -1;
-
- ++it;
- if(it == end)
- {
- result.error = parameters_error;
- break;
- }
-
- { //parse channel/layer token
- try
- {
- std::wstring channelid_str = boost::trim_copy(*it);
- std::vector<std::wstring> split;
- boost::split(split, channelid_str, boost::is_any_of("-"));
-
- channel_index = boost::lexical_cast<int>(split[0]) - 1;
- if(split.size() > 1)
- layer_index = boost::lexical_cast<int>(split[1]);
- }
- catch(...)
- {
- result.error = channel_error;
- break;
- }
- }
-
- if(channel_index >= 0 && channel_index < channels_.size())
- {
- result.command = create_channel_command(result.command_name, client, channels_.at(channel_index), channel_index, layer_index);
- if(result.command)
- {
- result.lock = channels_.at(channel_index).lock;
- result.queue = commandQueues_[channel_index + 1];
- }
- else
- {
- result.error = command_error;
- break;
- }
- }
- else
- {
- result.error = channel_error;
- break;
- }
- }
-
- state = GetParameters;
- ++it;
- }
- break;
+ // Consume channel-spec
+ tokens.pop_front();
+ }
+ }
- case GetParameters:
- {
- int parameterCount=0;
- while(it != end)
- {
- result.command->parameters().push_back((*it));
- ++it;
- ++parameterCount;
- }
- }
- break;
+ bool is_channel_command = channel_index != -1;
+
+ // Create command instance
+ if (is_channel_command)
+ {
+ result.command = repo_->create_channel_command(result.command_name, client, channel_index, layer_index, tokens);
+
+ if (result.command)
+ {
+ result.lock = repo_->channels().at(channel_index).lock;
+ result.queue = commandQueues_.at(channel_index + 1);
+ }
+ else // Might be a non channel command, although the first argument is numeric
+ {
+ // Restore backed up channel spec string.
+ tokens.push_front(channel_spec);
+ result.command = repo_->create_command(result.command_name, client, tokens);
+
+ if (result.command)
+ result.queue = commandQueues_.at(0);
}
}
+ else
+ {
+ result.command = repo_->create_command(result.command_name, client, tokens);
+
+ if (result.command)
+ result.queue = commandQueues_.at(0);
+ }
+
+ if (!result.command)
+ result.error = error_state::command_error;
+ else
+ {
+ std::vector<std::wstring> parameters(tokens.begin(), tokens.end());
- if(result.command && result.error == no_error && result.command->parameters().size() < result.command->minimum_parameters()) {
- result.error = parameters_error;
+ result.command->parameters() = std::move(parameters);
+
+ if (result.command->parameters().size() < result.command->minimum_parameters())
+ result.error = error_state::parameters_error;
}
}
- catch(...)
+ catch (std::out_of_range&)
+ {
+ CASPAR_LOG(error) << "Invalid channel specified.";
+ result.error = error_state::channel_error;
+ }
+ catch (...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
- result.error = unknown_error;
+ result.error = error_state::unknown_error;
}
- return result.error == no_error;
+ return result.error == error_state::no_error;
}
- std::size_t tokenize(const std::wstring& message, std::vector<std::wstring>* pTokenVector)
+ template<typename C>
+ std::size_t tokenize(const std::wstring& message, C& pTokenVector)
{
//split on whitespace but keep strings within quotationmarks
//treat \ as the start of an escape-sequence: the following char will indicate what to actually put in the string
//insert code-handling here
switch(message[charIndex])
{
- case TEXT('\\'):
- currentToken += TEXT("\\");
+ case L'\\':
+ currentToken += L"\\";
break;
- case TEXT('\"'):
- currentToken += TEXT("\"");
+ case L'\"':
+ currentToken += L"\"";
break;
- case TEXT('n'):
- currentToken += TEXT("\n");
+ case L'n':
+ currentToken += L"\n";
break;
default:
break;
continue;
}
- if(message[charIndex]==TEXT('\\'))
+ if(message[charIndex]==L'\\')
{
getSpecialCode = true;
continue;
}
- if(message[charIndex]==' ' && inQuote==false)
+ if(message[charIndex]==L' ' && inQuote==false)
{
- if(currentToken.size()>0)
+ if(!currentToken.empty())
{
- pTokenVector->push_back(currentToken);
+ pTokenVector.push_back(currentToken);
currentToken.clear();
}
continue;
}
- if(message[charIndex]==TEXT('\"'))
+ if(message[charIndex]==L'\"')
{
inQuote = !inQuote;
- if(currentToken.size()>0 || !inQuote)
+ if(!currentToken.empty() || !inQuote)
{
- pTokenVector->push_back(currentToken);
+ pTokenVector.push_back(currentToken);
currentToken.clear();
}
continue;
currentToken += message[charIndex];
}
- if(currentToken.size()>0)
+ if(!currentToken.empty())
{
- pTokenVector->push_back(currentToken);
+ pTokenVector.push_back(currentToken);
currentToken.clear();
}
- return pTokenVector->size();
- }
-
- AMCPCommand::ptr_type create_command(const std::wstring& str, ClientInfoPtr client)
- {
- std::wstring s = boost::to_upper_copy(str);
- if(s == TEXT("DIAG")) return std::make_shared<DiagnosticsCommand>(client);
- else if(s == TEXT("CHANNEL_GRID")) return std::make_shared<ChannelGridCommand>(client);
- else if(s == TEXT("DATA")) return std::make_shared<DataCommand>(client);
- else if(s == TEXT("CINF")) return std::make_shared<CinfCommand>(client);
- else if(s == TEXT("INFO")) return std::make_shared<InfoCommand>(client, channels_);
- else if(s == TEXT("CLS")) return std::make_shared<ClsCommand>(client);
- else if(s == TEXT("TLS")) return std::make_shared<TlsCommand>(client);
- else if(s == TEXT("VERSION")) return std::make_shared<VersionCommand>(client);
- else if(s == TEXT("BYE")) return std::make_shared<ByeCommand>(client);
- else if(s == TEXT("LOCK")) return std::make_shared<LockCommand>(client, channels_);
- else if(s == TEXT("LOG")) return std::make_shared<LogCommand>(client);
- //else if(s == TEXT("KILL"))
- //{
- // result = AMCPCommandPtr(new KillCommand());
- //}
-
- return nullptr;
- }
-
- AMCPCommand::ptr_type create_channel_command(const std::wstring& str, ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index)
- {
- std::wstring s = boost::to_upper_copy(str);
-
- if (s == TEXT("MIXER")) return std::make_shared<MixerCommand>(client, channel, channel_index, layer_index);
- else if(s == TEXT("CALL")) return std::make_shared<CallCommand>(client, channel, channel_index, layer_index);
- else if(s == TEXT("SWAP")) return std::make_shared<SwapCommand>(client, channel, channel_index, layer_index, channels_);
- else if(s == TEXT("LOAD")) return std::make_shared<LoadCommand>(client, channel, channel_index, layer_index);
- else if(s == TEXT("LOADBG")) return std::make_shared<LoadbgCommand>(client, channel, channel_index, layer_index, channels_);
- else if(s == TEXT("ADD")) return std::make_shared<AddCommand>(client, channel, channel_index, layer_index);
- else if(s == TEXT("REMOVE")) return std::make_shared<RemoveCommand>(client, channel, channel_index, layer_index);
- else if(s == TEXT("PAUSE")) return std::make_shared<PauseCommand>(client, channel, channel_index, layer_index);
- else if(s == TEXT("PLAY")) return std::make_shared<PlayCommand>(client, channel, channel_index, layer_index, channels_);
- else if(s == TEXT("STOP")) return std::make_shared<StopCommand>(client, channel, channel_index, layer_index);
- else if(s == TEXT("CLEAR")) return std::make_shared<ClearCommand>(client, channel, channel_index, layer_index);
- else if(s == TEXT("PRINT")) return std::make_shared<PrintCommand>(client, channel, channel_index, layer_index);
- else if(s == TEXT("CG")) return std::make_shared<CGCommand>(client, channel, channel_index, layer_index);
- else if(s == TEXT("SET")) return std::make_shared<SetCommand>(client, channel, channel_index, layer_index);
-
- return nullptr;
+ return pTokenVector.size();
}
};
-
-AMCPProtocolStrategy::AMCPProtocolStrategy(const std::vector<spl::shared_ptr<core::video_channel>>& channels) : impl_(spl::make_unique<impl>(channels)) {}
+AMCPProtocolStrategy::AMCPProtocolStrategy(const std::wstring& name, const spl::shared_ptr<amcp_command_repository>& repo)
+ : impl_(spl::make_unique<impl>(name, repo))
+{
+}
AMCPProtocolStrategy::~AMCPProtocolStrategy() {}
void AMCPProtocolStrategy::Parse(const std::wstring& msg, IO::ClientInfoPtr pClientInfo) { impl_->Parse(msg, pClientInfo); }
} //namespace amcp
-}} //namespace caspar
\ No newline at end of file
+}} //namespace caspar