]> git.sesse.net Git - casparcg/blob - protocol/amcp/AMCPCommandQueue.cpp
24a3f8af0a3917e312fad8703f201e64519fb43a
[casparcg] / protocol / amcp / AMCPCommandQueue.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Nicklas P Andersson
20 */
21
22 #include "../StdAfx.h"
23
24 #include "AMCPCommandQueue.h"
25
26 #include <boost/property_tree/ptree.hpp>
27
28 #include <cmath>
29
30 namespace caspar { namespace protocol { namespace amcp {
31
32 namespace {
33
34 tbb::spin_mutex& get_global_mutex()
35 {
36         static tbb::spin_mutex mutex;
37
38         return mutex;
39 }
40
41 std::map<std::wstring, AMCPCommandQueue*>& get_instances()
42 {
43         static std::map<std::wstring, AMCPCommandQueue*> queues;
44
45         return queues;
46 }
47
48 }
49
50 AMCPCommandQueue::AMCPCommandQueue(const std::wstring& name)
51         : executor_(L"AMCPCommandQueue " + name)
52 {
53         tbb::spin_mutex::scoped_lock lock(get_global_mutex());
54
55         get_instances().insert(std::make_pair(name, this));
56 }
57
58 AMCPCommandQueue::~AMCPCommandQueue()
59 {
60         tbb::spin_mutex::scoped_lock lock(get_global_mutex());
61
62         get_instances().erase(executor_.name());
63 }
64
65 void AMCPCommandQueue::AddCommand(AMCPCommand::ptr_type pCurrentCommand)
66 {
67         if(!pCurrentCommand)
68                 return;
69         
70         if(executor_.size() > 128)
71         {
72                 try
73                 {
74                         CASPAR_LOG(error) << "AMCP Command Queue Overflow.";
75                         CASPAR_LOG(error) << "Failed to execute command:" << pCurrentCommand->print();
76                         pCurrentCommand->SetReplyString(L"500 FAILED\r\n");
77                         pCurrentCommand->SendReply();
78                 }
79                 catch(...)
80                 {
81                         CASPAR_LOG_CURRENT_EXCEPTION();
82                 }
83         }
84         
85         executor_.begin_invoke([=]
86         {
87                 try
88                 {
89                         try
90                         {
91                                 caspar::timer timer;
92
93                                 auto print = pCurrentCommand->print();
94                                 auto params = boost::join(pCurrentCommand->parameters(), L" ");
95
96                                 {
97                                         tbb::spin_mutex::scoped_lock lock(running_command_mutex_);
98                                         running_command_ = true;
99                                         running_command_name_ = print;
100                                         running_command_params_ = std::move(params);
101                                         running_command_since_.restart();
102                                 }
103
104                                 if (pCurrentCommand->Execute())
105                                         CASPAR_LOG(debug) << "Executed command (" << timer.elapsed() << "s): " << print;
106                                 else
107                                         CASPAR_LOG(warning) << "Failed to execute command: " << print;
108                         }
109                         catch (file_not_found&)
110                         {
111                                 CASPAR_LOG(error) << L"File not found. No match found for parameters. Check syntax.";
112                                 pCurrentCommand->SetReplyString(L"404 " + pCurrentCommand->print() + L" FAILED\r\n");
113                         }
114                         catch (const user_error& e)
115                         {
116                                 CASPAR_LOG(error) << *boost::get_error_info<msg_info_t>(e) << ". Check syntax.";
117                                 pCurrentCommand->SetReplyString(L"403 " + pCurrentCommand->print() + L" FAILED\r\n");
118                         }
119                         catch (std::out_of_range&)
120                         {
121                                 CASPAR_LOG(error) << L"Missing parameter. Check syntax.";
122                                 pCurrentCommand->SetReplyString(L"402 " + pCurrentCommand->print() + L" FAILED\r\n");
123                         }
124                         catch (...)
125                         {
126                                 CASPAR_LOG_CURRENT_EXCEPTION();
127                                 CASPAR_LOG(warning) << "Failed to execute command:" << pCurrentCommand->print();
128                                 pCurrentCommand->SetReplyString(L"501 " + pCurrentCommand->print() + L" FAILED\r\n");
129                         }
130                                 
131                         pCurrentCommand->SendReply();
132                         
133                         CASPAR_LOG(trace) << "Ready for a new command";
134
135                         tbb::spin_mutex::scoped_lock lock(running_command_mutex_);
136                         running_command_ = false;
137                 }
138                 catch(...)
139                 {
140                         CASPAR_LOG_CURRENT_EXCEPTION();
141                 }
142         });
143 }
144
145 boost::property_tree::wptree AMCPCommandQueue::info() const
146 {
147         boost::property_tree::wptree info;
148
149         auto name = executor_.name();
150         info.add(L"name", name);
151         auto size = executor_.size();
152         info.add(L"queued", std::max(0u, size));
153
154         bool running_command;
155         std::wstring running_command_name;
156         std::wstring running_command_params;
157         int64_t running_command_elapsed;
158
159         lock(running_command_mutex_, [&]
160         {
161                 running_command = running_command_;
162
163                 if (running_command)
164                 {
165                         running_command_name = running_command_name_;
166                         running_command_params = running_command_params_;
167                         running_command_elapsed = static_cast<int64_t>(
168                                 running_command_since_.elapsed() * 1000.0);
169                 }
170         });
171
172         if (running_command)
173         {
174                 info.add(L"running.command", running_command_name);
175                 info.add(L"running.params", running_command_params);
176                 info.add(L"running.elapsed", running_command_elapsed);
177         }
178
179         return info;
180 }
181
182 boost::property_tree::wptree AMCPCommandQueue::info_all_queues()
183 {
184         boost::property_tree::wptree info;
185         tbb::spin_mutex::scoped_lock lock(get_global_mutex());
186
187         for (auto& queue : get_instances())
188         {
189                 info.add_child(L"queues.queue", queue.second->info());
190         }
191
192         return info;
193 }
194
195 }}}