#include <cstdint>
#include <boost/asio.hpp>
-#include <boost/thread.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/convenience.hpp>
{
}
- void reemmit_all()
+ void reemmit_all(const tbb::atomic<bool>& running)
{
if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
return;
for (auto& file : files_)
+ {
+ if (!running)
+ return;
+
handler_(filesystem_event::MODIFIED, file.first);
+ }
}
void reemmit(const boost::filesystem::path& file)
}
};
-class polling_filesystem_monitor : public filesystem_monitor
+class polling_filesystem_monitor
+ : public filesystem_monitor
+ , public spl::enable_shared_from_this<polling_filesystem_monitor>
{
- tbb::atomic<bool> running_;
- std::shared_ptr<boost::asio::io_service> scheduler_;
- directory_monitor root_monitor_;
- boost::asio::deadline_timer timer_;
- int scan_interval_millis_;
- std::promise<void> initial_scan_completion_;
- tbb::concurrent_queue<boost::filesystem::path> to_reemmit_;
- tbb::atomic<bool> reemmit_all_;
- executor executor_;
+ tbb::atomic<bool> running_;
+ std::shared_ptr<boost::asio::io_service> scheduler_;
+ directory_monitor root_monitor_;
+ boost::asio::deadline_timer timer_;
+ int scan_interval_millis_;
+ std::promise<void> initial_scan_completion_;
+ tbb::concurrent_queue<boost::filesystem::path> to_reemmit_;
+ tbb::atomic<bool> reemmit_all_;
+ executor executor_;
public:
polling_filesystem_monitor(
const boost::filesystem::path& folder_to_watch,
{
running_ = true;
reemmit_all_ = false;
+ }
+
+ void start()
+ {
executor_.begin_invoke([this]
{
scan();
{
running_ = false;
boost::system::error_code e;
- timer_.cancel(e);
+ timer_.cancel(e); // Can still have be queued for execution by asio, therefore the task has a weak_ptr to this
}
std::future<void> initial_files_processed() override
if (!running_)
return;
- timer_.expires_from_now(
- boost::posix_time::milliseconds(scan_interval_millis_));
- timer_.async_wait([this](const boost::system::error_code& e)
+ std::weak_ptr<polling_filesystem_monitor> weak_self = shared_from_this();
+
+ timer_.expires_from_now(boost::posix_time::milliseconds(scan_interval_millis_));
+ timer_.async_wait([weak_self](const boost::system::error_code& e)
{
- begin_scan();
+ auto strong_self = weak_self.lock();
+
+ if (strong_self)
+ strong_self->begin_scan();
});
}
try
{
if (reemmit_all_.fetch_and_store(false))
- root_monitor_.reemmit_all();
+ root_monitor_.reemmit_all(running_);
else
{
- boost::filesystem::wpath file;
+ boost::filesystem::path file;
while (to_reemmit_.try_pop(file))
root_monitor_.reemmit(file);
const filesystem_monitor_handler& handler,
const initial_files_handler& initial_files_handler)
{
- return spl::make_shared<polling_filesystem_monitor>(
+ auto monitor = spl::make_shared<polling_filesystem_monitor>(
folder_to_watch,
events_of_interest_mask,
report_already_existing,
impl_->scheduler_,
handler,
initial_files_handler);
+
+ monitor->start();
+
+ return monitor;
}
}