#include <iostream>
#include <cstdint>
+#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/convenience.hpp>
{
}
- void operator()(filesystem_event event, const boost::filesystem::wpath& file)
+ void operator()(filesystem_event event, const boost::filesystem::path& file)
{
try
{
- boost::this_thread::interruption_point();
handler_(event, file);
- boost::this_thread::interruption_point();
- }
- catch (const boost::thread_interrupted&)
- {
- throw;
}
catch (...)
{
class directory_monitor
{
bool report_already_existing_;
- boost::filesystem::wpath folder_;
+ boost::filesystem::path folder_;
filesystem_event events_mask_;
filesystem_monitor_handler handler_;
initial_files_handler initial_files_handler_;
bool first_scan_ = true;
- std::map<boost::filesystem::wpath, std::time_t> files_;
- std::map<boost::filesystem::wpath, uintmax_t> being_written_sizes_;
+ std::map<boost::filesystem::path, std::time_t> files_;
+ std::map<boost::filesystem::path, uintmax_t> being_written_sizes_;
public:
directory_monitor(
bool report_already_existing,
- const boost::filesystem::wpath& folder,
+ const boost::filesystem::path& folder,
filesystem_event events_mask,
const filesystem_monitor_handler& handler,
const initial_files_handler& initial_files_handler)
{
}
- void reemmit_all()
+ void reemmit_all(const tbb::atomic<bool>& running)
{
if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
return;
for (auto& file : files_)
+ {
+ if (!running)
+ return;
+
handler_(filesystem_event::MODIFIED, file.first);
+ }
}
- void reemmit(const boost::filesystem::wpath& file)
+ void reemmit(const boost::filesystem::path& file)
{
if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
return;
bool interested_in_modified = static_cast<int>(events_mask_ & filesystem_event::MODIFIED) > 0;
auto filenames = cpplinq::from(files_).select(keys());
- std::set<wpath> removed_files(filenames.begin(), filenames.end());
- std::set<wpath> initial_files;
+ std::set<path> removed_files(filenames.begin(), filenames.end());
+ std::set<path> initial_files;
for (boost::filesystem::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem::wrecursive_directory_iterator(); ++iter)
{
first_scan_ = false;
}
private:
- bool can_read_file(const boost::filesystem::wpath& file)
+ bool can_read_file(const boost::filesystem::path& file)
{
boost::filesystem::wifstream stream(file);
class polling_filesystem_monitor : public filesystem_monitor
{
- directory_monitor root_monitor_;
- boost::thread scanning_thread_;
tbb::atomic<bool> running_;
+ std::shared_ptr<boost::asio::io_service> scheduler_;
+ directory_monitor root_monitor_;
+ boost::asio::deadline_timer timer_;
int scan_interval_millis_;
std::promise<void> initial_scan_completion_;
- tbb::concurrent_queue<boost::filesystem::wpath> to_reemmit_;
+ tbb::concurrent_queue<boost::filesystem::path> to_reemmit_;
tbb::atomic<bool> reemmit_all_;
+ executor executor_;
public:
polling_filesystem_monitor(
- const boost::filesystem::wpath& folder_to_watch,
+ const boost::filesystem::path& folder_to_watch,
filesystem_event events_of_interest_mask,
bool report_already_existing,
int scan_interval_millis,
+ std::shared_ptr<boost::asio::io_service> scheduler,
const filesystem_monitor_handler& handler,
const initial_files_handler& initial_files_handler)
- : root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
+ : scheduler_(std::move(scheduler))
+ , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
+ , timer_(*scheduler_)
, scan_interval_millis_(scan_interval_millis)
+ , executor_(L"polling_filesystem_monitor")
{
running_ = true;
reemmit_all_ = false;
- scanning_thread_ = boost::thread([this] { scanner(); });
+ executor_.begin_invoke([this]
+ {
+ scan();
+ initial_scan_completion_.set_value();
+ schedule_next();
+ });
}
- virtual ~polling_filesystem_monitor()
+ ~polling_filesystem_monitor()
{
running_ = false;
- scanning_thread_.interrupt();
- scanning_thread_.join();
+ boost::system::error_code e;
+ timer_.cancel(e);
}
- virtual std::future<void> initial_files_processed()
+ std::future<void> initial_files_processed() override
{
return initial_scan_completion_.get_future();
}
- virtual void reemmit_all()
+ void reemmit_all() override
{
reemmit_all_ = true;
}
- virtual void reemmit(const boost::filesystem::wpath& file)
+ void reemmit(const boost::filesystem::path& file) override
{
to_reemmit_.push(file);
}
private:
- void scanner()
+ void schedule_next()
{
- win32_exception::install_handler();
+ if (!running_)
+ return;
- //detail::SetThreadName(GetCurrentThreadId(), "polling_filesystem_monitor");
+ timer_.expires_from_now(
+ boost::posix_time::milliseconds(scan_interval_millis_));
+ timer_.async_wait([this](const boost::system::error_code& e)
+ {
+ begin_scan();
+ });
+ }
- bool running = scan(false);
- initial_scan_completion_.set_value();
+ void begin_scan()
+ {
+ if (!running_)
+ return;
- if (running)
- while (scan(true));
+ executor_.begin_invoke([this]()
+ {
+ scan();
+ schedule_next();
+ });
}
- bool scan(bool sleep)
+ void scan()
{
+ if (!running_)
+ return;
+
try
{
- if (sleep)
- boost::this_thread::sleep(boost::posix_time::milliseconds(scan_interval_millis_));
-
if (reemmit_all_.fetch_and_store(false))
- root_monitor_.reemmit_all();
+ root_monitor_.reemmit_all(running_);
else
{
- boost::filesystem::wpath file;
+ boost::filesystem::path file;
while (to_reemmit_.try_pop(file))
root_monitor_.reemmit(file);
root_monitor_.scan([=] { return !running_; });
}
- catch (const boost::thread_interrupted&)
- {
- }
catch (...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
-
- return running_;
}
};
struct polling_filesystem_monitor_factory::impl
{
+ std::shared_ptr<boost::asio::io_service> scheduler_;
int scan_interval_millis;
- impl(int scan_interval_millis)
- : scan_interval_millis(scan_interval_millis)
+ impl(
+ std::shared_ptr<boost::asio::io_service> scheduler,
+ int scan_interval_millis)
+ : scheduler_(std::move(scheduler))
+ , scan_interval_millis(scan_interval_millis)
{
}
};
-polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(int scan_interval_millis)
- : impl_(new impl(scan_interval_millis))
+polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(
+ std::shared_ptr<boost::asio::io_service> scheduler,
+ int scan_interval_millis)
+ : impl_(new impl(std::move(scheduler), scan_interval_millis))
{
}
}
filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
- const boost::filesystem::wpath& folder_to_watch,
+ const boost::filesystem::path& folder_to_watch,
filesystem_event events_of_interest_mask,
bool report_already_existing,
const filesystem_monitor_handler& handler,
events_of_interest_mask,
report_already_existing,
impl_->scan_interval_millis,
+ impl_->scheduler_,
handler,
initial_files_handler);
}
-}
\ No newline at end of file
+}