]> git.sesse.net Git - casparcg/blobdiff - common/polling_filesystem_monitor.cpp
Fix a few Clang warnings.
[casparcg] / common / polling_filesystem_monitor.cpp
index ccef76741c2ce0affefe56e2d75b4a0bff48ee59..305e0b0c3b24faec62f38dde357ca4e1d8b61d46 100644 (file)
@@ -28,9 +28,7 @@
 #include <iostream>
 #include <cstdint>
 
-#include <boost/thread.hpp>
-#include <boost/range/adaptor/map.hpp>
-#include <boost/range/algorithm/copy.hpp>
+#include <boost/asio.hpp>
 #include <boost/filesystem/fstream.hpp>
 #include <boost/filesystem/convenience.hpp>
 
@@ -38,6 +36,7 @@
 #include <tbb/concurrent_queue.h>
 
 #include "executor.h"
+#include "linq.h"
 
 namespace caspar {
 
@@ -50,17 +49,11 @@ public:
        {
        }
 
-       void operator()(filesystem_event event, const boost::filesystem::wpath& file)
+       void operator()(filesystem_event event, const boost::filesystem::path& file)
        {
                try
                {
-                       boost::this_thread::interruption_point();
                        handler_(event, file);
-                       boost::this_thread::interruption_point();
-               }
-               catch (const boost::thread_interrupted&)
-               {
-                       throw;
                }
                catch (...)
                {
@@ -71,18 +64,18 @@ public:
 
 class directory_monitor
 {
-       bool report_already_existing_;
-       boost::filesystem::wpath folder_;
-       filesystem_event events_mask_;
-       filesystem_monitor_handler handler_;
-       initial_files_handler initial_files_handler_;
-       bool first_scan_;
-       std::map<boost::filesystem::wpath, std::time_t> files_;
-       std::map<boost::filesystem::wpath, uintmax_t> being_written_sizes_;
+       bool                                                                                    report_already_existing_;
+       boost::filesystem::path                                                 folder_;
+       filesystem_event                                                                events_mask_;
+       filesystem_monitor_handler                                              handler_;
+       initial_files_handler                                                   initial_files_handler_;
+       bool                                                                                    first_scan_                                     = true;
+       std::map<boost::filesystem::path, std::time_t>  files_;
+       std::map<boost::filesystem::path, uintmax_t>    being_written_sizes_;
 public:
        directory_monitor(
                        bool report_already_existing,
-                       const boost::filesystem::wpath& folder,
+                       const boost::filesystem::path& folder,
                        filesystem_event events_mask,
                        const filesystem_monitor_handler& handler,
                        const initial_files_handler& initial_files_handler)
@@ -91,20 +84,24 @@ public:
                , events_mask_(events_mask)
                , handler_(exception_protected_handler(handler))
                , initial_files_handler_(initial_files_handler)
-               , first_scan_(true)
        {
        }
 
-       void reemmit_all()
+       void reemmit_all(const tbb::atomic<bool>& running)
        {
                if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
                        return;
 
                for (auto& file : files_)
+               {
+                       if (!running)
+                               return;
+
                        handler_(filesystem_event::MODIFIED, file.first);
+               }
        }
 
-       void reemmit(const boost::filesystem::wpath& file)
+       void reemmit(const boost::filesystem::path& file)
        {
                if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
                        return;
@@ -122,12 +119,9 @@ public:
                bool interested_in_created = static_cast<int>(events_mask_ & filesystem_event::CREATED) > 0;
                bool interested_in_modified = static_cast<int>(events_mask_ & filesystem_event::MODIFIED) > 0;
 
-               std::set<wpath> removed_files;
-               boost::copy(
-                               files_ | boost::adaptors::map_keys,
-                               std::insert_iterator<decltype(removed_files)>(removed_files, removed_files.end()));
-
-               std::set<wpath> initial_files;
+               auto filenames = cpplinq::from(files_).select(keys());
+               std::set<path> removed_files(filenames.begin(), filenames.end());
+               std::set<path> initial_files;
 
                for (boost::filesystem::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem::wrecursive_directory_iterator(); ++iter)
                {
@@ -200,7 +194,7 @@ public:
                first_scan_ = false;
        }
 private:
-       bool can_read_file(const boost::filesystem::wpath& file)
+       bool can_read_file(const boost::filesystem::path& file)
        {
                boost::filesystem::wifstream stream(file);
 
@@ -208,78 +202,111 @@ private:
        }
 };
 
-class polling_filesystem_monitor : public filesystem_monitor
+class polling_filesystem_monitor
+               : public filesystem_monitor
+               , public spl::enable_shared_from_this<polling_filesystem_monitor>
 {
-       directory_monitor root_monitor_;
-       boost::thread scanning_thread_;
-       tbb::atomic<bool> running_;
-       int scan_interval_millis_;
-       std::promise<void> initial_scan_completion_;
-       tbb::concurrent_queue<boost::filesystem::wpath> to_reemmit_;
-       tbb::atomic<bool> reemmit_all_;
+       tbb::atomic<bool>                                                               running_;
+       std::shared_ptr<boost::asio::io_service>                scheduler_;
+       directory_monitor                                                               root_monitor_;
+       boost::asio::deadline_timer                                             timer_;
+       int                                                                                             scan_interval_millis_;
+       std::promise<void>                                                              initial_scan_completion_;
+       tbb::concurrent_queue<boost::filesystem::path>  to_reemmit_;
+       tbb::atomic<bool>                                                               reemmit_all_;
+       executor                                                                                executor_;
 public:
        polling_filesystem_monitor(
-                       const boost::filesystem::wpath& folder_to_watch,
+                       const boost::filesystem::path& folder_to_watch,
                        filesystem_event events_of_interest_mask,
                        bool report_already_existing,
                        int scan_interval_millis,
+                       std::shared_ptr<boost::asio::io_service> scheduler,
                        const filesystem_monitor_handler& handler,
                        const initial_files_handler& initial_files_handler)
-               : root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
+               : scheduler_(std::move(scheduler))
+               , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
+               , timer_(*scheduler_)
                , scan_interval_millis_(scan_interval_millis)
+               , executor_(L"polling_filesystem_monitor")
        {
                running_ = true;
                reemmit_all_ = false;
-               scanning_thread_ = boost::thread([this] { scanner(); });
        }
 
-       virtual ~polling_filesystem_monitor()
+       void start()
+       {
+               executor_.begin_invoke([this]
+               {
+                       scan();
+                       initial_scan_completion_.set_value();
+                       schedule_next();
+               });
+       }
+
+       ~polling_filesystem_monitor()
        {
                running_ = false;
-               scanning_thread_.interrupt();
-               scanning_thread_.join();
+               boost::system::error_code e;
+               timer_.cancel(e); // Can still have be queued for execution by asio, therefore the task has a weak_ptr to this
        }
 
-       virtual std::future<void> initial_files_processed()
+       std::future<void> initial_files_processed() override
        {
                return initial_scan_completion_.get_future();
        }
 
-       virtual void reemmit_all()
+       void reemmit_all() override
        {
                reemmit_all_ = true;
        }
 
-       virtual void reemmit(const boost::filesystem::wpath& file)
+       void reemmit(const boost::filesystem::path& file) override
        {
                to_reemmit_.push(file);
        }
 private:
-       void scanner()
+       void schedule_next()
        {
-               win32_exception::install_handler();
+               if (!running_)
+                       return;
 
-               //detail::SetThreadName(GetCurrentThreadId(), "polling_filesystem_monitor");
+               std::weak_ptr<polling_filesystem_monitor> weak_self = shared_from_this();
 
-               bool running = scan(false);
-               initial_scan_completion_.set_value();
+               timer_.expires_from_now(boost::posix_time::milliseconds(scan_interval_millis_));
+               timer_.async_wait([weak_self](const boost::system::error_code& e)
+               {
+                       auto strong_self = weak_self.lock();
 
-               if (running)
-                       while (scan(true));
+                       if (strong_self)
+                               strong_self->begin_scan();
+               });
        }
 
-       bool scan(bool sleep)
+       void begin_scan()
        {
-               try
+               if (!running_)
+                       return;
+
+               executor_.begin_invoke([this]()
                {
-                       if (sleep)
-                               boost::this_thread::sleep(boost::posix_time::milliseconds(scan_interval_millis_));
+                       scan();
+                       schedule_next();
+               });
+       }
 
+       void scan()
+       {
+               if (!running_)
+                       return;
+
+               try
+               {
                        if (reemmit_all_.fetch_and_store(false))
-                               root_monitor_.reemmit_all();
+                               root_monitor_.reemmit_all(running_);
                        else
                        {
-                               boost::filesystem::wpath file;
+                               boost::filesystem::path file;
 
                                while (to_reemmit_.try_pop(file))
                                        root_monitor_.reemmit(file);
@@ -287,30 +314,31 @@ private:
 
                        root_monitor_.scan([=] { return !running_; });
                }
-               catch (const boost::thread_interrupted&)
-               {
-               }
                catch (...)
                {
                        CASPAR_LOG_CURRENT_EXCEPTION();
                }
-
-               return running_;
        }
 };
 
 struct polling_filesystem_monitor_factory::impl
 {
+       std::shared_ptr<boost::asio::io_service> scheduler_;
        int scan_interval_millis;
 
-       impl(int scan_interval_millis)
-               : scan_interval_millis(scan_interval_millis)
+       impl(
+                       std::shared_ptr<boost::asio::io_service> scheduler,
+                       int scan_interval_millis)
+               : scheduler_(std::move(scheduler))
+               , scan_interval_millis(scan_interval_millis)
        {
        }
 };
 
-polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(int scan_interval_millis)
-       : impl_(new impl(scan_interval_millis))
+polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(
+               std::shared_ptr<boost::asio::io_service> scheduler,
+               int scan_interval_millis)
+       : impl_(new impl(std::move(scheduler), scan_interval_millis))
 {
 }
 
@@ -319,19 +347,24 @@ polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory()
 }
 
 filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
-               const boost::filesystem::wpath& folder_to_watch,
+               const boost::filesystem::path& folder_to_watch,
                filesystem_event events_of_interest_mask,
                bool report_already_existing,
                const filesystem_monitor_handler& handler,
                const initial_files_handler& initial_files_handler)
 {
-       return spl::make_shared<polling_filesystem_monitor>(
+       auto monitor = spl::make_shared<polling_filesystem_monitor>(
                        folder_to_watch,
                        events_of_interest_mask,
                        report_already_existing,
                        impl_->scan_interval_millis,
+                       impl_->scheduler_,
                        handler,
                        initial_files_handler);
+
+       monitor->start();
+
+       return monitor;
 }
 
-}
\ No newline at end of file
+}