]> git.sesse.net Git - casparcg/commitdiff
Merged INFO QUEUES from 2.0
authorHelge Norberg <helge.norberg@svt.se>
Wed, 12 Aug 2015 11:37:04 +0000 (13:37 +0200)
committerHelge Norberg <helge.norberg@svt.se>
Wed, 12 Aug 2015 11:37:04 +0000 (13:37 +0200)
common/executor.h
protocol/amcp/AMCPCommandQueue.cpp
protocol/amcp/AMCPCommandQueue.h
protocol/amcp/AMCPCommandsImpl.cpp
protocol/amcp/AMCPProtocolStrategy.cpp
protocol/amcp/AMCPProtocolStrategy.h
shell/main.cpp
shell/server.cpp

index d812c98e1f806b83164e389fc0eff419ab56daf4..59e12ba905e446e3caab34a4d84a771644ef1121 100644 (file)
@@ -176,6 +176,11 @@ public:
        {
                return boost::this_thread::get_id() == thread_.get_id();
        }
+
+       std::wstring name() const
+       {
+               return name_;
+       }
                
 private:       
 
index 8b986f2b2bfd1191d57357d4e93ee70b3dd127fa..4263a525e4606fa96f8a29d1dddc6ea0dec027c6 100644 (file)
 
 #include "AMCPCommandQueue.h"
 
-#include <common/timer.h>
+#include <boost/property_tree/ptree.hpp>
+
+#include <cmath>
 
 namespace caspar { namespace protocol { namespace amcp {
-       
-AMCPCommandQueue::AMCPCommandQueue() 
-       : executor_(L"AMCPCommandQueue")
+
+namespace {
+
+tbb::spin_mutex& get_global_mutex()
 {
+       static tbb::spin_mutex mutex;
+
+       return mutex;
 }
 
-AMCPCommandQueue::~AMCPCommandQueue() 
+std::map<std::wstring, AMCPCommandQueue*>& get_instances()
 {
+       static std::map<std::wstring, AMCPCommandQueue*> queues;
+
+       return queues;
+}
+
+}
+
+AMCPCommandQueue::AMCPCommandQueue(const std::wstring& name)
+       : executor_(L"AMCPCommandQueue " + name)
+{
+       tbb::spin_mutex::scoped_lock lock(get_global_mutex());
+
+       get_instances().insert(std::make_pair(name, this));
+}
+
+AMCPCommandQueue::~AMCPCommandQueue()
+{
+       tbb::spin_mutex::scoped_lock lock(get_global_mutex());
+
+       get_instances().erase(executor_.name());
 }
 
 void AMCPCommandQueue::AddCommand(AMCPCommand::ptr_type pCurrentCommand)
@@ -64,10 +90,21 @@ void AMCPCommandQueue::AddCommand(AMCPCommand::ptr_type pCurrentCommand)
 
                        try
                        {
-                               if(pCurrentCommand->Execute()) 
-                                       CASPAR_LOG(debug) << "Executed command: " << pCurrentCommand->print() << " " << timer.elapsed();
-                               else 
-                                       CASPAR_LOG(warning) << "Failed to execute command: " << pCurrentCommand->print() << " " << timer.elapsed();
+                               auto print = pCurrentCommand->print();
+                               auto params = boost::join(pCurrentCommand->parameters(), L" ");
+
+                               {
+                                       tbb::spin_mutex::scoped_lock lock(running_command_mutex_);
+                                       running_command_ = true;
+                                       running_command_name_ = print;
+                                       running_command_params_ = std::move(params);
+                                       running_command_since_.restart();
+                               }
+
+                               if (pCurrentCommand->Execute())
+                                       CASPAR_LOG(debug) << "Executed command: " << print << " " << timer.elapsed();
+                               else
+                                       CASPAR_LOG(warning) << "Failed to execute command: " << print << " " << timer.elapsed();
                        }
                        catch (file_not_found&)
                        {
@@ -89,6 +126,9 @@ void AMCPCommandQueue::AddCommand(AMCPCommand::ptr_type pCurrentCommand)
                        pCurrentCommand->SendReply();
                        
                        CASPAR_LOG(trace) << "Ready for a new command";
+
+                       tbb::spin_mutex::scoped_lock lock(running_command_mutex_);
+                       running_command_ = false;
                }
                catch(...)
                {
@@ -97,4 +137,54 @@ void AMCPCommandQueue::AddCommand(AMCPCommand::ptr_type pCurrentCommand)
        });
 }
 
+boost::property_tree::wptree AMCPCommandQueue::info() const
+{
+       boost::property_tree::wptree info;
+
+       auto name = executor_.name();
+       info.add(L"name", name);
+       auto size = executor_.size();
+       info.add(L"queued", std::max(0u, size));
+
+       bool running_command;
+       std::wstring running_command_name;
+       std::wstring running_command_params;
+       int64_t running_command_elapsed;
+
+       lock(running_command_mutex_, [&]
+       {
+               running_command = running_command_;
+
+               if (running_command)
+               {
+                       running_command_name = running_command_name_;
+                       running_command_params = running_command_params_;
+                       running_command_elapsed = static_cast<int64_t>(
+                               running_command_since_.elapsed() * 1000.0);
+               }
+       });
+
+       if (running_command)
+       {
+               info.add(L"running.command", running_command_name);
+               info.add(L"running.params", running_command_params);
+               info.add(L"running.elapsed", running_command_elapsed);
+       }
+
+       return info;
+}
+
+boost::property_tree::wptree AMCPCommandQueue::info_all_queues()
+{
+       boost::property_tree::wptree info;
+       tbb::spin_mutex::scoped_lock lock(get_global_mutex());
+
+       for (auto& queue : get_instances())
+       {
+               info.add_child(L"queues.queue", queue.second->info());
+       }
+
+       return info;
+}
+
 }}}
