]> git.sesse.net Git - casparcg/commitdiff
Merged fix for asio::io_service lifetime race condition (sometimes destroyed too...
authorHelge Norberg <helge.norberg@svt.se>
Fri, 14 Aug 2015 11:14:11 +0000 (13:14 +0200)
committerHelge Norberg <helge.norberg@svt.se>
Fri, 14 Aug 2015 11:14:11 +0000 (13:14 +0200)
common/polling_filesystem_monitor.cpp
common/polling_filesystem_monitor.h
protocol/CMakeLists.txt
protocol/asio/io_service_manager.cpp [deleted file]
protocol/asio/io_service_manager.h [deleted file]
protocol/osc/client.cpp
protocol/osc/client.h
protocol/util/AsyncEventServer.cpp
protocol/util/AsyncEventServer.h
shell/server.cpp

index 02cb1d8785f19b25b19bf44f7a0533156797b7da..c5013fcd7d8d931024ece682f536658a2cd7050a 100644 (file)
@@ -28,6 +28,7 @@
 #include <iostream>
 #include <cstdint>
 
+#include <boost/asio.hpp>
 #include <boost/thread.hpp>
 #include <boost/filesystem/fstream.hpp>
 #include <boost/filesystem/convenience.hpp>
@@ -53,13 +54,7 @@ public:
        {
                try
                {
-                       boost::this_thread::interruption_point();
                        handler_(event, file);
-                       boost::this_thread::interruption_point();
-               }
-               catch (const boost::thread_interrupted&)
-               {
-                       throw;
                }
                catch (...)
                {
@@ -205,8 +200,10 @@ private:
 
 class polling_filesystem_monitor : public filesystem_monitor
 {
+       std::shared_ptr<boost::asio::io_service> scheduler_;
        directory_monitor root_monitor_;
-       boost::thread scanning_thread_;
+       executor executor_;
+       boost::asio::deadline_timer timer_;
        tbb::atomic<bool> running_;
        int scan_interval_millis_;
        std::promise<void> initial_scan_completion_;
@@ -218,61 +215,84 @@ public:
                        filesystem_event events_of_interest_mask,
                        bool report_already_existing,
                        int scan_interval_millis,
+                       std::shared_ptr<boost::asio::io_service> scheduler,
                        const filesystem_monitor_handler& handler,
                        const initial_files_handler& initial_files_handler)
-               : root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
+               : scheduler_(std::move(scheduler))
+               , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
+               , executor_(L"polling_filesystem_monitor")
+               , timer_(*scheduler_)
                , scan_interval_millis_(scan_interval_millis)
        {
                running_ = true;
                reemmit_all_ = false;
-               scanning_thread_ = boost::thread([this] { scanner(); });
+               executor_.begin_invoke([this]
+               {
+                       scan();
+                       initial_scan_completion_.set_value();
+                       schedule_next();
+               });
        }
 
-       virtual ~polling_filesystem_monitor()
+       ~polling_filesystem_monitor()
        {
                running_ = false;
-               scanning_thread_.interrupt();
-               scanning_thread_.join();
+               boost::system::error_code e;
+               timer_.cancel(e);
        }
 
-       virtual std::future<void> initial_files_processed()
+       std::future<void> initial_files_processed() override
        {
                return initial_scan_completion_.get_future();
        }
 
-       virtual void reemmit_all()
+       void reemmit_all() override
        {
                reemmit_all_ = true;
        }
 
-       virtual void reemmit(const boost::filesystem::path& file)
+       void reemmit(const boost::filesystem::path& file) override
        {
                to_reemmit_.push(file);
        }
 private:
-       void scanner()
+       void schedule_next()
        {
-               ensure_gpf_handler_installed_for_thread("polling_filesystem_monitor");
+               if (!running_)
+                       return;
 
-               bool running = scan(false);
-               initial_scan_completion_.set_value();
+               timer_.expires_from_now(
+                       boost::posix_time::milliseconds(scan_interval_millis_));
+               timer_.async_wait([this](const boost::system::error_code& e)
+               {
+                       begin_scan();
+               });
+       }
+
+       void begin_scan()
+       {
+               if (!running_)
+                       return;
 
-               if (running)
-                       while (scan(true));
+               executor_.begin_invoke([this]()
+               {
+                       scan();
+                       schedule_next();
+               });
        }
 
-       bool scan(bool sleep)
+       void scan()
        {
+               if (!running_)
+                       return;
+
                try
                {
-                       if (sleep)
-                               boost::this_thread::sleep_for(boost::chrono::milliseconds(scan_interval_millis_));
-
                        if (reemmit_all_.fetch_and_store(false))
                                root_monitor_.reemmit_all();
                        else
                        {
-                               boost::filesystem::path file;
+                               boost::filesystem::wpath file;
 
                                while (to_reemmit_.try_pop(file))
                                        root_monitor_.reemmit(file);
@@ -280,30 +300,31 @@ private:
 
                        root_monitor_.scan([=] { return !running_; });
                }
-               catch (const boost::thread_interrupted&)
-               {
-               }
                catch (...)
                {
                        CASPAR_LOG_CURRENT_EXCEPTION();
                }
-
-               return running_;
        }
 };
 
 struct polling_filesystem_monitor_factory::impl
 {
+       std::shared_ptr<boost::asio::io_service> scheduler_;
        int scan_interval_millis;
 
-       impl(int scan_interval_millis)
-               : scan_interval_millis(scan_interval_millis)
+       impl(
+                       std::shared_ptr<boost::asio::io_service> scheduler,
+                       int scan_interval_millis)
+               : scheduler_(std::move(scheduler))
+               , scan_interval_millis(scan_interval_millis)
        {
        }
 };
 
-polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(int scan_interval_millis)
-       : impl_(new impl(scan_interval_millis))
+polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(
+               std::shared_ptr<boost::asio::io_service> scheduler,
+               int scan_interval_millis)
+       : impl_(new impl(std::move(scheduler), scan_interval_millis))
 {
 }
 
@@ -323,6 +344,7 @@ filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
                        events_of_interest_mask,
                        report_already_existing,
                        impl_->scan_interval_millis,
+                       impl_->scheduler_,
                        handler,
                        initial_files_handler);
 }
index 35eb1b9f5f755a030353e213157754ab6992d59d..7af006aed631c08911414ceb1973445cf4b6ffc6 100644 (file)
@@ -23,6 +23,8 @@
 
 #include "filesystem_monitor.h"
 
+#include <boost/asio.hpp>
+
 namespace caspar {
 
 /**
@@ -38,11 +40,15 @@ public:
        /**
         * Constructor.
         *
+        * @param scheduler            The io_service that will be used for
+        *                             scheduling periodic scans.
         * @param scan_interval_millis The number of milliseconds between each
         *                             scheduled scan. Lower values lowers the reaction
         *                             time but causes more I/O.
         */
-       polling_filesystem_monitor_factory(int scan_interval_millis = 5000);
+       polling_filesystem_monitor_factory(
+                       std::shared_ptr<boost::asio::io_service> scheduler,
+                       int scan_interval_millis = 5000);
        virtual ~polling_filesystem_monitor_factory();
        virtual filesystem_monitor::ptr create(
                        const boost::filesystem::path& folder_to_watch,
index e9a3b156e912813fba8227eb89b87c4e9300d1c0..9aa96d00d9c00dbc4ce52a9674ee8296abffd300 100644 (file)
@@ -7,8 +7,6 @@ set(SOURCES
                amcp/AMCPProtocolStrategy.cpp
                amcp/amcp_command_repository.cpp
 
-               asio/io_service_manager.cpp
-
                cii/CIICommandsImpl.cpp
                cii/CIIProtocolStrategy.cpp
 
@@ -37,8 +35,6 @@ set(HEADERS
                amcp/amcp_command_repository.h
                amcp/amcp_shared.h
 
-               asio/io_service_manager.h
-
                cii/CIICommand.h
                cii/CIICommandsImpl.h
                cii/CIIProtocolStrategy.h
@@ -77,7 +73,6 @@ include_directories(${RXCPP_INCLUDE_PATH})
 include_directories(${TBB_INCLUDE_PATH})
 
 source_group(sources\\amcp amcp/*)
-source_group(sources\\asio asio/*)
 source_group(sources\\cii cii/*)
 source_group(sources\\clk clk/*)
 source_group(sources\\osc\\oscpack osc/oscpack/*)
diff --git a/protocol/asio/io_service_manager.cpp b/protocol/asio/io_service_manager.cpp
deleted file mode 100644 (file)
index 44bfa37..0000000
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
-* Copyright 2013 Sveriges Television AB http://casparcg.com/
-*
-* This file is part of CasparCG (www.casparcg.com).
-*
-* CasparCG is free software: you can redistribute it and/or modify
-* it under the terms of the GNU General Public License as published by
-* the Free Software Foundation, either version 3 of the License, or
-* (at your option) any later version.
-*
-* CasparCG is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-* GNU General Public License for more details.
-*
-* You should have received a copy of the GNU General Public License
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
-*
-* Author: Helge Norberg, helge.norberg@svt.se
-*/
-
-#include "../StdAfx.h"
-
-#include "io_service_manager.h"
-
-#include <memory>
-
-#include <boost/asio/io_service.hpp>
-#include <boost/thread/thread.hpp>
-
-#include <common/os/general_protection_fault.h>
-
-namespace caspar { namespace protocol { namespace asio {
-
-struct io_service_manager::impl
-{
-       boost::asio::io_service service_;
-       // To keep the io_service::run() running although no pending async
-       // operations are posted.
-       std::unique_ptr<boost::asio::io_service::work> work_;
-       boost::thread thread_;
-
-       impl()
-               : work_(new boost::asio::io_service::work(service_))
-               , thread_([this] { run(); })
-       {
-       }
-
-       void run()
-       {
-               ensure_gpf_handler_installed_for_thread("asio-thread");
-
-               service_.run();
-       }
-
-       ~impl()
-       {
-               work_.reset();
-               service_.stop();
-               thread_.join();
-       }
-};
-
-io_service_manager::io_service_manager()
-       : impl_(new impl)
-{
-}
-
-io_service_manager::~io_service_manager()
-{
-}
-
-boost::asio::io_service& io_service_manager::service()
-{
-       return impl_->service_;
-}
-
-}}}
diff --git a/protocol/asio/io_service_manager.h b/protocol/asio/io_service_manager.h
deleted file mode 100644 (file)
index cc6f253..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
-* Copyright 2013 Sveriges Television AB http://casparcg.com/
-*
-* This file is part of CasparCG (www.casparcg.com).
-*
-* CasparCG is free software: you can redistribute it and/or modify
-* it under the terms of the GNU General Public License as published by
-* the Free Software Foundation, either version 3 of the License, or
-* (at your option) any later version.
-*
-* CasparCG is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-* GNU General Public License for more details.
-*
-* You should have received a copy of the GNU General Public License
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
-*
-* Author: Helge Norberg, helge.norberg@svt.se
-*/
-
-#pragma once
-
-#include <memory>
-
-#include <boost/noncopyable.hpp>
-
-namespace boost { namespace asio {
-       class io_service;
-}}
-
-namespace caspar { namespace protocol { namespace asio {
-
-class io_service_manager : boost::noncopyable
-{
-public:
-       io_service_manager();
-       ~io_service_manager();
-       boost::asio::io_service& service();
-private:
-       struct impl;
-       std::unique_ptr<impl> impl_;
-};
-
-}}}
index 67fe61372ff29ee3e8007317d7513d9f0b5591dc..12b7a247d1d1bb126da37d730a09c918d9b2afdf 100644 (file)
@@ -127,7 +127,8 @@ void write_osc_bundle_element_start(byte_vector& destination, const byte_vector&
 
 struct client::impl : public spl::enable_shared_from_this<client::impl>, core::monitor::sink
 {
-       udp::socket socket_;
+       std::shared_ptr<boost::asio::io_service>                service_;
+       udp::socket                                                                             socket_;
        tbb::spin_mutex                                                                 endpoints_mutex_;
        std::map<udp::endpoint, int>                                    reference_counts_by_endpoint_;
 
@@ -140,8 +141,9 @@ struct client::impl : public spl::enable_shared_from_this<client::impl>, core::m
        boost::thread                                                                   thread_;
        
 public:
-       impl(boost::asio::io_service& service)
-               : socket_(service, udp::v4())
+       impl(std::shared_ptr<boost::asio::io_service> service)
+               : service_(std::move(service))
+               , socket_(*service_, udp::v4())
                , thread_(boost::bind(&impl::run, this))
        {
        }
@@ -289,8 +291,8 @@ private:
        }
 };
 
-client::client(boost::asio::io_service& service) 
-       : impl_(new impl(service))
+client::client(std::shared_ptr<boost::asio::io_service> service)
+       : impl_(new impl(std::move(service)))
 {
 }
 
index 125fd80dec1632472d530eed195973861fddea0e..809ab4d1d1f8b4303b1e98afcff9380d180580b1 100644 (file)
@@ -40,7 +40,7 @@ public:
 
        // Constructors
 
-       client(boost::asio::io_service& service);
+       client(std::shared_ptr<boost::asio::io_service> service);
        
        client(client&&);
 
index 386826d83561926ce1ecf11031b709c60b9c2388..68d97530ae7787cddcd78ba35637d455d0cc84da 100644 (file)
@@ -31,7 +31,6 @@
 #include <functional>
 
 #include <boost/asio.hpp>
-#include <boost/thread.hpp>
 #include <boost/lexical_cast.hpp>
 
 #include <tbb/mutex.h>
@@ -290,18 +289,17 @@ private:
 
 struct AsyncEventServer::implementation
 {
-       boost::asio::io_service                                 service_;
-       tcp::acceptor                                                   acceptor_;
-       protocol_strategy_factory<char>::ptr    protocol_factory_;
-       spl::shared_ptr<connection_set>                 connection_set_;
-       boost::thread                                                   thread_;
-       std::vector<lifecycle_factory_t>                lifecycle_factories_;
+       std::shared_ptr<boost::asio::io_service>        service_;
+       tcp::acceptor                                                           acceptor_;
+       protocol_strategy_factory<char>::ptr            protocol_factory_;
+       spl::shared_ptr<connection_set>                         connection_set_;
+       std::vector<lifecycle_factory_t>                        lifecycle_factories_;
        tbb::mutex mutex_;
 
-       implementation(const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
-               : acceptor_(service_, tcp::endpoint(tcp::v4(), port))
+       implementation(std::shared_ptr<boost::asio::io_service> service, const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
+               : service_(std::move(service))
+               , acceptor_(*service_, tcp::endpoint(tcp::v4(), port))
                , protocol_factory_(protocol)
-               , thread_([&] { service_.run(); })
        {
                start_accept();
        }
@@ -317,19 +315,17 @@ struct AsyncEventServer::implementation
                        CASPAR_LOG_CURRENT_EXCEPTION();
                }
 
-               service_.post([=]
+               service_->post([=]
                {
                        auto connections = *connection_set_;
                        for (auto& connection : connections)
                                connection->stop();                             
                });
-
-               thread_.join();
        }
                
-    void start_accept() 
+       void start_accept() 
        {
-               spl::shared_ptr<tcp::socket> socket(new tcp::socket(service_));
+               spl::shared_ptr<tcp::socket> socket(new tcp::socket(*service_));
                acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1));
     }
 
@@ -354,13 +350,13 @@ struct AsyncEventServer::implementation
 
        void add_client_lifecycle_object_factory(const lifecycle_factory_t& factory)
        {
-               service_.post([=]{ lifecycle_factories_.push_back(factory); });
+               service_->post([=]{ lifecycle_factories_.push_back(factory); });
        }
 };
 
 AsyncEventServer::AsyncEventServer(
-               const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
-       : impl_(new implementation(protocol, port)) {}
+               std::shared_ptr<boost::asio::io_service> service, const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
+       : impl_(new implementation(std::move(service), protocol, port)) {}
 
 AsyncEventServer::~AsyncEventServer() {}
 void AsyncEventServer::add_client_lifecycle_object_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_object_factory(factory); }
index b01394dead72f70163e047d7777bee7e406cb4eb..f7551531b2e44030150aa2ca7da56ab6434440c5 100644 (file)
@@ -24,6 +24,8 @@
 
 #include "protocol_strategy.h"
 
+#include <boost/asio.hpp>
+
 namespace caspar { namespace IO {
 
        typedef std::function<std::pair<std::wstring, std::shared_ptr<void>> (const std::string& ipv4_address)>
@@ -32,7 +34,8 @@ namespace caspar { namespace IO {
 class AsyncEventServer
 {
 public:
-       explicit AsyncEventServer(const protocol_strategy_factory<char>::ptr& protocol, unsigned short port);
+       explicit AsyncEventServer(
+                       std::shared_ptr<boost::asio::io_service> service, const protocol_strategy_factory<char>::ptr& protocol, unsigned short port);
        ~AsyncEventServer();
 
        void add_client_lifecycle_object_factory(const lifecycle_factory_t& lifecycle_factory);
index e5a3be46bd4100c6bdf3e52de24e7ca6bc678ff4..381d71f8e25bbc0b8fc7a31526c5f6d25d7b95d6 100644 (file)
@@ -53,7 +53,6 @@
 
 #include <modules/image/consumer/image_consumer.h>
 
-#include <protocol/asio/io_service_manager.h>
 #include <protocol/amcp/AMCPProtocolStrategy.h>
 #include <protocol/amcp/amcp_command_repository.h>
 #include <protocol/amcp/AMCPCommandsImpl.h>
@@ -68,6 +67,7 @@
 #include <boost/lexical_cast.hpp>
 #include <boost/property_tree/ptree.hpp>
 #include <boost/property_tree/xml_parser.hpp>
+#include <boost/asio.hpp>
 
 #include <tbb/atomic.h>
 
@@ -78,9 +78,32 @@ namespace caspar {
 using namespace core;
 using namespace protocol;
 
+std::shared_ptr<boost::asio::io_service> create_running_io_service()
+{
+       auto service = std::make_shared<boost::asio::io_service>();
+       // To keep the io_service::run() running although no pending async
+       // operations are posted.
+       auto work = std::make_shared<boost::asio::io_service::work>(*service);
+       auto thread = std::make_shared<boost::thread>([service]
+       {
+               ensure_gpf_handler_installed_for_thread("asio-thread");
+
+               service->run();
+       });
+
+       return std::shared_ptr<boost::asio::io_service>(
+                       service.get(),
+                       [service, work, thread](void*) mutable
+                       {
+                               work.reset();
+                               service->stop();
+                               thread->join();
+                       });
+}
+
 struct server::impl : boost::noncopyable
 {
-       protocol::asio::io_service_manager                                      io_service_manager_;
+       std::shared_ptr<boost::asio::io_service>                        io_service_                                             = create_running_io_service();
        spl::shared_ptr<monitor::subject>                                       monitor_subject_;
        spl::shared_ptr<monitor::subject>                                       diag_subject_                                   = core::diagnostics::get_or_create_subject();
        accelerator::accelerator                                                        accelerator_;
@@ -103,7 +126,7 @@ struct server::impl : boost::noncopyable
 
        explicit impl(std::promise<bool>& shutdown_server_now)          
                : accelerator_(env::properties().get(L"configuration.accelerator", L"auto"))
-               , osc_client_(io_service_manager_.service())
+               , osc_client_(io_service_)
                , media_info_repo_(create_in_memory_media_info_repository())
                , producer_registry_(spl::make_shared<core::frame_producer_registry>(help_repo_))
                , consumer_registry_(spl::make_shared<core::frame_consumer_registry>(help_repo_))
@@ -254,7 +277,7 @@ struct server::impl : boost::noncopyable
 
                auto scan_interval_millis = pt.get(L"configuration.thumbnails.scan-interval-millis", 5000);
 
-               polling_filesystem_monitor_factory monitor_factory(scan_interval_millis);
+               polling_filesystem_monitor_factory monitor_factory(io_service_, scan_interval_millis);
                thumbnail_generator_.reset(new thumbnail_generator(
                        monitor_factory, 
                        env::media_folder(),
@@ -298,6 +321,7 @@ struct server::impl : boost::noncopyable
                                {                                       
                                        unsigned int port = xml_controller.second.get(L"port", 5250);
                                        auto asyncbootstrapper = spl::make_shared<IO::AsyncEventServer>(
+                                                       io_service_,
                                                        create_protocol(protocol, L"TCP Port " + boost::lexical_cast<std::wstring>(port)),
                                                        port);
                                        async_servers_.push_back(asyncbootstrapper);
@@ -335,6 +359,8 @@ struct server::impl : boost::noncopyable
        {
                initial_media_info_thread_ = boost::thread([this]
                {
+                       ensure_gpf_handler_installed_for_thread("initial media scan");
+
                        for (boost::filesystem::wrecursive_directory_iterator iter(env::media_folder()), end; iter != end; ++iter)
                        {
                                if (running_)