]> git.sesse.net Git - casparcg/commitdiff
- Removed need of non-deterministic sleeps during server shutdown.
authorHelge Norberg <helge.norberg@svt.se>
Tue, 18 Aug 2015 20:14:01 +0000 (22:14 +0200)
committerHelge Norberg <helge.norberg@svt.se>
Tue, 18 Aug 2015 20:14:01 +0000 (22:14 +0200)
- Made KILL and RESTART shutdown as cleanly as "q".
- Added logging to some destructors to help when debugging shutdown issues.
- Fixed some deadlocks/lifetime issues.

13 files changed:
common/executor.h
common/polling_filesystem_monitor.cpp
core/consumer/frame_consumer.cpp
core/consumer/frame_consumer.h
core/producer/frame_producer.cpp
core/producer/frame_producer.h
core/video_channel.cpp
modules/html/html.cpp
modules/screen/consumer/screen_consumer.cpp
protocol/osc/client.cpp
protocol/util/AsyncEventServer.cpp
shell/main.cpp
shell/server.cpp

index 59e12ba905e446e3caab34a4d84a771644ef1121..1e69ff1c07658d9a31819bf67a9d5afb52f6a0f2 100644 (file)
@@ -79,12 +79,15 @@ public:
        
        ~executor()
        {
+               CASPAR_LOG(trace) << L"Shutting down " << name_;
+
                try
                {
-                       internal_begin_invoke([=]
-                       {
-                               is_running_ = false;
-                       }).wait();
+                       if (is_running_)
+                               internal_begin_invoke([=]
+                               {
+                                       is_running_ = false;
+                               }).wait();
                }
                catch(...)
                {
index c5013fcd7d8d931024ece682f536658a2cd7050a..52a411828a2ade1b411361cf3d04239d6298eadf 100644 (file)
@@ -200,15 +200,15 @@ private:
 
 class polling_filesystem_monitor : public filesystem_monitor
 {
+       tbb::atomic<bool> running_;
        std::shared_ptr<boost::asio::io_service> scheduler_;
        directory_monitor root_monitor_;
-       executor executor_;
        boost::asio::deadline_timer timer_;
-       tbb::atomic<bool> running_;
        int scan_interval_millis_;
        std::promise<void> initial_scan_completion_;
        tbb::concurrent_queue<boost::filesystem::path> to_reemmit_;
        tbb::atomic<bool> reemmit_all_;
+       executor executor_;
 public:
        polling_filesystem_monitor(
                        const boost::filesystem::path& folder_to_watch,
@@ -220,9 +220,9 @@ public:
                        const initial_files_handler& initial_files_handler)
                : scheduler_(std::move(scheduler))
                , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
-               , executor_(L"polling_filesystem_monitor")
                , timer_(*scheduler_)
                , scan_interval_millis_(scan_interval_millis)
+               , executor_(L"polling_filesystem_monitor")
        {
                running_ = true;
                reemmit_all_ = false;
index 6c0f6ba7b149cee599c1db62197ca24d8ab6b44e..320ff13f74a7677e96b725f695178fde8bafad17 100644 (file)
@@ -25,6 +25,7 @@
 
 #include <common/except.h>
 #include <common/future.h>
+#include <common/os/general_protection_fault.h>
 
 #include <core/video_format.h>
 #include <core/frame/frame.h>
@@ -67,6 +68,18 @@ void frame_consumer_registry::register_preconfigured_consumer_factory(
        impl_->preconfigured_consumer_factories.insert(std::make_pair(element_name, factory));
 }
 
+tbb::atomic<bool>& destroy_consumers_in_separate_thread()
+{
+       static tbb::atomic<bool> state;
+
+       return state;
+}
+
+void destroy_consumers_synchronously()
+{
+       destroy_consumers_in_separate_thread() = false;
+}
+
 class destroy_consumer_proxy : public frame_consumer
 {      
        std::shared_ptr<frame_consumer> consumer_;
@@ -74,6 +87,7 @@ public:
        destroy_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) 
                : consumer_(std::move(consumer))
        {
+               destroy_consumers_in_separate_thread() = true;
        }
 
        ~destroy_consumer_proxy()
@@ -81,6 +95,9 @@ public:
                static tbb::atomic<int> counter;
                static std::once_flag counter_init_once;
                std::call_once(counter_init_once, []{ counter = 0; });
+
+               if (!destroy_consumers_in_separate_thread())
+                       return;
                        
                ++counter;
                CASPAR_VERIFY(counter < 8);
@@ -89,11 +106,13 @@ public:
                boost::thread([=]
                {
                        std::unique_ptr<std::shared_ptr<frame_consumer>> pointer_guard(consumer);
-
                        auto str = (*consumer)->print();
+
                        try
                        {
-                               if(!consumer->unique())
+                               ensure_gpf_handler_installed_for_thread(u8(L"Destroyer: " + str).c_str());
+
+                               if (!consumer->unique())
                                        CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << consumer->use_count();
                                else
                                        CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";
index 5390a79c2326b7b8029c40c71a20d5eefa536db7..9b3937b59b6ebd886141b5be0db8cc21916284e4 100644 (file)
@@ -99,4 +99,6 @@ private:
        spl::shared_ptr<impl> impl_;
 };
 
+void destroy_consumers_synchronously();
+
 }}
index f5075b782aaef1a19fbc1e5094b5feb27924e755..dacf09203e4ecfab346fb92071c1481b843eeb55 100644 (file)
@@ -215,7 +215,19 @@ const spl::shared_ptr<frame_producer>& frame_producer::empty()
 
        static spl::shared_ptr<frame_producer> producer = spl::make_shared<empty_frame_producer>();
        return producer;
-}      
+}
+
+tbb::atomic<bool>& destroy_producers_in_separate_thread()
+{
+       static tbb::atomic<bool> state;
+
+       return state;
+}
+
+void destroy_producers_synchronously()
+{
+       destroy_producers_in_separate_thread() = false;
+}
 
 class destroy_producer_proxy : public frame_producer
 {      
@@ -224,15 +236,16 @@ public:
        destroy_producer_proxy(spl::shared_ptr<frame_producer>&& producer) 
                : producer_(std::move(producer))
        {
+               destroy_producers_in_separate_thread() = true;
        }
 
        virtual ~destroy_producer_proxy()
-       {               
+       {
                static tbb::atomic<int> counter;
                static std::once_flag counter_init_once;
                std::call_once(counter_init_once, []{ counter = 0; });
                
-               if(producer_ == core::frame_producer::empty())
+               if(producer_ == core::frame_producer::empty() || !destroy_producers_in_separate_thread())
                        return;
 
                ++counter;
@@ -245,7 +258,9 @@ public:
                        auto str = (*producer)->print();
                        try
                        {
-                               if(!producer->unique())
+                               ensure_gpf_handler_installed_for_thread(u8(L"Destroyer: " + str).c_str());
+
+                               if (!producer->unique())
                                        CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << producer->use_count();
                                else
                                        CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";
index 0a91db2a9b314994d214f4e59c7559d0f2c0e42c..fff7186583380e72fbe6fef0088ddf10ab48276e 100644 (file)
@@ -165,5 +165,6 @@ private:
 };
 
 spl::shared_ptr<core::frame_producer> create_destroy_proxy(spl::shared_ptr<core::frame_producer> producer);
+void destroy_producers_synchronously();
 
 }}
index 1eb8d92e07312ae1d4b4a06c43f68b40035a5c59..777f8e35e1fea95cb251da28cdf09c72c47b32db 100644 (file)
@@ -94,6 +94,11 @@ public:
 
                CASPAR_LOG(info) << print() << " Successfully Initialized.";
        }
+
+       ~impl()
+       {
+               CASPAR_LOG(info) << print() << " Uninitializing.";
+       }
                                                        
        core::video_format_desc video_format_desc() const
        {
@@ -144,7 +149,8 @@ public:
                        CASPAR_LOG_CURRENT_EXCEPTION();
                }
 
-               executor_.begin_invoke([=]{tick();});
+               if (executor_.is_running())
+                       executor_.begin_invoke([=]{tick();});
        }
                        
        std::wstring print() const
index ad45dda213ee121ba6b33baf359d8bed2ebd1fa9..eb9d625fbe654917412c06636fee0ddec16deec7 100644 (file)
@@ -330,6 +330,7 @@ void uninit()
        {
                CefShutdown();
        });
