]> git.sesse.net Git - casparcg/blob - protocol/amcp/AMCPProtocolStrategy.cpp
Merged ffmpeg duration column and media_info_repository in CLS and CINF from master...
[casparcg] / protocol / amcp / AMCPProtocolStrategy.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Nicklas P Andersson
20 */
21
22  
23 #include "../StdAfx.h"
24
25 #include "AMCPProtocolStrategy.h"
26 #include "AMCPCommandsImpl.h"
27 #include "amcp_shared.h"
28 #include "AMCPCommand.h"
29 #include "AMCPCommandQueue.h"
30
31 #include <stdio.h>
32 //#include <crtdbg.h>
33 #include <string.h>
34 #include <algorithm>
35 #include <cctype>
36 #include <future>
37
38 #include <boost/algorithm/string/trim.hpp>
39 #include <boost/algorithm/string/split.hpp>
40 #include <boost/algorithm/string/replace.hpp>
41 #include <boost/lexical_cast.hpp>
42
43 #if defined(_MSC_VER)
44 #pragma warning (push, 1) // TODO: Legacy code, just disable warnings
45 #endif
46
47 namespace caspar { namespace protocol { namespace amcp {
48
49 using IO::ClientInfoPtr;
50
51 struct AMCPProtocolStrategy::impl
52 {
53 private:
        // One context per video channel, filled by the constructor in the order the channels were supplied.
        std::vector<channel_context>                                    channels_;
        // Index 0 serves commands that are not bound to a channel; index (channel + 1) serves that channel.
        std::vector<AMCPCommandQueue::ptr_type>                 commandQueues_;
        // Handed to THUMBNAIL commands.
        std::shared_ptr<core::thumbnail_generator>              thumb_gen_;
        // Handed to CINF and CLS commands.
        spl::shared_ptr<core::media_info_repository>    media_info_repo_;
        // Handed to KILL and RESTART commands. Held by reference, so the promise must outlive this object.
        std::promise<bool>&                                                             shutdown_server_now_;
59
60 public:
61         impl(
62                         const std::vector<spl::shared_ptr<core::video_channel>>& channels,
63                         const std::shared_ptr<core::thumbnail_generator>& thumb_gen,
64                         const spl::shared_ptr<core::media_info_repository>& media_info_repo,
65                         std::promise<bool>& shutdown_server_now)
66                 : thumb_gen_(thumb_gen)
67                 , media_info_repo_(media_info_repo)
68                 , shutdown_server_now_(shutdown_server_now)
69         {
70                 commandQueues_.push_back(std::make_shared<AMCPCommandQueue>());
71
72                 int index = 0;
73                 for (const auto& channel : channels)
74                 {
75                         std::wstring lifecycle_key = L"lock" + boost::lexical_cast<std::wstring>(index);
76                         channels_.push_back(channel_context(channel, lifecycle_key));
77                         auto queue(std::make_shared<AMCPCommandQueue>());
78                         commandQueues_.push_back(queue);
79                         ++index;
80                 }
81         }
82
        // Nothing to release by hand: every member is a container, a smart pointer or a reference.
        ~impl() {}
84
85         enum class parser_state {
86                 New = 0,
87                 GetSwitch,
88                 GetCommand,
89                 GetParameters
90         };
91         enum class error_state {
92                 no_error = 0,
93                 command_error,
94                 channel_error,
95                 parameters_error,
96                 unknown_error,
97                 access_error
98         };
99
100         struct command_interpreter_result
101         {
102                 command_interpreter_result() : error(error_state::no_error) {}
103
104                 std::shared_ptr<caspar::IO::lock_container>     lock;
105                 std::wstring                                                            command_name;
106                 AMCPCommand::ptr_type                                           command;
107                 error_state                                                                     error;
108                 AMCPCommandQueue::ptr_type                                      queue;
109         };
110
111         //The paser method expects message to be complete messages with the delimiter stripped away.
112         //Thesefore the AMCPProtocolStrategy should be decorated with a delimiter_based_chunking_strategy
113         void Parse(const std::wstring& message, ClientInfoPtr client)
114         {
115                 CASPAR_LOG(info) << L"Received message from " << client->print() << ": " << message << L"\\r\\n";
116         
117                 command_interpreter_result result;
118                 if(interpret_command_string(message, result, client))
119                 {
120                         if(result.lock && !result.lock->check_access(client))
121                                 result.error = error_state::access_error;
122                         else
123                                 result.queue->AddCommand(result.command);
124                 }
125                 
126                 if (result.error != error_state::no_error)
127                 {
128                         std::wstringstream answer;
129                         boost::to_upper(result.command_name);
130
131                         switch(result.error)
132                         {
133                         case error_state::command_error:
134                                 answer << L"400 ERROR\r\n" << message << "\r\n";
135                                 break;
136                         case error_state::channel_error:
137                                 answer << L"401 " << result.command_name << " ERROR\r\n";
138                                 break;
139                         case error_state::parameters_error:
140                                 answer << L"402 " << result.command_name << " ERROR\r\n";
141                                 break;
142                         case error_state::access_error:
143                                 answer << L"503 " << result.command_name << " FAILED\r\n";
144                                 break;
145                         default:
146                                 answer << L"500 FAILED\r\n";
147                                 break;
148                         }
149                         client->send(answer.str());
150                 }
151         }
152
153 private:
154         friend class AMCPCommand;
155
156         bool interpret_command_string(const std::wstring& message, command_interpreter_result& result, ClientInfoPtr client)
157         {
158                 try
159                 {
160                         std::vector<std::wstring> tokens;
161                         parser_state state = parser_state::New;
162
163                         tokenize(message, &tokens);
164
165                         //parse the message one token at the time
166                         auto end = tokens.end();
167                         auto it = tokens.begin();
168                         while (it != end && result.error == error_state::no_error)
169                         {
170                                 switch(state)
171                                 {
172                                 case parser_state::New:
173                                         if((*it)[0] == L'/')
174                                                 state = parser_state::GetSwitch;
175                                         else
176                                                 state = parser_state::GetCommand;
177                                         break;
178
179                                 case parser_state::GetSwitch:
180                                         //command_switch = (*it);       //we dont care for the switch anymore
181                                         state = parser_state::GetCommand;
182                                         ++it;
183                                         break;
184
185                                 case parser_state::GetCommand:
186                                         {
187                                                 result.command_name = (*it);
188                                                 result.command = create_command(result.command_name, client);
189                                                 if(result.command)      //the command doesn't need a channel
190                                                 {
191                                                         result.queue = commandQueues_[0];
192                                                         state = parser_state::GetParameters;
193                                                 }
194                                                 else
195                                                 {
196                                                         //get channel index from next token
197                                                         int channel_index = -1;
198                                                         int layer_index = -1;
199
200                                                         ++it;
201                                                         if(it == end)
202                                                         {
203                                                                 if(create_channel_command(result.command_name, client, channels_.at(0), 0, 0))  //check if there is a command like this
204                                                                         result.error = error_state::channel_error;
205                                                                 else
206                                                                         result.error = error_state::command_error;
207
208                                                                 break;
209                                                         }
210
211                                                         {       //parse channel/layer token
212                                                                 try
213                                                                 {
214                                                                         std::wstring channelid_str = boost::trim_copy(*it);
215                                                                         std::vector<std::wstring> split;
216                                                                         boost::split(split, channelid_str, boost::is_any_of("-"));
217
218                                                                         channel_index = boost::lexical_cast<int>(split[0]) - 1;
219                                                                         if(split.size() > 1)
220                                                                                 layer_index = boost::lexical_cast<int>(split[1]);
221                                                                 }
222                                                                 catch(...)
223                                                                 {
224                                                                         result.error = error_state::channel_error;
225                                                                         break;
226                                                                 }
227                                                         }
228                                                 
229                                                         if(channel_index >= 0 && channel_index < channels_.size())
230                                                         {
231                                                                 result.command = create_channel_command(result.command_name, client, channels_.at(channel_index), channel_index, layer_index);
232                                                                 if(result.command)
233                                                                 {
234                                                                         result.lock = channels_.at(channel_index).lock;
235                                                                         result.queue = commandQueues_[channel_index + 1];
236                                                                 }
237                                                                 else
238                                                                 {
239                                                                         result.error = error_state::command_error;
240                                                                         break;
241                                                                 }
242                                                         }
243                                                         else
244                                                         {
245                                                                 result.error = error_state::channel_error;
246                                                                 break;
247                                                         }
248                                                 }
249
250                                                 state = parser_state::GetParameters;
251                                                 ++it;
252                                         }
253                                         break;
254
255                                 case parser_state::GetParameters:
256                                         {
257                                                 int parameterCount=0;
258                                                 while(it != end)
259                                                 {
260                                                         result.command->parameters().push_back((*it));
261                                                         ++it;
262                                                         ++parameterCount;
263                                                 }
264                                         }
265                                         break;
266                                 }
267                         }
268
269                         if(result.command && result.error == error_state::no_error && result.command->parameters().size() < result.command->minimum_parameters()) {
270                                 result.error = error_state::parameters_error;
271                         }
272                 }
273                 catch(...)
274                 {
275                         CASPAR_LOG_CURRENT_EXCEPTION();
276                         result.error = error_state::unknown_error;
277                 }
278
279                 return result.error == error_state::no_error;
280         }
281
282         std::size_t tokenize(const std::wstring& message, std::vector<std::wstring>* pTokenVector)
283         {
284                 //split on whitespace but keep strings within quotationmarks
285                 //treat \ as the start of an escape-sequence: the following char will indicate what to actually put in the string
286
287                 std::wstring currentToken;
288
289                 bool inQuote = false;
290                 bool getSpecialCode = false;
291
292                 for(unsigned int charIndex=0; charIndex<message.size(); ++charIndex)
293                 {
294                         if(getSpecialCode)
295                         {
296                                 //insert code-handling here
297                                 switch(message[charIndex])
298                                 {
299                                 case L'\\':
300                                         currentToken += L"\\";
301                                         break;
302                                 case L'\"':
303                                         currentToken += L"\"";
304                                         break;
305                                 case L'n':
306                                         currentToken += L"\n";
307                                         break;
308                                 default:
309                                         break;
310                                 };
311                                 getSpecialCode = false;
312                                 continue;
313                         }
314
315                         if(message[charIndex]==L'\\')
316                         {
317                                 getSpecialCode = true;
318                                 continue;
319                         }
320
321                         if(message[charIndex]==L' ' && inQuote==false)
322                         {
323                                 if(currentToken.size()>0)
324                                 {
325                                         pTokenVector->push_back(currentToken);
326                                         currentToken.clear();
327                                 }
328                                 continue;
329                         }
330
331                         if(message[charIndex]==L'\"')
332                         {
333                                 inQuote = !inQuote;
334
335                                 if(currentToken.size()>0 || !inQuote)
336                                 {
337                                         pTokenVector->push_back(currentToken);
338                                         currentToken.clear();
339                                 }
340                                 continue;
341                         }
342
343                         currentToken += message[charIndex];
344                 }
345
346                 if(currentToken.size()>0)
347                 {
348                         pTokenVector->push_back(currentToken);
349                         currentToken.clear();
350                 }
351
352                 return pTokenVector->size();
353         }
354
355         AMCPCommand::ptr_type create_command(const std::wstring& str, ClientInfoPtr client)
356         {
357                 std::wstring s = boost::to_upper_copy(str);
358                 if(s == L"DIAG")                                return std::make_shared<DiagnosticsCommand>(client);
359                 else if(s == L"CHANNEL_GRID")   return std::make_shared<ChannelGridCommand>(client, channels_);
360                 else if(s == L"DATA")                   return std::make_shared<DataCommand>(client);
361                 else if(s == L"CINF")                   return std::make_shared<CinfCommand>(client, media_info_repo_);
362                 else if(s == L"INFO")                   return std::make_shared<InfoCommand>(client, channels_);
363                 else if(s == L"CLS")                    return std::make_shared<ClsCommand>(client, media_info_repo_);
364                 else if(s == L"TLS")                    return std::make_shared<TlsCommand>(client);
365                 else if(s == L"VERSION")                return std::make_shared<VersionCommand>(client);
366                 else if(s == L"BYE")                    return std::make_shared<ByeCommand>(client);
367                 else if(s == L"LOCK")                   return std::make_shared<LockCommand>(client, channels_);
368                 else if(s == L"LOG")                    return std::make_shared<LogCommand>(client);
369                 else if(s == L"THUMBNAIL")              return std::make_shared<ThumbnailCommand>(client, thumb_gen_);
370                 else if(s == L"KILL")                   return std::make_shared<KillCommand>(client, shutdown_server_now_);
371                 else if(s == L"RESTART")                return std::make_shared<RestartCommand>(client, shutdown_server_now_);
372
373                 return nullptr;
374         }
375
376         AMCPCommand::ptr_type create_channel_command(const std::wstring& str, ClientInfoPtr client, const channel_context& channel, unsigned int channel_index, int layer_index)
377         {
378                 std::wstring s = boost::to_upper_copy(str);
379         
380                 if         (s == L"MIXER")                      return std::make_shared<MixerCommand>(client, channel, channel_index, layer_index);
381                 else if(s == L"CALL")                   return std::make_shared<CallCommand>(client, channel, channel_index, layer_index);
382                 else if(s == L"SWAP")                   return std::make_shared<SwapCommand>(client, channel, channel_index, layer_index, channels_);
383                 else if(s == L"LOAD")                   return std::make_shared<LoadCommand>(client, channel, channel_index, layer_index);
384                 else if(s == L"LOADBG")                 return std::make_shared<LoadbgCommand>(client, channel, channel_index, layer_index, channels_);
385                 else if(s == L"ADD")                    return std::make_shared<AddCommand>(client, channel, channel_index, layer_index);
386                 else if(s == L"REMOVE")                 return std::make_shared<RemoveCommand>(client, channel, channel_index, layer_index);
387                 else if(s == L"PAUSE")                  return std::make_shared<PauseCommand>(client, channel, channel_index, layer_index);
388                 else if(s == L"PLAY")                   return std::make_shared<PlayCommand>(client, channel, channel_index, layer_index, channels_);
389                 else if(s == L"STOP")                   return std::make_shared<StopCommand>(client, channel, channel_index, layer_index);
390                 else if(s == L"CLEAR")                  return std::make_shared<ClearCommand>(client, channel, channel_index, layer_index);
391                 else if(s == L"PRINT")                  return std::make_shared<PrintCommand>(client, channel, channel_index, layer_index);
392                 else if(s == L"CG")                             return std::make_shared<CGCommand>(client, channel, channel_index, layer_index);
393                 else if(s == L"SET")                    return std::make_shared<SetCommand>(client, channel, channel_index, layer_index);
394
395                 return nullptr;
396         }
397 };
398
399
// Creates the private implementation object and passes every argument straight through to it.
// shutdown_server_now is taken by reference and stored as a reference by impl,
// so the promise has to outlive this strategy.
AMCPProtocolStrategy::AMCPProtocolStrategy(
                const std::vector<spl::shared_ptr<core::video_channel>>& channels,
                const std::shared_ptr<core::thumbnail_generator>& thumb_gen,
                const spl::shared_ptr<core::media_info_repository>& media_info_repo,
                std::promise<bool>& shutdown_server_now)
        : impl_(spl::make_unique<impl>(channels, thumb_gen, media_info_repo, shutdown_server_now))
{
}
// Empty on purpose. NOTE(review): presumably defined in this file so that impl_ is
// destroyed where impl is a complete type - confirm against the header.
AMCPProtocolStrategy::~AMCPProtocolStrategy() {}
// Public entry point for one message; forwards unchanged to impl::Parse.
void AMCPProtocolStrategy::Parse(const std::wstring& msg, IO::ClientInfoPtr pClientInfo) { impl_->Parse(msg, pClientInfo); }
410
411
412 }       //namespace amcp
413 }}      //namespace caspar