index f1fb2c63c522f089d54992ae8e6fed41c508b472..ff78f3e04c05f71511624a605897cd8699886a41 100644 (file)
@@ -25,6 +25,9 @@
 
 #include <common/executor.h>
 #include <common/memory.h>
+#include <common/timer.h>
+
+#include <boost/property_tree/ptree_fwd.hpp>
 
 #include <tbb/mutex.h>
 
@@ -37,13 +40,21 @@ class AMCPCommandQueue
 public:
        typedef spl::shared_ptr<AMCPCommandQueue> ptr_type;
 
-       AMCPCommandQueue();
+       AMCPCommandQueue(const std::wstring& name);
        ~AMCPCommandQueue();
 
        void AddCommand(AMCPCommand::ptr_type pCommand);
 
+       boost::property_tree::wptree info() const;
+
+       static boost::property_tree::wptree info_all_queues();
 private:
-       executor                        executor_;
+       executor                                executor_;
+       mutable tbb::spin_mutex running_command_mutex_;
+       bool                                    running_command_                = false;
+       std::wstring                    running_command_name_;
+       std::wstring                    running_command_params_;
+       caspar::timer                   running_command_since_;
 };
 
 }}}
index 0e58ac7454cd0dec5ed9b54523cee25e62df898a..bc82652899272527ba03b32759d73244e909d331 100644 (file)
@@ -28,6 +28,7 @@
 #include "AMCPCommandsImpl.h"
 
 #include "amcp_command_repository.h"
+#include "AMCPCommandQueue.h"
 
 #include <common/env.h>
 
@@ -2388,6 +2389,18 @@ std::wstring info_server_command(command_context& ctx)
        return create_info_xml_reply(info, L"SERVER");
 }
 
+void info_queues_describer(core::help_sink& sink, const core::help_repository& repo)
+{
+       sink.short_description(L"Get detailed information about all AMCP Command Queues.");
+       sink.syntax(L"INFO QUEUES");
+       sink.para()->text(L"Gets detailed information about all AMCP Command Queues.");
+}
+
+std::wstring info_queues_command(command_context& ctx)
+{
+       return create_info_xml_reply(AMCPCommandQueue::info_all_queues(), L"QUEUES");
+}
+
 void diag_describer(core::help_sink& sink, const core::help_repository& repo)
 {
        sink.short_description(L"Open the diagnostics window.");
@@ -2751,6 +2764,7 @@ void register_commands(amcp_command_repository& repo)
        repo.register_command(                  L"Query Commands",              L"INFO PATHS",                          info_paths_describer,                           info_paths_command,                             0);
        repo.register_command(                  L"Query Commands",              L"INFO SYSTEM",                         info_system_describer,                          info_system_command,                    0);
        repo.register_command(                  L"Query Commands",              L"INFO SERVER",                         info_server_describer,                          info_server_command,                    0);
+       repo.register_command(                  L"Query Commands",              L"INFO QUEUES",                         info_queues_describer,                          info_queues_command,                    0);
        repo.register_command(                  L"Query Commands",              L"DIAG",                                        diag_describer,                                         diag_command,                                   0);
        repo.register_command(                  L"Query Commands",              L"BYE",                                         bye_describer,                                          bye_command,                                    0);
        repo.register_command(                  L"Query Commands",              L"KILL",                                        kill_describer,                                         kill_command,                                   0);
index 5db0c6e09b06b807c3d993286f67d79d229d6df7..f22827bd35c0a8729b466e1798674fd382f5a3f5 100644 (file)
@@ -69,10 +69,16 @@ private:
        spl::shared_ptr<amcp_command_repository>        repo_;
 
 public:
-       impl(const spl::shared_ptr<amcp_command_repository>& repo)
+       impl(const std::wstring& name, const spl::shared_ptr<amcp_command_repository>& repo)
                : repo_(repo)
        {
-               commandQueues_.resize(repo_->channels().size() + 1);
+               commandQueues_.push_back(spl::make_shared<AMCPCommandQueue>(L"General Queue for " + name));
+
+               for (int i = 0; i < repo_->channels().size(); ++i)
+               {
+                       commandQueues_.push_back(spl::make_shared<AMCPCommandQueue>(
+                                       L"Channel " + boost::lexical_cast<std::wstring>(i + 1) + L" for " + name));
+               }
        }
 
        ~impl() {}
@@ -92,7 +98,7 @@ public:
                std::wstring                                                            command_name;
                AMCPCommand::ptr_type                                           command;
                error_state                                                                     error                   = error_state::no_error;
-               AMCPCommandQueue::ptr_type                                      queue;
+               std::shared_ptr<AMCPCommandQueue>                       queue;
        };
 
        //The paser method expects message to be complete messages with the delimiter stripped away.