+       g_cef_executor.reset();
 }
 
 class cef_task : public CefTask
index adc3d5729e2a8e5f1adc0cd471156563ac20e55f..4d6887164b9d21e0a0a3818e55f4e4d33e2ebb84 100644 (file)
@@ -356,7 +356,7 @@ public:
                                                }
                                        }
                        
-                                       auto frame = core::const_frame::empty();
+                                       core::const_frame frame;
                                        frame_buffer_.pop(frame);
 
                                        render_and_draw_frame(frame);
index 47b55e1c452736aaa412923a61dd0cf33e48df0c..1324bb6735082e74dfd99a7caaa141600ec0d62c 100644 (file)
@@ -238,6 +238,9 @@ private:
                                {                       
                                        boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
 
+                                       if (!is_running_)
+                                               return;
+
                                        if (updates_.empty())
                                                updates_cond_.wait(cond_lock);
 
index 395b156c8e4de7cfcec9ca7595252790c8982836..8b3435429fed0d3c605a4cf8f0fcaa19a6e75a6b 100644 (file)
@@ -51,7 +51,7 @@ class connection : public spl::enable_shared_from_this<connection>
        typedef tbb::concurrent_queue<std::string>      send_queue;
 
     const spl::shared_ptr<tcp::socket>                         socket_; 
-       boost::asio::io_service&                                                service_;
+       std::shared_ptr<boost::asio::io_service>                service_;
        const spl::shared_ptr<connection_set>                   connection_set_;
        const std::wstring                                                              name_;
        protocol_strategy_factory<char>::ptr                    protocol_factory_;
@@ -125,9 +125,9 @@ class connection : public spl::enable_shared_from_this<connection>
        };
 
 public:
