/*
* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
*
* This file is part of CasparCG (www.casparcg.com).
*
* CasparCG is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* CasparCG is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
*
* Author: Helge Norberg, helge.norberg@svt.se
*/
#include "polling_filesystem_monitor.h"

#include <boost/thread.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/convenience.hpp>
#include <boost/system/error_code.hpp>

#include <tbb/atomic.h>
#include <tbb/concurrent_queue.h>
44 class exception_protected_handler
46 filesystem_monitor_handler handler_;
48 exception_protected_handler(const filesystem_monitor_handler& handler)
53 void operator()(filesystem_event event, const boost::filesystem::wpath& file)
57 boost::this_thread::interruption_point();
58 handler_(event, file);
59 boost::this_thread::interruption_point();
61 catch (const boost::thread_interrupted&)
67 CASPAR_LOG_CURRENT_EXCEPTION();
72 class directory_monitor
74 bool report_already_existing_;
75 boost::filesystem::wpath folder_;
76 filesystem_event events_mask_;
77 filesystem_monitor_handler handler_;
78 initial_files_handler initial_files_handler_;
79 bool first_scan_ = true;
80 std::map<boost::filesystem::wpath, std::time_t> files_;
81 std::map<boost::filesystem::wpath, uintmax_t> being_written_sizes_;
84 bool report_already_existing,
85 const boost::filesystem::wpath& folder,
86 filesystem_event events_mask,
87 const filesystem_monitor_handler& handler,
88 const initial_files_handler& initial_files_handler)
89 : report_already_existing_(report_already_existing)
91 , events_mask_(events_mask)
92 , handler_(exception_protected_handler(handler))
93 , initial_files_handler_(initial_files_handler)
99 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
102 for (auto& file : files_)
103 handler_(filesystem_event::MODIFIED, file.first);
106 void reemmit(const boost::filesystem::wpath& file)
108 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
111 if (files_.find(file) != files_.end() && boost::filesystem::exists(file))
112 handler_(filesystem_event::MODIFIED, file);
115 void scan(const boost::function<bool ()>& should_abort)
117 static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds
118 using namespace boost::filesystem;
120 bool interested_in_removed = static_cast<int>(events_mask_ & filesystem_event::REMOVED) > 0;
121 bool interested_in_created = static_cast<int>(events_mask_ & filesystem_event::CREATED) > 0;
122 bool interested_in_modified = static_cast<int>(events_mask_ & filesystem_event::MODIFIED) > 0;
124 std::set<wpath> removed_files;
126 files_ | boost::adaptors::map_keys,
127 std::insert_iterator<decltype(removed_files)>(removed_files, removed_files.end()));
129 std::set<wpath> initial_files;
131 for (boost::filesystem::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem::wrecursive_directory_iterator(); ++iter)
136 auto& path = iter->path();
138 if (is_directory(path))
141 auto now = std::time(nullptr);
142 std::time_t current_mtime;
146 current_mtime = last_write_time(path);
150 // Probably removed, will be captured the next round.
154 auto time_since_written_to = now - current_mtime;
155 bool no_longer_being_written_to = time_since_written_to >= NO_LONGER_WRITING_AGE;
156 auto previous_it = files_.find(path);
157 bool already_known = previous_it != files_.end();
159 if (already_known && no_longer_being_written_to)
161 bool modified = previous_it->second != current_mtime;
163 if (modified && can_read_file(path))
165 if (interested_in_modified)
166 handler_(filesystem_event::MODIFIED, path);
168 files_[path] = current_mtime;
169 being_written_sizes_.erase(path);
172 else if (no_longer_being_written_to && can_read_file(path))
174 if (interested_in_created && (report_already_existing_ || !first_scan_))
175 handler_(filesystem_event::CREATED, path);
178 initial_files.insert(path);
180 files_.insert(std::make_pair(path, current_mtime));
181 being_written_sizes_.erase(path);
184 removed_files.erase(path);
187 for (auto& path : removed_files)
190 being_written_sizes_.erase(path);
192 if (interested_in_removed)
193 handler_(filesystem_event::REMOVED, path);
197 initial_files_handler_(initial_files);
202 bool can_read_file(const boost::filesystem::wpath& file)
204 boost::filesystem::wifstream stream(file);
206 return stream.is_open();
210 class polling_filesystem_monitor : public filesystem_monitor
212 directory_monitor root_monitor_;
213 boost::thread scanning_thread_;
214 tbb::atomic<bool> running_;
215 int scan_interval_millis_;
216 std::promise<void> initial_scan_completion_;
217 tbb::concurrent_queue<boost::filesystem::wpath> to_reemmit_;
218 tbb::atomic<bool> reemmit_all_;
220 polling_filesystem_monitor(
221 const boost::filesystem::wpath& folder_to_watch,
222 filesystem_event events_of_interest_mask,
223 bool report_already_existing,
224 int scan_interval_millis,
225 const filesystem_monitor_handler& handler,
226 const initial_files_handler& initial_files_handler)
227 : root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
228 , scan_interval_millis_(scan_interval_millis)
231 reemmit_all_ = false;
232 scanning_thread_ = boost::thread([this] { scanner(); });
235 virtual ~polling_filesystem_monitor()
238 scanning_thread_.interrupt();
239 scanning_thread_.join();
242 virtual std::future<void> initial_files_processed()
244 return initial_scan_completion_.get_future();
247 virtual void reemmit_all()
252 virtual void reemmit(const boost::filesystem::wpath& file)
254 to_reemmit_.push(file);
259 win32_exception::install_handler();
261 //detail::SetThreadName(GetCurrentThreadId(), "polling_filesystem_monitor");
263 bool running = scan(false);
264 initial_scan_completion_.set_value();
270 bool scan(bool sleep)
275 boost::this_thread::sleep(boost::posix_time::milliseconds(scan_interval_millis_));
277 if (reemmit_all_.fetch_and_store(false))
278 root_monitor_.reemmit_all();
281 boost::filesystem::wpath file;
283 while (to_reemmit_.try_pop(file))
284 root_monitor_.reemmit(file);
287 root_monitor_.scan([=] { return !running_; });
289 catch (const boost::thread_interrupted&)
294 CASPAR_LOG_CURRENT_EXCEPTION();
301 struct polling_filesystem_monitor_factory::impl
303 int scan_interval_millis;
305 impl(int scan_interval_millis)
306 : scan_interval_millis(scan_interval_millis)
311 polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(int scan_interval_millis)
312 : impl_(new impl(scan_interval_millis))
316 polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory()
320 filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
321 const boost::filesystem::wpath& folder_to_watch,
322 filesystem_event events_of_interest_mask,
323 bool report_already_existing,
324 const filesystem_monitor_handler& handler,
325 const initial_files_handler& initial_files_handler)
327 return spl::make_shared<polling_filesystem_monitor>(
329 events_of_interest_mask,
330 report_already_existing,
331 impl_->scan_interval_millis,
333 initial_files_handler);