function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size(); }
bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty(); }
bool is_running() const /*noexcept*/ { return is_running_; }
+ const std::string& name() const { return name_; }
private:
_parameters = p;\r
}\r
\r
+ const core::parameters& GetParameters() const { return _parameters; }\r
+\r
void SetClientInfo(IO::ClientInfoPtr& s){pClientInfo_ = s;}\r
IO::ClientInfoPtr GetClientInfo(){return pClientInfo_;}\r
\r
\r
#include "AMCPCommandQueue.h"\r
\r
+#include <boost/property_tree/ptree.hpp>\r
+\r
+#include <common/concurrency/lock.h>\r
+\r
namespace caspar { namespace protocol { namespace amcp {\r
+\r
+namespace {\r
+\r
+tbb::spin_mutex& get_global_mutex()\r
+{\r
+ static tbb::spin_mutex mutex;\r
+\r
+ return mutex;\r
+}\r
+\r
+std::map<std::wstring, AMCPCommandQueue*>& get_instances()\r
+{\r
+ static std::map<std::wstring, AMCPCommandQueue*> queues;\r
+\r
+ return queues;\r
+}\r
+\r
+}\r
\r
-AMCPCommandQueue::AMCPCommandQueue() \r
- : executor_(L"AMCPCommandQueue")\r
+AMCPCommandQueue::AMCPCommandQueue(const std::wstring& name)\r
+ : executor_(L"AMCPCommandQueue " + name)\r
+ , running_command_(false)\r
{\r
+ tbb::spin_mutex::scoped_lock lock(get_global_mutex());\r
+\r
+ get_instances().insert(std::make_pair(name, this));\r
}\r
\r
AMCPCommandQueue::~AMCPCommandQueue() \r
{\r
+ tbb::spin_mutex::scoped_lock lock(get_global_mutex());\r
+\r
+ get_instances().erase(widen(executor_.name()));\r
}\r
\r
void AMCPCommandQueue::AddCommand(AMCPCommandPtr pCurrentCommand)\r
{\r
try\r
{\r
+ auto print = pCurrentCommand->print();\r
+ auto params = pCurrentCommand->GetParameters().get_original_string();\r
+\r
+ {\r
+ tbb::spin_mutex::scoped_lock lock(running_command_mutex_);\r
+ running_command_ = true;\r
+ running_command_name_ = print;\r
+ running_command_params_ = std::move(params);\r
+ running_command_since_.restart();\r
+ }\r
+\r
if(pCurrentCommand->Execute()) \r
- CASPAR_LOG(debug) << "Executed command: " << pCurrentCommand->print();\r
+ CASPAR_LOG(debug) << "Executed command: " << print << L" on " << widen(executor_.name());\r
else \r
- CASPAR_LOG(warning) << "Failed to execute command: " << pCurrentCommand->print();\r
+ CASPAR_LOG(warning) << "Failed to execute command: " << print << L" on " << widen(executor_.name());\r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- CASPAR_LOG(error) << "Failed to execute command:" << pCurrentCommand->print();\r
+ CASPAR_LOG(error) << "Failed to execute command:" << pCurrentCommand->print() << L" on " << widen(executor_.name());\r
pCurrentCommand->SetReplyString(L"500 FAILED\r\n");\r
}\r
\r
pCurrentCommand->SendReply();\r
\r
CASPAR_LOG(trace) << "Ready for a new command";\r
+\r
+ tbb::spin_mutex::scoped_lock lock(running_command_mutex_);\r
+ running_command_ = false;\r
}\r
catch(...)\r
{\r
});\r
}\r
\r
+boost::property_tree::wptree AMCPCommandQueue::info() const\r
+{\r
+ boost::property_tree::wptree info;\r
+\r
+ auto name = widen(executor_.name());\r
+ info.add(L"name", name);\r
+ auto size = executor_.size();\r
+ info.add(L"queued", std::max(0, size));\r
+\r
+ bool running_command;\r
+ std::wstring running_command_name;\r
+ std::wstring running_command_params;\r
+ int64_t running_command_elapsed;\r
+\r
+ lock(running_command_mutex_, [&]\r
+ {\r
+ running_command = running_command_;\r
+ \r
+ if (running_command)\r
+ {\r
+ running_command_name = running_command_name_;\r
+ running_command_params = running_command_params_;\r
+ running_command_elapsed = static_cast<int64_t>(\r
+ running_command_since_.elapsed() * 1000.0);\r
+ }\r
+ });\r
+\r
+ if (running_command)\r
+ {\r
+ info.add(L"running.command", running_command_name);\r
+ info.add(L"running.params", running_command_params);\r
+ info.add(L"running.elapsed", running_command_elapsed);\r
+ }\r
+\r
+ return info;\r
+}\r
+\r
+boost::property_tree::wptree AMCPCommandQueue::info_all_queues()\r
+{\r
+ boost::property_tree::wptree info;\r
+ tbb::spin_mutex::scoped_lock lock(get_global_mutex());\r
+\r
+ BOOST_FOREACH(auto& queue, get_instances())\r
+ {\r
+ info.add_child(L"queues.queue", queue.second->info());\r
+ }\r
+\r
+ return info;\r
+}\r
+\r
}}}
\ No newline at end of file
\r
#include <common/concurrency/executor.h>\r
\r
-#include <tbb\mutex.h>\r
+#include <boost/property_tree/ptree_fwd.hpp>\r
+#include <boost/timer.hpp>\r
+\r
+#include <tbb/spin_mutex.h>\r
\r
namespace caspar { namespace protocol { namespace amcp {\r
\r
AMCPCommandQueue(const AMCPCommandQueue&);\r
AMCPCommandQueue& operator=(const AMCPCommandQueue&);\r
public:\r
- AMCPCommandQueue();\r
+ AMCPCommandQueue(const std::wstring& name);\r
~AMCPCommandQueue();\r
\r
void AddCommand(AMCPCommandPtr pCommand);\r
\r
+ boost::property_tree::wptree info() const;\r
+\r
+ static boost::property_tree::wptree info_all_queues();\r
private:\r
- executor executor_;\r
+ executor executor_;\r
+ mutable tbb::spin_mutex running_command_mutex_;\r
+ bool running_command_;\r
+ std::wstring running_command_name_;\r
+ std::wstring running_command_params_;\r
+ boost::timer running_command_since_;\r
};\r
typedef std::tr1::shared_ptr<AMCPCommandQueue> AMCPCommandQueuePtr;\r
\r
\r
boost::property_tree::write_xml(replyString, info, w);\r
}\r
+ else if(_parameters.size() >= 1 && _parameters[0] == L"QUEUES")\r
+ {\r
+ replyString << L"201 INFO QUEUES OK\r\n";\r
+\r
+ boost::property_tree::wptree info = AMCPCommandQueue::info_all_queues();\r
+ boost::property_tree::write_xml(replyString, info, w);\r
+ }\r
else if(_parameters.size() >= 1 && _parameters[0] == L"SYSTEM")\r
{\r
replyString << L"201 INFO SYSTEM OK\r\n";\r
}\r
\r
AMCPProtocolStrategy::AMCPProtocolStrategy(\r
+ const std::wstring& name,\r
const std::vector<safe_ptr<core::video_channel>>& channels,\r
const std::shared_ptr<core::thumbnail_generator>& thumb_gen,\r
const safe_ptr<core::media_info_repository>& media_info_repo,\r
, ogl_(ogl_device)\r
, shutdown_server_now_(shutdown_server_now)\r
{\r
- AMCPCommandQueuePtr pGeneralCommandQueue(new AMCPCommandQueue());\r
+ AMCPCommandQueuePtr pGeneralCommandQueue(new AMCPCommandQueue(L"General Queue for " + name));\r
commandQueues_.push_back(pGeneralCommandQueue);\r
\r
\r
unsigned int index = -1;\r
//Create a commandpump for each video_channel\r
while((pChannel = GetChannelSafe(++index, channels_)) != 0) {\r
- AMCPCommandQueuePtr pChannelCommandQueue(new AMCPCommandQueue());\r
+ AMCPCommandQueuePtr pChannelCommandQueue(new AMCPCommandQueue(L"Channel " + boost::lexical_cast<std::wstring>(index + 1) + L" for " + name));\r
std::wstring title = TEXT("video_channel ");\r
\r
//HACK: Perform real conversion from int to string\r
\r
public:\r
AMCPProtocolStrategy(\r
+ const std::wstring& name,\r
const std::vector<safe_ptr<core::video_channel>>& channels,\r
const std::shared_ptr<core::thumbnail_generator>& thumb_gen,\r
const safe_ptr<core::media_info_repository>& media_info_repo,\r
\r
// Create a amcp parser for console commands.\r
caspar::protocol::amcp::AMCPProtocolStrategy amcp(\r
+ L"Console",\r
caspar_server.get_channels(),\r
caspar_server.get_thumbnail_generator(),\r
caspar_server.get_media_info_repo(),\r
if(name == L"tcp")\r
{ \r
unsigned int port = xml_controller.second.get(L"port", 5250);\r
- auto asyncbootstrapper = make_safe<IO::AsyncEventServer>(create_protocol(protocol), port);\r
+ auto asyncbootstrapper = make_safe<IO::AsyncEventServer>(create_protocol(\r
+ protocol,\r
+ L"TCP Port " + boost::lexical_cast<std::wstring>(port)),\r
+ port);\r
asyncbootstrapper->Start();\r
async_servers_.push_back(asyncbootstrapper);\r
\r
CASPAR_LOG(info) << L"Initialized thumbnail generator.";\r
}\r
\r
- safe_ptr<IO::IProtocolStrategy> create_protocol(const std::wstring& name) const\r
+ safe_ptr<IO::IProtocolStrategy> create_protocol(const std::wstring& name, const std::wstring& port_description) const\r
{\r
if(boost::iequals(name, L"AMCP"))\r
- return make_safe<amcp::AMCPProtocolStrategy>(channels_, thumbnail_generator_, media_info_repo_, ogl_, shutdown_server_now_);\r
+ return make_safe<amcp::AMCPProtocolStrategy>(\r
+ port_description,\r
+ channels_,\r
+ thumbnail_generator_,\r
+ media_info_repo_,\r
+ ogl_,\r
+ shutdown_server_now_);\r
else if(boost::iequals(name, L"CII"))\r
return make_safe<cii::CIIProtocolStrategy>(channels_);\r
else if(boost::iequals(name, L"CLOCK"))\r