-    static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const protocol_strategy_factory<char>::ptr& protocol, spl::shared_ptr<connection_set> connection_set)
+       static spl::shared_ptr<connection> create(std::shared_ptr<boost::asio::io_service> service, spl::shared_ptr<tcp::socket> socket, const protocol_strategy_factory<char>::ptr& protocol, spl::shared_ptr<connection_set> connection_set)
        {
-               spl::shared_ptr<connection> con(new connection(std::move(socket), std::move(protocol), std::move(connection_set)));
+               spl::shared_ptr<connection> con(new connection(std::move(service), std::move(socket), std::move(protocol), std::move(connection_set)));
                con->read_some();
                return con;
     }
@@ -150,12 +150,12 @@ public:
        virtual void send(std::string&& data)
        {
                send_queue_.push(std::move(data));
-               service_.dispatch([=] { do_write(); });
+               service_->dispatch([=] { do_write(); });
        }
 
        virtual void disconnect()
        {
-               service_.dispatch([=] { stop(); });
+               service_->dispatch([=] { stop(); });
        }
 
        void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound)
@@ -210,9 +210,9 @@ private:
                return socket_->is_open() ? socket_->remote_endpoint().address().to_string() : "no-address";
        }
 
-    connection(const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set) 
+    connection(const std::shared_ptr<boost::asio::io_service>& service, const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set) 
                : socket_(socket)
-               , service_(socket->get_io_service())
+               , service_(service)
                , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
                , connection_set_(connection_set)
                , protocol_factory_(protocol_factory)
@@ -294,7 +294,7 @@ struct AsyncEventServer::implementation
        protocol_strategy_factory<char>::ptr            protocol_factory_;
        spl::shared_ptr<connection_set>                         connection_set_;
        std::vector<lifecycle_factory_t>                        lifecycle_factories_;
