]> git.sesse.net Git - casparcg/commitdiff
Implemented INFO QUEUES command for debugging AMCP command queues. Useful for debuggi...
authorHelge Norberg <helge.norberg@svt.se>
Wed, 5 Nov 2014 12:49:48 +0000 (13:49 +0100)
committerHelge Norberg <helge.norberg@svt.se>
Wed, 5 Nov 2014 12:49:48 +0000 (13:49 +0100)
common/concurrency/executor.h
protocol/amcp/AMCPCommand.h
protocol/amcp/AMCPCommandQueue.cpp
protocol/amcp/AMCPCommandQueue.h
protocol/amcp/AMCPCommandsImpl.cpp
protocol/amcp/AMCPProtocolStrategy.cpp
protocol/amcp/AMCPProtocolStrategy.h
shell/main.cpp
shell/server.cpp

index 27fd5caa70606e382eda80641e6e4e5526bbb5d1..51fb90850977f854f2039cfac86e249dcdfc846c 100644 (file)
@@ -226,6 +226,7 @@ public:
        function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size();  }
        bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty();     }
        bool is_running() const /*noexcept*/ { return is_running_; }    
+       const std::string& name() const { return name_; }
                
 private:
        
index 2f12754b48b99cdef53308b4f3b92b90485a033f..dde4d8ca9b193f3a04e3263b988aa59907444069 100644 (file)
@@ -60,6 +60,8 @@ namespace caspar { namespace protocol { namespace amcp {
                        _parameters = p;\r
                }\r
 \r
+               const core::parameters& GetParameters() const { return _parameters; }\r
+\r
                void SetClientInfo(IO::ClientInfoPtr& s){pClientInfo_ = s;}\r
                IO::ClientInfoPtr GetClientInfo(){return pClientInfo_;}\r
 \r
index c2e34581172f639ff730087f244d0a5a5a1781a8..c81ed6baf469957993ccf6c4a0e85b181f1c8d19 100644 (file)
 \r
 #include "AMCPCommandQueue.h"\r
 \r
+#include <boost/property_tree/ptree.hpp>\r
+\r
+#include <common/concurrency/lock.h>\r
+\r
 namespace caspar { namespace protocol { namespace amcp {\r
+\r
+namespace {\r
+\r
+tbb::spin_mutex& get_global_mutex()\r
+{\r
+       static tbb::spin_mutex mutex;\r
+\r
+       return mutex;\r
+}\r
+\r
+std::map<std::wstring, AMCPCommandQueue*>& get_instances()\r
+{\r
+       static std::map<std::wstring, AMCPCommandQueue*> queues;\r
+\r
+       return queues;\r
+}\r
+\r
+}\r
        \r
-AMCPCommandQueue::AMCPCommandQueue() \r
-       : executor_(L"AMCPCommandQueue")\r
+AMCPCommandQueue::AMCPCommandQueue(const std::wstring& name)\r
+       : executor_(L"AMCPCommandQueue " + name)\r
+       , running_command_(false)\r
 {\r
+       tbb::spin_mutex::scoped_lock lock(get_global_mutex());\r
+\r
+       get_instances().insert(std::make_pair(name, this));\r
 }\r
 \r
 AMCPCommandQueue::~AMCPCommandQueue() \r
 {\r
+       tbb::spin_mutex::scoped_lock lock(get_global_mutex());\r
+\r
+       get_instances().erase(widen(executor_.name()));\r
 }\r
 \r
 void AMCPCommandQueue::AddCommand(AMCPCommandPtr pCurrentCommand)\r
@@ -60,21 +89,35 @@ void AMCPCommandQueue::AddCommand(AMCPCommandPtr pCurrentCommand)
                {\r
                        try\r
                        {\r
+                               auto print = pCurrentCommand->print();\r
+                               auto params = pCurrentCommand->GetParameters().get_original_string();\r
+\r
+                               {\r
+                                       tbb::spin_mutex::scoped_lock lock(running_command_mutex_);\r
+                                       running_command_ = true;\r
+                                       running_command_name_ = print;\r
+                                       running_command_params_ = std::move(params);\r
+                                       running_command_since_.restart();\r
+                               }\r
+\r
                                if(pCurrentCommand->Execute()) \r
-                                       CASPAR_LOG(debug) << "Executed command: " << pCurrentCommand->print();\r
+                                       CASPAR_LOG(debug) << "Executed command: " << print << L" on " << widen(executor_.name());\r
                                else \r
-                                       CASPAR_LOG(warning) << "Failed to execute command: " << pCurrentCommand->print();\r
+                                       CASPAR_LOG(warning) << "Failed to execute command: " << print << L" on " << widen(executor_.name());\r
                        }\r
                        catch(...)\r
                        {\r
                                CASPAR_LOG_CURRENT_EXCEPTION();\r
-                               CASPAR_LOG(error) << "Failed to execute command:" << pCurrentCommand->print();\r
+                               CASPAR_LOG(error) << "Failed to execute command:" << pCurrentCommand->print() << L" on " << widen(executor_.name());\r
                                pCurrentCommand->SetReplyString(L"500 FAILED\r\n");\r
                        }\r
                                \r
                        pCurrentCommand->SendReply();\r
                        \r
                        CASPAR_LOG(trace) << "Ready for a new command";\r
+\r
+                       tbb::spin_mutex::scoped_lock lock(running_command_mutex_);\r
+                       running_command_ = false;\r
                }\r
                catch(...)\r
                {\r
@@ -83,4 +126,54 @@ void AMCPCommandQueue::AddCommand(AMCPCommandPtr pCurrentCommand)
        });\r
 }\r
 \r
+boost::property_tree::wptree AMCPCommandQueue::info() const\r
+{\r
+       boost::property_tree::wptree info;\r
+\r
+       auto name = widen(executor_.name());\r
+       info.add(L"name", name);\r
+       auto size = executor_.size();\r
+       info.add(L"queued", std::max(0, size));\r
+\r
+       bool running_command;\r
+       std::wstring running_command_name;\r
+       std::wstring running_command_params;\r
+       int64_t running_command_elapsed;\r
+\r
+       lock(running_command_mutex_, [&]\r
+       {\r
+               running_command = running_command_;\r
+               \r
+               if (running_command)\r
+               {\r
+                       running_command_name = running_command_name_;\r
+                       running_command_params = running_command_params_;\r
+                       running_command_elapsed = static_cast<int64_t>(\r
+                                       running_command_since_.elapsed() * 1000.0);\r
+               }\r
+       });\r
+\r
+       if (running_command)\r
+       {\r
+               info.add(L"running.command", running_command_name);\r
+               info.add(L"running.params", running_command_params);\r
+               info.add(L"running.elapsed", running_command_elapsed);\r
+       }\r
+\r
+       return info;\r
+}\r
+\r
+boost::property_tree::wptree AMCPCommandQueue::info_all_queues()\r
+{\r
+       boost::property_tree::wptree info;\r
+       tbb::spin_mutex::scoped_lock lock(get_global_mutex());\r
+\r
+       BOOST_FOREACH(auto& queue, get_instances())\r
+       {\r
+               info.add_child(L"queues.queue", queue.second->info());\r
+       }\r
+\r
+       return info;\r
+}\r
+\r
 }}}
