X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=common%2Fpolling_filesystem_monitor.cpp;h=c189218ea31ab0d450ef545009477131c33e241f;hb=cde347942b00b54550497641786219e8a0862287;hp=95e7c97277b9aa94338106ef9e12800a2a443803;hpb=5aee090fd729fd556c6ed8075906fe67a4185e67;p=casparcg diff --git a/common/polling_filesystem_monitor.cpp b/common/polling_filesystem_monitor.cpp index 95e7c9727..c189218ea 100644 --- a/common/polling_filesystem_monitor.cpp +++ b/common/polling_filesystem_monitor.cpp @@ -26,11 +26,10 @@ #include #include #include +#include +#include #include -#include -#include -#include #include #include @@ -38,6 +37,7 @@ #include #include "executor.h" +#include "linq.h" namespace caspar { @@ -50,17 +50,11 @@ public: { } - void operator()(filesystem_event event, const boost::filesystem::wpath& file) + void operator()(filesystem_event event, const boost::filesystem::path& file) { try { - boost::this_thread::interruption_point(); handler_(event, file); - boost::this_thread::interruption_point(); - } - catch (const boost::thread_interrupted&) - { - throw; } catch (...) { @@ -71,18 +65,18 @@ public: class directory_monitor { - bool report_already_existing_; - boost::filesystem::wpath folder_; - filesystem_event events_mask_; - filesystem_monitor_handler handler_; - initial_files_handler initial_files_handler_; - bool first_scan_; - std::map files_; - std::map being_written_sizes_; + bool report_already_existing_; + boost::filesystem::path folder_; + filesystem_event events_mask_; + filesystem_monitor_handler handler_; + initial_files_handler initial_files_handler_; + bool first_scan_ = true; + std::map files_; + std::map being_written_sizes_; public: directory_monitor( bool report_already_existing, - const boost::filesystem::wpath& folder, + const boost::filesystem::path& folder, filesystem_event events_mask, const filesystem_monitor_handler& handler, const initial_files_handler& initial_files_handler) @@ -91,26 +85,30 @@ public: , events_mask_(events_mask) , handler_(exception_protected_handler(handler)) , initial_files_handler_(initial_files_handler) - , first_scan_(true) { } - void reemmit_all() + void reemmit_all(const tbb::atomic& running) { - if ((events_mask_ & MODIFIED) == 0) + if (static_cast(events_mask_ & filesystem_event::MODIFIED) == 0) return; - BOOST_FOREACH(auto& file, files_) - handler_(MODIFIED, file.first); + for (auto& file : files_) + { + if (!running) + return; + + handler_(filesystem_event::MODIFIED, file.first); + } } - void reemmit(const boost::filesystem::wpath& file) + void reemmit(const boost::filesystem::path& file) { - if ((events_mask_ & MODIFIED) == 0) + if (static_cast(events_mask_ & filesystem_event::MODIFIED) == 0) return; if (files_.find(file) != files_.end() && boost::filesystem::exists(file)) - handler_(MODIFIED, file); + handler_(filesystem_event::MODIFIED, file); } void scan(const boost::function& should_abort) @@ -118,18 +116,15 @@ public: static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds using namespace boost::filesystem; - bool interested_in_removed = (events_mask_ & REMOVED) > 0; - bool interested_in_created = (events_mask_ & CREATED) > 0; - bool interested_in_modified = (events_mask_ & MODIFIED) > 0; - - std::set removed_files; - boost::copy( - files_ | boost::adaptors::map_keys, - std::insert_iterator(removed_files, removed_files.end())); + bool interested_in_removed = static_cast(events_mask_ & filesystem_event::REMOVED) > 0; + bool interested_in_created = static_cast(events_mask_ & filesystem_event::CREATED) > 0; + bool interested_in_modified = static_cast(events_mask_ & filesystem_event::MODIFIED) > 0; - std::set initial_files; + auto filenames = cpplinq::from(files_).select(keys()); + std::set removed_files(filenames.begin(), filenames.end()); + std::set initial_files; - for (boost::filesystem3::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem3::wrecursive_directory_iterator(); ++iter) + for (boost::filesystem::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem::wrecursive_directory_iterator(); ++iter) { if (should_abort()) return; @@ -164,7 +159,7 @@ public: if (modified && can_read_file(path)) { if (interested_in_modified) - handler_(MODIFIED, path); + handler_(filesystem_event::MODIFIED, path); files_[path] = current_mtime; being_written_sizes_.erase(path); @@ -173,7 +168,7 @@ public: else if (no_longer_being_written_to && can_read_file(path)) { if (interested_in_created && (report_already_existing_ || !first_scan_)) - handler_(CREATED, path); + handler_(filesystem_event::CREATED, path); if (first_scan_) initial_files.insert(path); @@ -185,13 +180,13 @@ public: removed_files.erase(path); } - BOOST_FOREACH(auto& path, removed_files) + for (auto& path : removed_files) { files_.erase(path); being_written_sizes_.erase(path); if (interested_in_removed) - handler_(REMOVED, path); + handler_(filesystem_event::REMOVED, path); } if (first_scan_) @@ -200,7 +195,7 @@ public: first_scan_ = false; } private: - bool can_read_file(const boost::filesystem::wpath& file) + bool can_read_file(const boost::filesystem::path& file) { boost::filesystem::wifstream stream(file); @@ -210,76 +205,99 @@ private: class polling_filesystem_monitor : public filesystem_monitor { - directory_monitor root_monitor_; - boost::thread scanning_thread_; tbb::atomic running_; + std::shared_ptr scheduler_; + directory_monitor root_monitor_; + boost::asio::deadline_timer timer_; int scan_interval_millis_; - boost::promise initial_scan_completion_; - tbb::concurrent_queue to_reemmit_; + std::promise initial_scan_completion_; + tbb::concurrent_queue to_reemmit_; tbb::atomic reemmit_all_; + executor executor_; public: polling_filesystem_monitor( - const boost::filesystem::wpath& folder_to_watch, + const boost::filesystem::path& folder_to_watch, filesystem_event events_of_interest_mask, bool report_already_existing, int scan_interval_millis, + std::shared_ptr scheduler, const filesystem_monitor_handler& handler, const initial_files_handler& initial_files_handler) - : root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler) + : scheduler_(std::move(scheduler)) + , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler) + , timer_(*scheduler_) , scan_interval_millis_(scan_interval_millis) + , executor_(L"polling_filesystem_monitor") { running_ = true; reemmit_all_ = false; - scanning_thread_ = boost::thread([this] { scanner(); }); + executor_.begin_invoke([this] + { + scan(); + initial_scan_completion_.set_value(); + schedule_next(); + }); } - virtual ~polling_filesystem_monitor() + ~polling_filesystem_monitor() { running_ = false; - scanning_thread_.interrupt(); - scanning_thread_.join(); + boost::system::error_code e; + timer_.cancel(e); } - virtual boost::unique_future initial_files_processed() + std::future initial_files_processed() override { return initial_scan_completion_.get_future(); } - virtual void reemmit_all() + void reemmit_all() override { reemmit_all_ = true; } - virtual void reemmit(const boost::filesystem::wpath& file) + void reemmit(const boost::filesystem::path& file) override { to_reemmit_.push(file); } private: - void scanner() + void schedule_next() { - win32_exception::install_handler(); + if (!running_) + return; - //detail::SetThreadName(GetCurrentThreadId(), "polling_filesystem_monitor"); + timer_.expires_from_now( + boost::posix_time::milliseconds(scan_interval_millis_)); + timer_.async_wait([this](const boost::system::error_code& e) + { + begin_scan(); + }); + } - bool running = scan(false); - initial_scan_completion_.set_value(); + void begin_scan() + { + if (!running_) + return; - if (running) - while (scan(true)); + executor_.begin_invoke([this]() + { + scan(); + schedule_next(); + }); } - bool scan(bool sleep) + void scan() { + if (!running_) + return; + try { - if (sleep) - boost::this_thread::sleep(boost::posix_time::milliseconds(scan_interval_millis_)); - if (reemmit_all_.fetch_and_store(false)) - root_monitor_.reemmit_all(); + root_monitor_.reemmit_all(running_); else { - boost::filesystem::wpath file; + boost::filesystem::path file; while (to_reemmit_.try_pop(file)) root_monitor_.reemmit(file); @@ -287,30 +305,31 @@ private: root_monitor_.scan([=] { return !running_; }); } - catch (const boost::thread_interrupted&) - { - } catch (...) { CASPAR_LOG_CURRENT_EXCEPTION(); } - - return running_; } }; struct polling_filesystem_monitor_factory::impl { + std::shared_ptr scheduler_; int scan_interval_millis; - impl(int scan_interval_millis) - : scan_interval_millis(scan_interval_millis) + impl( + std::shared_ptr scheduler, + int scan_interval_millis) + : scheduler_(std::move(scheduler)) + , scan_interval_millis(scan_interval_millis) { } }; -polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(int scan_interval_millis) - : impl_(new impl(scan_interval_millis)) +polling_filesystem_monitor_factory::polling_filesystem_monitor_factory( + std::shared_ptr scheduler, + int scan_interval_millis) + : impl_(new impl(std::move(scheduler), scan_interval_millis)) { } @@ -319,7 +338,7 @@ polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory() } filesystem_monitor::ptr polling_filesystem_monitor_factory::create( - const boost::filesystem::wpath& folder_to_watch, + const boost::filesystem::path& folder_to_watch, filesystem_event events_of_interest_mask, bool report_already_existing, const filesystem_monitor_handler& handler, @@ -330,8 +349,9 @@ filesystem_monitor::ptr polling_filesystem_monitor_factory::create( events_of_interest_mask, report_already_existing, impl_->scan_interval_millis, + impl_->scheduler_, handler, initial_files_handler); } -} \ No newline at end of file +}