-       tbb::mutex mutex_;
+       tbb::mutex                                                                      mutex_;
 
        implementation(std::shared_ptr<boost::asio::io_service> service, const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
                : service_(std::move(service))
@@ -308,7 +308,8 @@ struct AsyncEventServer::implementation
        {
                try
                {
-                       acceptor_.close();                      
+                       acceptor_.cancel();
+                       acceptor_.close();
                }
                catch(...)
                {
@@ -337,7 +338,7 @@ struct AsyncEventServer::implementation
                
         if (!error)
                {
-                       auto conn = connection::create(socket, protocol_factory_, connection_set_);
+                       auto conn = connection::create(service_, socket, protocol_factory_, connection_set_);
                        connection_set_->insert(conn);
 
                        for (auto& lifecycle_factory : lifecycle_factories_)
index 791cbce00e707132a9ac3d5012406278072b8e99..d806e90a97007b57c8e23dd4ce64d346de5653ec 100644 (file)
@@ -216,34 +216,40 @@ bool run(const std::wstring& config_file_name)
        print_info();
 
        // Create server object which initializes channels, protocols and controllers.
-       server caspar_server(shutdown_server_now);
+       std::unique_ptr<server> caspar_server(new server(shutdown_server_now));
        
        // Print environment information.
-       print_system_info(caspar_server.get_system_info_provider_repo());
+       print_system_info(caspar_server->get_system_info_provider_repo());
 
        std::wstringstream str;
        boost::property_tree::xml_writer_settings<std::wstring> w(' ', 3);
        boost::property_tree::write_xml(str, env::properties(), w);
        CASPAR_LOG(info) << config_file_name << L":\n-----------------------------------------\n" << str.str() << L"-----------------------------------------";
        
-       caspar_server.start();
+       caspar_server->start();
 
        // Create a dummy client which prints amcp responses to console.
        auto console_client = spl::make_shared<IO::ConsoleClientInfo>();
 
        // Create a amcp parser for console commands.
-       auto amcp = spl::make_shared<caspar::IO::delimiter_based_chunking_strategy_factory<wchar_t>>(
+       std::shared_ptr<IO::protocol_strategy<wchar_t>> amcp = spl::make_shared<caspar::IO::delimiter_based_chunking_strategy_factory<wchar_t>>(
                        L"\r\n",
                        spl::make_shared<caspar::IO::legacy_strategy_adapter_factory>(
                                        spl::make_shared<protocol::amcp::AMCPProtocolStrategy>(
                                                        L"Console",
-                                                       caspar_server.get_amcp_command_repository())))->create(console_client);
+                                                       caspar_server->get_amcp_command_repository())))->create(console_client);
+       std::weak_ptr<IO::protocol_strategy<wchar_t>> weak_amcp = amcp;
 
        // Use separate thread for the blocking console input, will be terminated 
        // anyway when the main thread terminates.
-       boost::thread stdin_thread(std::bind(do_run, amcp, std::ref(shutdown_server_now)));     //compiler didn't like lambda here...
+       boost::thread stdin_thread(std::bind(do_run, weak_amcp, std::ref(shutdown_server_now)));        //compiler didn't like lambda here...
        stdin_thread.detach();
-       return shutdown_server.get();
+       bool should_restart = shutdown_server.get();
+       amcp.reset();
+
+       while (weak_amcp.lock());
+
+       return should_restart;
 }
 
 void on_abort(int)
@@ -277,7 +283,7 @@ int main(int argc, char** argv)
        struct tbb_thread_installer : public tbb::task_scheduler_observer
        {
                tbb_thread_installer(){observe(true);}
-               void on_scheduler_entry(bool is_worker)
+               void on_scheduler_entry(bool is_worker) override
                {
                        ensure_gpf_handler_installed_for_thread("tbb-worker-thread");
                }
@@ -307,9 +313,13 @@ int main(int argc, char** argv)
                setup_console_window();
 
                return_code = run(config_file_name) ? 5 : 0;
-               
-               boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
 
+               for (auto& thread : get_thread_infos())
+               {
+                       if (thread->name != "main thread" && thread->name != "tbb-worker-thread")
+                               CASPAR_LOG(warning) << L"Thread left running: " << thread->name << L" (" << thread->native_id << L")";
+               }
+               
                CASPAR_LOG(info) << "Successfully shutdown CasparCG Server.";
        }
        catch(boost::property_tree::file_parser_error&)
index b396063f3d08a774fc09a546047d8dec6e6df725..2f7609c05353f4eaaef2f99128513910442c21ad 100644 (file)
@@ -108,7 +108,10 @@ std::shared_ptr<boost::asio::io_service> create_running_io_service()
                        {
                                work.reset();
                                service->stop();
-                               thread->join();
+                               if (thread->get_id() != boost::this_thread::get_id())
+                                       thread->join();
+                               else
+                                       thread->detach();
                        });
 }
 