\ No newline at end of file
index 9bccd8b3328cff04004c2ecc90f6ea5009fe1666..fa605af02dc684a3aef07c83367194d2e4c1a2bf 100644 (file)
 \r
 #include <common/concurrency/executor.h>\r
 \r
-#include <tbb\mutex.h>\r
+#include <boost/property_tree/ptree_fwd.hpp>\r
+#include <boost/timer.hpp>\r
+\r
+#include <tbb/spin_mutex.h>\r
 \r
 namespace caspar { namespace protocol { namespace amcp {\r
 \r
@@ -36,13 +39,21 @@ class AMCPCommandQueue
        AMCPCommandQueue(const AMCPCommandQueue&);\r
        AMCPCommandQueue& operator=(const AMCPCommandQueue&);\r
 public:\r
-       AMCPCommandQueue();\r
+       AMCPCommandQueue(const std::wstring& name);\r
        ~AMCPCommandQueue();\r
 \r
        void AddCommand(AMCPCommandPtr pCommand);\r
 \r
+       boost::property_tree::wptree info() const;\r
+\r
+       static boost::property_tree::wptree info_all_queues();\r
 private:\r
-       executor                        executor_;\r
+       executor                                executor_;\r
+       mutable tbb::spin_mutex running_command_mutex_;\r
+       bool                                    running_command_;\r
+       std::wstring                    running_command_name_;\r
+       std::wstring                    running_command_params_;\r
+       boost::timer                    running_command_since_;\r
 };\r
 typedef std::tr1::shared_ptr<AMCPCommandQueue> AMCPCommandQueuePtr;\r
 \r
index 6d3bfac9ddaa8eb7313c1981c1365883a31f939c..563accdfc4b91c2b8ee34910d43b3a6ee3b943a1 100644 (file)
@@ -2179,6 +2179,13 @@ bool InfoCommand::DoExecute()
 \r
                        boost::property_tree::write_xml(replyString, info, w);\r
                }\r
+               else if(_parameters.size() >= 1 && _parameters[0] == L"QUEUES")\r
+               {\r
+                       replyString << L"201 INFO QUEUES OK\r\n";\r
+\r
+                       boost::property_tree::wptree info = AMCPCommandQueue::info_all_queues();\r
+                       boost::property_tree::write_xml(replyString, info, w);\r
+               }\r
                else if(_parameters.size() >= 1 && _parameters[0] == L"SYSTEM")\r
                {\r
                        replyString << L"201 INFO SYSTEM OK\r\n";\r
index a711021ad03dbb96cee700f2bee001e080981de9..64fb28a983d67433879e43bcedc78ada8e655050 100644 (file)
@@ -54,6 +54,7 @@ inline std::shared_ptr<core::video_channel> GetChannelSafe(unsigned int index, c
 }\r
 \r
 AMCPProtocolStrategy::AMCPProtocolStrategy(\r
+               const std::wstring& name,\r
                const std::vector<safe_ptr<core::video_channel>>& channels,\r
                const std::shared_ptr<core::thumbnail_generator>& thumb_gen,\r
                const safe_ptr<core::media_info_repository>& media_info_repo,\r
@@ -65,7 +66,7 @@ AMCPProtocolStrategy::AMCPProtocolStrategy(
        , ogl_(ogl_device)\r
        , shutdown_server_now_(shutdown_server_now)\r
 {\r
-       AMCPCommandQueuePtr pGeneralCommandQueue(new AMCPCommandQueue());\r
+       AMCPCommandQueuePtr pGeneralCommandQueue(new AMCPCommandQueue(L"General Queue for " + name));\r
        commandQueues_.push_back(pGeneralCommandQueue);\r
 \r
 \r
@@ -73,7 +74,7 @@ AMCPProtocolStrategy::AMCPProtocolStrategy(
        unsigned int index = -1;\r
        //Create a commandpump for each video_channel\r
        while((pChannel = GetChannelSafe(++index, channels_)) != 0) {\r
-               AMCPCommandQueuePtr pChannelCommandQueue(new AMCPCommandQueue());\r
+               AMCPCommandQueuePtr pChannelCommandQueue(new AMCPCommandQueue(L"Channel " + boost::lexical_cast<std::wstring>(index + 1) + L" for " + name));\r
                std::wstring title = TEXT("video_channel ");\r
 \r
                //HACK: Perform real conversion from int to string\r
index f885ff09d2a8dc0e2dadd55598937e22027d2a29..1c72b1a0b0fe1fc52b1aef28624a64d11d184828 100644 (file)
@@ -50,6 +50,7 @@ class AMCPProtocolStrategy : public IO::IProtocolStrategy, boost::noncopyable
 \r
 public:\r
        AMCPProtocolStrategy(\r
+                       const std::wstring& name,\r
                        const std::vector<safe_ptr<core::video_channel>>& channels,\r
                        const std::shared_ptr<core::thumbnail_generator>& thumb_gen,\r
                        const safe_ptr<core::media_info_repository>& media_info_repo,\r
index 15f843bf30a9e7a340e14b8ee1ae2608f2275680..1f5193e24e623af67c0e7e4247b15d63feb38ab6 100644 (file)
@@ -315,6 +315,7 @@ int main(int argc, char* argv[])
 \r
                                // Create a amcp parser for console commands.\r
                                caspar::protocol::amcp::AMCPProtocolStrategy amcp(\r
+                                               L"Console",\r
                                                caspar_server.get_channels(),\r
                                                caspar_server.get_thumbnail_generator(),\r
                                                caspar_server.get_media_info_repo(),\r
index 05318aa9fc2ae50d8736fb19a9541ba1f44beffe..602f7d83edf44b8e435c8ae46898b65e7dc1671c 100644 (file)
@@ -293,7 +293,10 @@ struct server::implementation : boost::noncopyable
                                if(name == L"tcp")\r
                                {                                       \r
                                        unsigned int port = xml_controller.second.get(L"port", 5250);\r
-                                       auto asyncbootstrapper = make_safe<IO::AsyncEventServer>(create_protocol(protocol), port);\r
+                                       auto asyncbootstrapper = make_safe<IO::AsyncEventServer>(create_protocol(\r
+                                                       protocol,\r
+                                                       L"TCP Port " + boost::lexical_cast<std::wstring>(port)),\r
+                                                       port);\r
                                        asyncbootstrapper->Start();\r
                                        async_servers_.push_back(asyncbootstrapper);\r
 \r
@@ -376,10 +379,16 @@ struct server::implementation : boost::noncopyable
                CASPAR_LOG(info) << L"Initialized thumbnail generator.";\r
        }\r
 \r
-       safe_ptr<IO::IProtocolStrategy> create_protocol(const std::wstring& name) const\r
+       safe_ptr<IO::IProtocolStrategy> create_protocol(const std::wstring& name, const std::wstring& port_description) const\r
        {\r
                if(boost::iequals(name, L"AMCP"))\r
-                       return make_safe<amcp::AMCPProtocolStrategy>(channels_, thumbnail_generator_, media_info_repo_, ogl_, shutdown_server_now_);\r
+                       return make_safe<amcp::AMCPProtocolStrategy>(\r
+                                       port_description,\r
+                                       channels_,\r
+                                       thumbnail_generator_,\r
+                                       media_info_repo_,\r
+                                       ogl_,\r
+                                       shutdown_server_now_);\r
                else if(boost::iequals(name, L"CII"))\r
                        return make_safe<cii::CIIProtocolStrategy>(channels_);\r
                else if(boost::iequals(name, L"CLOCK"))\r