]> git.sesse.net Git - casparcg/blobdiff - common/polling_filesystem_monitor.cpp
Fix a few Clang warnings.
[casparcg] / common / polling_filesystem_monitor.cpp
index 02cb1d8785f19b25b19bf44f7a0533156797b7da..305e0b0c3b24faec62f38dde357ca4e1d8b61d46 100644 (file)
@@ -28,7 +28,7 @@
 #include <iostream>
 #include <cstdint>
 
-#include <boost/thread.hpp>
+#include <boost/asio.hpp>
 #include <boost/filesystem/fstream.hpp>
 #include <boost/filesystem/convenience.hpp>
 
@@ -53,13 +53,7 @@ public:
        {
                try
                {
-                       boost::this_thread::interruption_point();
                        handler_(event, file);
-                       boost::this_thread::interruption_point();
-               }
-               catch (const boost::thread_interrupted&)
-               {
-                       throw;
                }
                catch (...)
                {
@@ -93,13 +87,18 @@ public:
        {
        }
 
-       void reemmit_all()
+       void reemmit_all(const tbb::atomic<bool>& running)
        {
                if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
                        return;
 
                for (auto& file : files_)
+               {
+                       if (!running)
+                               return;
+
                        handler_(filesystem_event::MODIFIED, file.first);
+               }
        }
 
        void reemmit(const boost::filesystem::path& file)
@@ -203,73 +202,108 @@ private:
        }
 };
 
-class polling_filesystem_monitor : public filesystem_monitor
+class polling_filesystem_monitor
+               : public filesystem_monitor
+               , public spl::enable_shared_from_this<polling_filesystem_monitor>
 {
-       directory_monitor root_monitor_;
-       boost::thread scanning_thread_;
-       tbb::atomic<bool> running_;
-       int scan_interval_millis_;
-       std::promise<void> initial_scan_completion_;
-       tbb::concurrent_queue<boost::filesystem::path> to_reemmit_;
-       tbb::atomic<bool> reemmit_all_;
+       tbb::atomic<bool>                                                               running_;
+       std::shared_ptr<boost::asio::io_service>                scheduler_;
+       directory_monitor                                                               root_monitor_;
+       boost::asio::deadline_timer                                             timer_;
+       int                                                                                             scan_interval_millis_;
+       std::promise<void>                                                              initial_scan_completion_;
+       tbb::concurrent_queue<boost::filesystem::path>  to_reemmit_;
+       tbb::atomic<bool>                                                               reemmit_all_;
+       executor                                                                                executor_;
 public:
        polling_filesystem_monitor(
                        const boost::filesystem::path& folder_to_watch,
                        filesystem_event events_of_interest_mask,
                        bool report_already_existing,
                        int scan_interval_millis,
+                       std::shared_ptr<boost::asio::io_service> scheduler,
                        const filesystem_monitor_handler& handler,
                        const initial_files_handler& initial_files_handler)
-               : root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
+               : scheduler_(std::move(scheduler))
+               , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
+               , timer_(*scheduler_)
                , scan_interval_millis_(scan_interval_millis)
+               , executor_(L"polling_filesystem_monitor")
        {
                running_ = true;
                reemmit_all_ = false;
-               scanning_thread_ = boost::thread([this] { scanner(); });
        }
 
-       virtual ~polling_filesystem_monitor()
+       void start()
+       {
+               executor_.begin_invoke([this]
+               {
+                       scan();
+                       initial_scan_completion_.set_value();
+                       schedule_next();
+               });
+       }
+
+       ~polling_filesystem_monitor()
        {
                running_ = false;
-               scanning_thread_.interrupt();
-               scanning_thread_.join();
+               boost::system::error_code e;
+               timer_.cancel(e); // Can still have be queued for execution by asio, therefore the task has a weak_ptr to this
        }
 
-       virtual std::future<void> initial_files_processed()
+       std::future<void> initial_files_processed() override
        {
                return initial_scan_completion_.get_future();
        }
 
-       virtual void reemmit_all()
+       void reemmit_all() override
        {
                reemmit_all_ = true;
        }
 
-       virtual void reemmit(const boost::filesystem::path& file)
+       void reemmit(const boost::filesystem::path& file) override
        {
                to_reemmit_.push(file);
        }
 private:
-       void scanner()
+       void schedule_next()
        {
-               ensure_gpf_handler_installed_for_thread("polling_filesystem_monitor");
+               if (!running_)
+                       return;
 
-               bool running = scan(false);
-               initial_scan_completion_.set_value();
+               std::weak_ptr<polling_filesystem_monitor> weak_self = shared_from_this();
 
-               if (running)
-                       while (scan(true));
+               timer_.expires_from_now(boost::posix_time::milliseconds(scan_interval_millis_));
+               timer_.async_wait([weak_self](const boost::system::error_code& e)
+               {
+                       auto strong_self = weak_self.lock();
+
+                       if (strong_self)
+                               strong_self->begin_scan();
+               });
        }
 
-       bool scan(bool sleep)
+       void begin_scan()
        {
-               try
+               if (!running_)
+                       return;
+
+               executor_.begin_invoke([this]()
                {
-                       if (sleep)
-                               boost::this_thread::sleep_for(boost::chrono::milliseconds(scan_interval_millis_));
+                       scan();
+                       schedule_next();
+               });
+       }
 
+       void scan()
+       {
+               if (!running_)
+                       return;
+
+               try
+               {
                        if (reemmit_all_.fetch_and_store(false))
-                               root_monitor_.reemmit_all();
+                               root_monitor_.reemmit_all(running_);
                        else
                        {
                                boost::filesystem::path file;
@@ -280,30 +314,31 @@ private:
 
                        root_monitor_.scan([=] { return !running_; });
                }
-               catch (const boost::thread_interrupted&)
-               {
-               }
                catch (...)
                {
                        CASPAR_LOG_CURRENT_EXCEPTION();
                }
-
-               return running_;
        }
 };
 
 struct polling_filesystem_monitor_factory::impl
 {
+       std::shared_ptr<boost::asio::io_service> scheduler_;
        int scan_interval_millis;
 
-       impl(int scan_interval_millis)
-               : scan_interval_millis(scan_interval_millis)
+       impl(
+                       std::shared_ptr<boost::asio::io_service> scheduler,
+                       int scan_interval_millis)
+               : scheduler_(std::move(scheduler))
+               , scan_interval_millis(scan_interval_millis)
        {
        }
 };
 
-polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(int scan_interval_millis)
-       : impl_(new impl(scan_interval_millis))
+polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(
+               std::shared_ptr<boost::asio::io_service> scheduler,
+               int scan_interval_millis)
+       : impl_(new impl(std::move(scheduler), scan_interval_millis))
 {
 }
 
@@ -318,13 +353,18 @@ filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
                const filesystem_monitor_handler& handler,
                const initial_files_handler& initial_files_handler)
 {
-       return spl::make_shared<polling_filesystem_monitor>(
+       auto monitor = spl::make_shared<polling_filesystem_monitor>(
                        folder_to_watch,
                        events_of_interest_mask,
                        report_already_existing,
                        impl_->scan_interval_millis,
+                       impl_->scheduler_,
                        handler,
                        initial_files_handler);
+
+       monitor->start();
+
+       return monitor;
 }
 
 }