@@ -122,7 +125,7 @@ struct server::impl : boost::noncopyable
        std::shared_ptr<amcp::amcp_command_repository>          amcp_command_repo_;
        std::vector<spl::shared_ptr<IO::AsyncEventServer>>      async_servers_;
        std::shared_ptr<IO::AsyncEventServer>                           primary_amcp_server_;
-       osc::client                                                                                     osc_client_;
+       std::shared_ptr<osc::client>                                            osc_client_                                             = std::make_shared<osc::client>(io_service_);
        std::vector<std::shared_ptr<void>>                                      predefined_osc_subscriptions_;
        std::vector<spl::shared_ptr<video_channel>>                     channels_;
        spl::shared_ptr<media_info_repository>                          media_info_repo_;
@@ -137,7 +140,6 @@ struct server::impl : boost::noncopyable
 
        explicit impl(std::promise<bool>& shutdown_server_now)          
                : accelerator_(env::properties().get(L"configuration.accelerator", L"auto"))
-               , osc_client_(io_service_)
                , media_info_repo_(create_in_memory_media_info_repository())
                , producer_registry_(spl::make_shared<core::frame_producer_registry>(help_repo_))
                , consumer_registry_(spl::make_shared<core::frame_consumer_registry>(help_repo_))
@@ -187,13 +189,19 @@ struct server::impl : boost::noncopyable
                        initial_media_info_thread_.join();
                }
 
+               std::weak_ptr<boost::asio::io_service> weak_io_service = io_service_;
+               io_service_.reset();
+               osc_client_.reset();
                thumbnail_generator_.reset();
+               amcp_command_repo_.reset();
                primary_amcp_server_.reset();
                async_servers_.clear();
+               destroy_producers_synchronously();
+               destroy_consumers_synchronously();
                channels_.clear();
-
-               boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
-               //Sleep(500); // HACK: Wait for asynchronous destruction of producers and consumers.
+               
+               while (weak_io_service.lock())
+                       boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
 
                uninitialize_modules();
                core::diagnostics::osd::shutdown();
@@ -242,7 +250,7 @@ struct server::impl : boost::noncopyable
                using boost::property_tree::wptree;
                using namespace boost::asio::ip;
 
-               monitor_subject_->attach_parent(osc_client_.sink());
+               monitor_subject_->attach_parent(osc_client_->sink());
                
                auto default_port =
                                pt.get<unsigned short>(L"configuration.osc.default-port", 6250);
@@ -258,7 +266,7 @@ struct server::impl : boost::noncopyable
                                const auto port =
                                                predefined_client.second.get<unsigned short>(L"port");
                                predefined_osc_subscriptions_.push_back(
-                                               osc_client_.get_subscription_token(udp::endpoint(
+                                               osc_client_->get_subscription_token(udp::endpoint(
                                                                address_v4::from_string(u8(address)),
                                                                port)));
                        }
@@ -273,7 +281,7 @@ struct server::impl : boost::noncopyable
 
                                                return std::make_pair(
                                                                std::wstring(L"osc_subscribe"),
-                                                               osc_client_.get_subscription_token(
+                                                               osc_client_->get_subscription_token(
                                                                                udp::endpoint(
                                                                                                address_v4::from_string(
                                                                                                                ipv4_address),