2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Helge Norberg, helge.norberg@svt.se
24 #include "polling_filesystem_monitor.h"
31 #include <boost/asio.hpp>
32 #include <boost/filesystem/fstream.hpp>
33 #include <boost/filesystem/convenience.hpp>
35 #include <tbb/atomic.h>
36 #include <tbb/concurrent_queue.h>
// Callable wrapper around a filesystem_monitor_handler: operator() forwards
// the (event, file) pair to the wrapped handler_.
// NOTE(review): this view of the file is missing lines (braces and, it seems,
// the try/catch around the call). The CASPAR_LOG_CURRENT_EXCEPTION() below
// presumably lives in a catch block so that a throwing handler is logged
// instead of propagating -- confirm against the complete source.
43 class exception_protected_handler
// The wrapped callback that receives the events.
45 filesystem_monitor_handler handler_;
// Takes the handler to wrap (the member initializer is not visible here).
47 exception_protected_handler(const filesystem_monitor_handler& handler)
// Delivers one filesystem event concerning `file` to the wrapped handler.
52 void operator()(filesystem_event event, const boost::filesystem::path& file)
56 handler_(event, file);
60 CASPAR_LOG_CURRENT_EXCEPTION();
// Tracks the files found recursively under folder_ and reports CREATED,
// MODIFIED and REMOVED events to handler_ by comparing each file's last write
// time with the value remembered from an earlier scan() call.
// NOTE(review): several short lines (braces, return/continue statements,
// access specifiers) are not visible in this view; comments below describe
// only what the visible statements do and hedge the rest.
65 class directory_monitor
// When true, CREATED is reported even while first_scan_ is still true
// (see the CREATED condition in scan()).
67 bool report_already_existing_;
// Root folder that scan() iterates recursively.
68 boost::filesystem::path folder_;
// Bitmask of the filesystem_event kinds that should be delivered.
69 filesystem_event events_mask_;
// Event callback; the constructor stores it wrapped in an
// exception_protected_handler.
70 filesystem_monitor_handler handler_;
// Callback that receives the set of paths collected in `initial_files`.
71 initial_files_handler initial_files_handler_;
// Starts out true; the statement that clears it is not visible in this view.
72 bool first_scan_ = true;
// Known files mapped to the last write time that was recorded for them.
73 std::map<boost::filesystem::path, std::time_t> files_;
// Only erased from in the visible code; no insertion is visible here.
74 std::map<boost::filesystem::path, uintmax_t> being_written_sizes_;
// Constructor parameters (the constructor's name line is not visible here):
//   report_already_existing - stored in report_already_existing_
//   folder                  - presumably stored in folder_ (initializer not visible)
//   events_mask             - stored in events_mask_
//   handler                 - wrapped in exception_protected_handler, stored in handler_
//   initial_files_handler   - stored in initial_files_handler_
77 bool report_already_existing,
78 const boost::filesystem::path& folder,
79 filesystem_event events_mask,
80 const filesystem_monitor_handler& handler,
81 const initial_files_handler& initial_files_handler)
82 : report_already_existing_(report_already_existing)
84 , events_mask_(events_mask)
85 , handler_(exception_protected_handler(handler))
86 , initial_files_handler_(initial_files_handler)
// Sends a MODIFIED event for every path currently held in files_.
// NOTE(review): `running` is not used in the visible lines; presumably it is
// checked inside the loop to stop early -- confirm against the full source.
90 void reemmit_all(const tbb::atomic<bool>& running)
// Tests whether MODIFIED is absent from the mask (the statement guarded by
// this condition is not visible; presumably an early return).
92 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
95 for (auto& file : files_)
100 handler_(filesystem_event::MODIFIED, file.first);
// Sends a MODIFIED event for a single `file`, but only if it is a known entry
// of files_ and still exists on disk.
104 void reemmit(const boost::filesystem::path& file)
// Same MODIFIED-mask test as in reemmit_all (guarded statement not visible).
106 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
109 if (files_.find(file) != files_.end() && boost::filesystem::exists(file))
110 handler_(filesystem_event::MODIFIED, file);
// Performs one pass over folder_ and emits the events implied by the
// differences against files_.
// NOTE(review): `should_abort` is not called in the visible lines; presumably
// it is polled per directory entry to abandon the scan -- confirm.
113 void scan(const boost::function<bool ()>& should_abort)
// A file whose last write is at least this old is treated as no longer being
// written to (see no_longer_being_written_to below).
115 static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds
116 using namespace boost::filesystem;
// Decode the event mask once, up front.
118 bool interested_in_removed = static_cast<int>(events_mask_ & filesystem_event::REMOVED) > 0;
119 bool interested_in_created = static_cast<int>(events_mask_ & filesystem_event::CREATED) > 0;
120 bool interested_in_modified = static_cast<int>(events_mask_ & filesystem_event::MODIFIED) > 0;
// removed_files starts as every known path; each path encountered during the
// directory walk is erased from it, so what remains afterwards was not seen.
122 auto filenames = cpplinq::from(files_).select(keys());
123 std::set<path> removed_files(filenames.begin(), filenames.end());
// Paths collected here are handed to initial_files_handler_ at the end.
124 std::set<path> initial_files;
// Walk every entry below folder_, recursively.
126 for (boost::filesystem::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem::wrecursive_directory_iterator(); ++iter)
131 auto& path = iter->path();
// Directory test (the guarded statement is not visible; presumably the entry
// is skipped).
133 if (is_directory(path))
136 auto now = std::time(nullptr);
137 std::time_t current_mtime;
// NOTE(review): the comment below suggests this call is wrapped in a
// try/catch whose lines are not visible in this view.
141 current_mtime = last_write_time(path);
145 // Probably removed, will be captured the next round.
// Age of the last write, compared against NO_LONGER_WRITING_AGE.
149 auto time_since_written_to = now - current_mtime;
150 bool no_longer_being_written_to = time_since_written_to >= NO_LONGER_WRITING_AGE;
151 auto previous_it = files_.find(path);
152 bool already_known = previous_it != files_.end();
// Known file that has settled: check whether its write time changed.
154 if (already_known && no_longer_being_written_to)
156 bool modified = previous_it->second != current_mtime;
// Only act on a change once the file can actually be opened.
158 if (modified && can_read_file(path))
160 if (interested_in_modified)
161 handler_(filesystem_event::MODIFIED, path);
// Remember the new write time for this path.
163 files_[path] = current_mtime;
164 being_written_sizes_.erase(path);
// Otherwise: a settled, readable file (per the preceding branch, typically one
// not yet in files_).
167 else if (no_longer_being_written_to && can_read_file(path))
// CREATED is suppressed while first_scan_ is true unless
// report_already_existing_ was requested.
169 if (interested_in_created && (report_already_existing_ || !first_scan_))
170 handler_(filesystem_event::CREATED, path);
// NOTE(review): the condition guarding this insert (if any) is not visible.
173 initial_files.insert(path);
// std::map::insert leaves an existing entry for `path` untouched.
175 files_.insert(std::make_pair(path, current_mtime));
176 being_written_sizes_.erase(path);
// The path was seen during this walk, so it is not a removal candidate.
179 removed_files.erase(path);
// Everything still in removed_files was known before but not seen this pass.
// NOTE(review): removal of the path from files_ is not visible in this view.
182 for (auto& path : removed_files)
185 being_written_sizes_.erase(path);
187 if (interested_in_removed)
188 handler_(filesystem_event::REMOVED, path);
// Hands over the collected initial files.
// NOTE(review): the condition guarding this call (if any) is not visible.
192 initial_files_handler_(initial_files);
// Returns true when `file` can be opened for reading as a wide-character
// input file stream.
197 bool can_read_file(const boost::filesystem::path& file)
199 boost::filesystem::wifstream stream(file);
201 return stream.is_open();
// filesystem_monitor implementation that polls: it owns a directory_monitor
// (root_monitor_) and a boost::asio deadline timer that, after
// scan_interval_millis_, calls begin_scan(); the scan work itself is submitted
// through executor_.begin_invoke().
// NOTE(review): several short lines (braces, some member declarations, method
// signatures and simple statements) are not visible in this view; comments
// describe only the visible statements and hedge the rest.
205 class polling_filesystem_monitor
206 : public filesystem_monitor
207 , public spl::enable_shared_from_this<polling_filesystem_monitor>
// Read as the "keep going" flag: passed to reemmit_all() and negated to form
// the abort predicate for scan(). Its assignments are not visible here.
209 tbb::atomic<bool> running_;
// io_service that timer_ is bound to.
210 std::shared_ptr<boost::asio::io_service> scheduler_;
// Does the actual directory comparison and event delivery.
211 directory_monitor root_monitor_;
212 boost::asio::deadline_timer timer_;
// Delay, in milliseconds, used when arming timer_.
213 int scan_interval_millis_;
// Fulfilled from the task queued in the constructor.
214 std::promise<void> initial_scan_completion_;
// Paths queued by reemmit(); drained in the scan task.
215 tbb::concurrent_queue<boost::filesystem::path> to_reemmit_;
// Flag consumed (read and cleared) in the scan task.
216 tbb::atomic<bool> reemmit_all_;
// Constructor. The first three arguments plus both handlers are forwarded to
// root_monitor_; `scheduler` is moved into scheduler_ and used to construct
// timer_. executor_ is initialized here, but its declaration is not visible in
// this view.
219 polling_filesystem_monitor(
220 const boost::filesystem::path& folder_to_watch,
221 filesystem_event events_of_interest_mask,
222 bool report_already_existing,
223 int scan_interval_millis,
224 std::shared_ptr<boost::asio::io_service> scheduler,
225 const filesystem_monitor_handler& handler,
226 const initial_files_handler& initial_files_handler)
227 : scheduler_(std::move(scheduler))
228 , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
229 , timer_(*scheduler_)
230 , scan_interval_millis_(scan_interval_millis)
231 , executor_(L"polling_filesystem_monitor")
234 reemmit_all_ = false;
// Queues a task on executor_ that fulfils initial_scan_completion_.
// NOTE(review): the rest of this lambda is not visible; presumably it runs the
// first scan before set_value() -- confirm against the full source.
239 executor_.begin_invoke([this]
242 initial_scan_completion_.set_value();
// Destructor: cancels any pending wait on timer_, using the error_code
// overload of cancel().
247 ~polling_filesystem_monitor()
250 boost::system::error_code e;
251 timer_.cancel(e); // Can still have been queued for execution by asio, therefore the task has a weak_ptr to this
// Returns the future associated with initial_scan_completion_.
// NOTE(review): std::promise::get_future() may only be called once per
// promise; a second call to this method would throw std::future_error.
254 std::future<void> initial_files_processed() override
256 return initial_scan_completion_.get_future();
// NOTE(review): the body is not visible in this view; presumably it raises
// reemmit_all_, which the scan task reads and clears -- confirm.
259 void reemmit_all() override
// Queues `file` on to_reemmit_; the scan task pops the queue and calls
// root_monitor_.reemmit() for each entry.
264 void reemmit(const boost::filesystem::path& file) override
266 to_reemmit_.push(file);
// The following statements belong to a method whose signature line is not
// visible here. They arm timer_ to expire scan_interval_millis_ from now and
// register a completion handler that holds only a weak_ptr to this object, so
// the pending wait does not keep the monitor alive.
274 std::weak_ptr<polling_filesystem_monitor> weak_self = shared_from_this();
276 timer_.expires_from_now(boost::posix_time::milliseconds(scan_interval_millis_));
277 timer_.async_wait([weak_self](const boost::system::error_code& e)
// lock() yields an empty pointer once the monitor has been destroyed.
// NOTE(review): the null/error checks before begin_scan() are not visible.
279 auto strong_self = weak_self.lock();
282 strong_self->begin_scan();
// The following statements belong to a method whose signature line is not
// visible here (presumably begin_scan(), which the timer handler calls). The
// work is submitted to executor_.
291 executor_.begin_invoke([this]()
// fetch_and_store(false) returns the previous flag value and clears it in one
// atomic step, so a pending "re-emit all" request is handled once.
305 if (reemmit_all_.fetch_and_store(false))
306 root_monitor_.reemmit_all(running_);
309 boost::filesystem::path file;
// Drain the individually requested re-emits.
311 while (to_reemmit_.try_pop(file))
312 root_monitor_.reemmit(file);
// The abort predicate reports true as soon as running_ is false.
315 root_monitor_.scan([=] { return !running_; });
// NOTE(review): presumably inside a catch block whose lines are not visible.
319 CASPAR_LOG_CURRENT_EXCEPTION();
// Private state of polling_filesystem_monitor_factory: the shared io_service
// and the scan interval that create() reads through impl_.
324 struct polling_filesystem_monitor_factory::impl
326 std::shared_ptr<boost::asio::io_service> scheduler_;
// Interval in milliseconds; create() passes it on as impl_->scan_interval_millis.
327 int scan_interval_millis;
// Constructor parameters (the constructor's name line is not visible in this
// view): `scheduler` is moved into scheduler_, `scan_interval_millis` is
// copied into the member of the same name.
330 std::shared_ptr<boost::asio::io_service> scheduler,
331 int scan_interval_millis)
332 : scheduler_(std::move(scheduler))
333 , scan_interval_millis(scan_interval_millis)
// Constructs the factory by allocating its impl, handing over `scheduler`
// (moved) and `scan_interval_millis`.
338 polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(
339 std::shared_ptr<boost::asio::io_service> scheduler,
340 int scan_interval_millis)
341 : impl_(new impl(std::move(scheduler), scan_interval_millis))
// Out-of-line destructor definition; its body is not visible in this view.
345 polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory()
// Creates a polling_filesystem_monitor via spl::make_shared, passing the
// caller's event mask, report_already_existing flag and initial-files handler
// together with the scan interval stored in impl_.
// NOTE(review): some argument lines of the make_shared call and the end of
// this function (including its return statement) are not visible in this
// view; presumably folder_to_watch, the scheduler and `handler` are passed as
// well -- confirm against the full source.
349 filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
350 const boost::filesystem::path& folder_to_watch,
351 filesystem_event events_of_interest_mask,
352 bool report_already_existing,
353 const filesystem_monitor_handler& handler,
354 const initial_files_handler& initial_files_handler)
356 auto monitor = spl::make_shared<polling_filesystem_monitor>(
358 events_of_interest_mask,
359 report_already_existing,
360 impl_->scan_interval_millis,
363 initial_files_handler);