@@ -310,8 +316,8 @@ private:
        }
 };
 
-AMCPProtocolStrategy::AMCPProtocolStrategy(const spl::shared_ptr<amcp_command_repository>& repo)
-       : impl_(spl::make_unique<impl>(repo))
+AMCPProtocolStrategy::AMCPProtocolStrategy(const std::wstring& name, const spl::shared_ptr<amcp_command_repository>& repo)
+       : impl_(spl::make_unique<impl>(name, repo))
 {
 }
 AMCPProtocolStrategy::~AMCPProtocolStrategy() {}
index 3e4a991b27d1e21b9d22fddc4667c3fa9c6f69d5..559b9d44f1f09042135033c1a5dbd8b8003e6fb1 100644 (file)
@@ -41,7 +41,7 @@ namespace caspar { namespace protocol { namespace amcp {
 class AMCPProtocolStrategy : public IO::IProtocolStrategy, boost::noncopyable
 {
 public:
-       AMCPProtocolStrategy(const spl::shared_ptr<class amcp_command_repository>& repo);
+       AMCPProtocolStrategy(const std::wstring& name, const spl::shared_ptr<class amcp_command_repository>& repo);
 
        virtual ~AMCPProtocolStrategy();
 
index 0fa28cdcbdf42e3d3083b9493172db71f170e8c4..15847906c189a2c399faab736db7f05c62aa5593 100644 (file)
@@ -236,6 +236,7 @@ bool run()
                        L"\r\n",
                        spl::make_shared<caspar::IO::legacy_strategy_adapter_factory>(
                                        spl::make_shared<protocol::amcp::AMCPProtocolStrategy>(
+                                                       L"Console",
                                                        caspar_server.get_amcp_command_repository())))->create(console_client);
 
        // Use separate thread for the blocking console input, will be terminated 
index 04897d48e6ba8cdda14422b7c3155f8ab238ac57..2eb3f5b0c3c8cef9be3e66c937cebf81b7cecaf1 100644 (file)
@@ -297,7 +297,9 @@ struct server::impl : boost::noncopyable
                                if(name == L"tcp")
                                {                                       
                                        unsigned int port = xml_controller.second.get(L"port", 5250);
-                                       auto asyncbootstrapper = spl::make_shared<IO::AsyncEventServer>(create_protocol(protocol), port);
+                                       auto asyncbootstrapper = spl::make_shared<IO::AsyncEventServer>(
+                                                       create_protocol(L"TCP Port " + boost::lexical_cast<std::wstring>(port), protocol),
+                                                       port);
                                        async_servers_.push_back(asyncbootstrapper);
 
                                        if (!primary_amcp_server_ && boost::iequals(protocol, L"AMCP"))
@@ -313,12 +315,12 @@ struct server::impl : boost::noncopyable
                }
        }
 
-       IO::protocol_strategy_factory<char>::ptr create_protocol(const std::wstring& name) const
+       IO::protocol_strategy_factory<char>::ptr create_protocol(const std::wstring& name, const std::wstring& port_description) const
        {
                using namespace IO;
 
                if(boost::iequals(name, L"AMCP"))
-                       return wrap_legacy_protocol("\r\n", spl::make_shared<amcp::AMCPProtocolStrategy>(spl::make_shared_ptr(amcp_command_repo_)));
+                       return wrap_legacy_protocol("\r\n", spl::make_shared<amcp::AMCPProtocolStrategy>(port_description, spl::make_shared_ptr(amcp_command_repo_)));
                else if(boost::iequals(name, L"CII"))
                        return wrap_legacy_protocol("\r\n", spl::make_shared<cii::CIIProtocolStrategy>(channels_, cg_registry_, producer_registry_));
                else if(boost::iequals(name, L"CLOCK"))