#include <map>
#include <set>
#include <iostream>
+#include <cstdint>
+#include <boost/asio.hpp>
#include <boost/thread.hpp>
-#include <boost/foreach.hpp>
-#include <boost/range/adaptor/map.hpp>
-#include <boost/range/algorithm/copy.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/convenience.hpp>
#include <tbb/concurrent_queue.h>
#include "executor.h"
+#include "linq.h"
namespace caspar {
{
}
- void operator()(filesystem_event event, const boost::filesystem::wpath& file)
+ void operator()(filesystem_event event, const boost::filesystem::path& file)
{
try
{
- boost::this_thread::interruption_point();
handler_(event, file);
- boost::this_thread::interruption_point();
- }
- catch (const boost::thread_interrupted&)
- {
- throw;
}
catch (...)
{
class directory_monitor
{
- bool report_already_existing_;
- boost::filesystem::wpath folder_;
- filesystem_event events_mask_;
- filesystem_monitor_handler handler_;
- initial_files_handler initial_files_handler_;
- bool first_scan_;
- std::map<boost::filesystem::wpath, std::time_t> files_;
- std::map<boost::filesystem::wpath, uintmax_t> being_written_sizes_;
+ bool report_already_existing_;
+ boost::filesystem::path folder_;
+ filesystem_event events_mask_;
+ filesystem_monitor_handler handler_;
+ initial_files_handler initial_files_handler_;
+ bool first_scan_ = true;
+ std::map<boost::filesystem::path, std::time_t> files_;
+ std::map<boost::filesystem::path, uintmax_t> being_written_sizes_;
public:
directory_monitor(
bool report_already_existing,
- const boost::filesystem::wpath& folder,
+ const boost::filesystem::path& folder,
filesystem_event events_mask,
const filesystem_monitor_handler& handler,
const initial_files_handler& initial_files_handler)
, events_mask_(events_mask)
, handler_(exception_protected_handler(handler))
, initial_files_handler_(initial_files_handler)
- , first_scan_(true)
{
}
- void reemmit_all()
+ void reemmit_all(const tbb::atomic<bool>& running)
{
- if ((events_mask_ & MODIFIED) == 0)
+ if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
return;
- BOOST_FOREACH(auto& file, files_)
- handler_(MODIFIED, file.first);
+ for (auto& file : files_)
+ {
+ if (!running)
+ return;
+
+ handler_(filesystem_event::MODIFIED, file.first);
+ }
}
- void reemmit(const boost::filesystem::wpath& file)
+ void reemmit(const boost::filesystem::path& file)
{
- if ((events_mask_ & MODIFIED) == 0)
+ if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
return;
if (files_.find(file) != files_.end() && boost::filesystem::exists(file))
- handler_(MODIFIED, file);
+ handler_(filesystem_event::MODIFIED, file);
}
void scan(const boost::function<bool ()>& should_abort)
static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds
using namespace boost::filesystem;
- bool interested_in_removed = (events_mask_ & REMOVED) > 0;
- bool interested_in_created = (events_mask_ & CREATED) > 0;
- bool interested_in_modified = (events_mask_ & MODIFIED) > 0;
-
- std::set<wpath> removed_files;
- boost::copy(
- files_ | boost::adaptors::map_keys,
- std::insert_iterator<decltype(removed_files)>(removed_files, removed_files.end()));
+ bool interested_in_removed = static_cast<int>(events_mask_ & filesystem_event::REMOVED) > 0;
+ bool interested_in_created = static_cast<int>(events_mask_ & filesystem_event::CREATED) > 0;
+ bool interested_in_modified = static_cast<int>(events_mask_ & filesystem_event::MODIFIED) > 0;
- std::set<wpath> initial_files;
+ auto filenames = cpplinq::from(files_).select(keys());
+ std::set<path> removed_files(filenames.begin(), filenames.end());
+ std::set<path> initial_files;
- for (boost::filesystem3::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem3::wrecursive_directory_iterator(); ++iter)
+ for (boost::filesystem::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem::wrecursive_directory_iterator(); ++iter)
{
if (should_abort())
return;
if (modified && can_read_file(path))
{
if (interested_in_modified)
- handler_(MODIFIED, path);
+ handler_(filesystem_event::MODIFIED, path);
files_[path] = current_mtime;
being_written_sizes_.erase(path);
else if (no_longer_being_written_to && can_read_file(path))
{
if (interested_in_created && (report_already_existing_ || !first_scan_))
- handler_(CREATED, path);
+ handler_(filesystem_event::CREATED, path);
if (first_scan_)
initial_files.insert(path);
removed_files.erase(path);
}
- BOOST_FOREACH(auto& path, removed_files)
+ for (auto& path : removed_files)
{
files_.erase(path);
being_written_sizes_.erase(path);
if (interested_in_removed)
- handler_(REMOVED, path);
+ handler_(filesystem_event::REMOVED, path);
}
if (first_scan_)
first_scan_ = false;
}
private:
- bool can_read_file(const boost::filesystem::wpath& file)
+ bool can_read_file(const boost::filesystem::path& file)
{
boost::filesystem::wifstream stream(file);
}
};
-class polling_filesystem_monitor : public filesystem_monitor
+class polling_filesystem_monitor
+ : public filesystem_monitor
+ , public spl::enable_shared_from_this<polling_filesystem_monitor>
{
- directory_monitor root_monitor_;
- boost::thread scanning_thread_;
- tbb::atomic<bool> running_;
- int scan_interval_millis_;
- boost::promise<void> initial_scan_completion_;
- tbb::concurrent_queue<boost::filesystem::wpath> to_reemmit_;
- tbb::atomic<bool> reemmit_all_;
+ tbb::atomic<bool> running_;
+ std::shared_ptr<boost::asio::io_service> scheduler_;
+ directory_monitor root_monitor_;
+ boost::asio::deadline_timer timer_;
+ int scan_interval_millis_;
+ std::promise<void> initial_scan_completion_;
+ tbb::concurrent_queue<boost::filesystem::path> to_reemmit_;
+ tbb::atomic<bool> reemmit_all_;
+ executor executor_;
public:
polling_filesystem_monitor(
- const boost::filesystem::wpath& folder_to_watch,
+ const boost::filesystem::path& folder_to_watch,
filesystem_event events_of_interest_mask,
bool report_already_existing,
int scan_interval_millis,
+ std::shared_ptr<boost::asio::io_service> scheduler,
const filesystem_monitor_handler& handler,
const initial_files_handler& initial_files_handler)
- : root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
+ : scheduler_(std::move(scheduler))
+ , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
+ , timer_(*scheduler_)
, scan_interval_millis_(scan_interval_millis)
+ , executor_(L"polling_filesystem_monitor")
{
running_ = true;
reemmit_all_ = false;
- scanning_thread_ = boost::thread([this] { scanner(); });
}
- virtual ~polling_filesystem_monitor()
+ void start()
+ {
+ executor_.begin_invoke([this]
+ {
+ scan();
+ initial_scan_completion_.set_value();
+ schedule_next();
+ });
+ }
+
+ ~polling_filesystem_monitor()
{
running_ = false;
- scanning_thread_.interrupt();
- scanning_thread_.join();
+ boost::system::error_code e;
+ timer_.cancel(e); // Can still have be queued for execution by asio, therefore the task has a weak_ptr to this
}
- virtual boost::unique_future<void> initial_files_processed()
+ std::future<void> initial_files_processed() override
{
return initial_scan_completion_.get_future();
}
- virtual void reemmit_all()
+ void reemmit_all() override
{
reemmit_all_ = true;
}
- virtual void reemmit(const boost::filesystem::wpath& file)
+ void reemmit(const boost::filesystem::path& file) override
{
to_reemmit_.push(file);
}
private:
- void scanner()
+ void schedule_next()
{
- win32_exception::install_handler();
+ if (!running_)
+ return;
- //detail::SetThreadName(GetCurrentThreadId(), "polling_filesystem_monitor");
+ std::weak_ptr<polling_filesystem_monitor> weak_self = shared_from_this();
- bool running = scan(false);
- initial_scan_completion_.set_value();
+ timer_.expires_from_now(boost::posix_time::milliseconds(scan_interval_millis_));
+ timer_.async_wait([weak_self](const boost::system::error_code& e)
+ {
+ auto strong_self = weak_self.lock();
- if (running)
- while (scan(true));
+ if (strong_self)
+ strong_self->begin_scan();
+ });
}
- bool scan(bool sleep)
+ void begin_scan()
{
- try
+ if (!running_)
+ return;
+
+ executor_.begin_invoke([this]()
{
- if (sleep)
- boost::this_thread::sleep(boost::posix_time::milliseconds(scan_interval_millis_));
+ scan();
+ schedule_next();
+ });
+ }
+ void scan()
+ {
+ if (!running_)
+ return;
+
+ try
+ {
if (reemmit_all_.fetch_and_store(false))
- root_monitor_.reemmit_all();
+ root_monitor_.reemmit_all(running_);
else
{
- boost::filesystem::wpath file;
+ boost::filesystem::path file;
while (to_reemmit_.try_pop(file))
root_monitor_.reemmit(file);
root_monitor_.scan([=] { return !running_; });
}
- catch (const boost::thread_interrupted&)
- {
- }
catch (...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
-
- return running_;
}
};
struct polling_filesystem_monitor_factory::impl
{
+ std::shared_ptr<boost::asio::io_service> scheduler_;
int scan_interval_millis;
- impl(int scan_interval_millis)
- : scan_interval_millis(scan_interval_millis)
+ impl(
+ std::shared_ptr<boost::asio::io_service> scheduler,
+ int scan_interval_millis)
+ : scheduler_(std::move(scheduler))
+ , scan_interval_millis(scan_interval_millis)
{
}
};
-polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(int scan_interval_millis)
- : impl_(new impl(scan_interval_millis))
+polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(
+ std::shared_ptr<boost::asio::io_service> scheduler,
+ int scan_interval_millis)
+ : impl_(new impl(std::move(scheduler), scan_interval_millis))
{
}
}
filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
- const boost::filesystem::wpath& folder_to_watch,
+ const boost::filesystem::path& folder_to_watch,
filesystem_event events_of_interest_mask,
bool report_already_existing,
const filesystem_monitor_handler& handler,
const initial_files_handler& initial_files_handler)
{
- return spl::make_shared<polling_filesystem_monitor>(
+ auto monitor = spl::make_shared<polling_filesystem_monitor>(
folder_to_watch,
events_of_interest_mask,
report_already_existing,
impl_->scan_interval_millis,
+ impl_->scheduler_,
handler,
initial_files_handler);
+
+ monitor->start();
+
+ return monitor;
}
-}
\ No newline at end of file
+}