2 * Copyright 2013 Sveriges Television AB http://casparcg.com/
\r
4 * This file is part of CasparCG (www.casparcg.com).
\r
6 * CasparCG is free software: you can redistribute it and/or modify
\r
7 * it under the terms of the GNU General Public License as published by
\r
8 * the Free Software Foundation, either version 3 of the License, or
\r
9 * (at your option) any later version.
\r
11 * CasparCG is distributed in the hope that it will be useful,
\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
\r
14 * GNU General Public License for more details.
\r
16 * You should have received a copy of the GNU General Public License
\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
\r
19 * Author: Helge Norberg, helge.norberg@svt.se
\r
22 #include "../stdafx.h"
\r
24 #include "polling_filesystem_monitor.h"
\r
30 #include <boost/asio.hpp>
\r
31 #include <boost/foreach.hpp>
\r
32 #include <boost/range/adaptor/map.hpp>
\r
33 #include <boost/range/algorithm/copy.hpp>
\r
34 #include <boost/filesystem/fstream.hpp>
\r
36 #include <tbb/atomic.h>
\r
37 #include <tbb/concurrent_queue.h>
\r
39 #include "../concurrency/executor.h"
\r
// Callable wrapper around a filesystem_monitor_handler. Its call operator
// forwards the event and path to the wrapped handler, and
// CASPAR_LOG_CURRENT_EXCEPTION() appears on what looks like the failure path.
43 class exception_protected_handler
\r
// The handler that operator() delegates to.
45 filesystem_monitor_handler handler_;
\r
// Takes the handler to wrap.
// NOTE(review): the initialiser that stores it in handler_ is not visible in
// this excerpt.
47 exception_protected_handler(const filesystem_monitor_handler& handler)
\r
// Invokes the wrapped handler with the given event and file path.
52 void operator()(filesystem_event event, const boost::filesystem::wpath& file)
\r
56 handler_(event, file);
\r
// NOTE(review): presumably inside a catch block guarding the call above, so a
// throwing handler is logged instead of propagating; the try/catch lines are
// not visible here, confirm against the full file.
60 CASPAR_LOG_CURRENT_EXCEPTION();
\r
// Tracks the files below one folder by recursively scanning it and comparing
// each file's last write time with the value remembered from earlier scans.
// Reports CREATED / MODIFIED / REMOVED through handler_, filtered by
// events_mask_.
65 class directory_monitor
\r
// When true, files found on the first scan may be reported as CREATED
// (see the CREATED condition in scan()).
67 bool report_already_existing_;
\r
// Root of the recursive directory iteration performed by scan().
68 boost::filesystem::wpath folder_;
\r
// Bit mask tested against CREATED, MODIFIED and REMOVED before notifying.
69 filesystem_event events_mask_;
\r
// Notification callback; the constructor wraps it in
// exception_protected_handler.
70 filesystem_monitor_handler handler_;
\r
// Callback that scan() invokes with the set of files it collected in
// initial_files.
71 initial_files_handler initial_files_handler_;
\r
// Known files mapped to the last write time recorded for them by scan().
73 std::map<boost::filesystem::wpath, std::time_t> files_;
\r
// NOTE(review): only erase() calls on this map are visible in this excerpt;
// nothing shown here inserts into it.
74 std::map<boost::filesystem::wpath, uintmax_t> being_written_sizes_;
\r
// Constructor parameters (the line naming the constructor is not visible in
// this excerpt). Each argument is stored in the member of the same name.
77 bool report_already_existing,
\r
78 const boost::filesystem::wpath& folder,
\r
79 filesystem_event events_mask,
\r
80 const filesystem_monitor_handler& handler,
\r
81 const initial_files_handler& initial_files_handler)
\r
82 : report_already_existing_(report_already_existing)
\r
84 , events_mask_(events_mask)
\r
// Wrap the caller's handler so that every notification goes through
// exception_protected_handler.
85 , handler_(exception_protected_handler(handler))
\r
86 , initial_files_handler_(initial_files_handler)
\r
// NOTE(review): the lines below look like the body of a re-emit-everything
// member (polling_filesystem_monitor calls root_monitor_.reemmit_all()); its
// signature line is not visible in this excerpt.
// Checks whether MODIFIED is absent from the mask (the statement controlled
// by this check is not visible, presumably an early return), then reports
// MODIFIED for every known file.
93 if ((events_mask_ & MODIFIED) == 0)
\r
96 BOOST_FOREACH(auto& file, files_)
\r
97 handler_(MODIFIED, file.first);
\r
// Reports MODIFIED for a single file, provided it is already tracked in
// files_ and still exists on disk.
100 void reemmit(const boost::filesystem::wpath& file)
\r
// Same MODIFIED-mask check as above; the controlled statement is not visible.
102 if ((events_mask_ & MODIFIED) == 0)
\r
105 if (files_.find(file) != files_.end() && boost::filesystem::exists(file))
\r
106 handler_(MODIFIED, file);
\r
// Performs one pass over folder_ and reports differences against files_.
// should_abort is polled once per directory entry.
// NOTE(review): the statement it controls is not visible, presumably an
// early return that cancels the pass.
109 void scan(const boost::function<bool ()>& should_abort)
\r
// A file counts as settled once its last write time is at least this old.
111 static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds
\r
112 using namespace boost::filesystem;
\r
// Decode the event mask once, before the loop.
114 bool interested_in_removed = (events_mask_ & REMOVED) > 0;
\r
115 bool interested_in_created = (events_mask_ & CREATED) > 0;
\r
116 bool interested_in_modified = (events_mask_ & MODIFIED) > 0;
\r
// Candidate set of removed files. The two lines after the declaration are
// arguments that feed every key of files_ into removed_files.
// NOTE(review): the call taking these arguments is not visible, presumably
// boost::copy. Paths seen during the walk are erased from the set again, so
// whatever remains afterwards was not found on disk.
118 std::set<wpath> removed_files;
\r
120 files_ | boost::adaptors::map_keys,
\r
121 std::insert_iterator<decltype(removed_files)>(removed_files, removed_files.end()));
\r
// Paths collected for initial_files_handler_.
123 std::set<wpath> initial_files;
\r
// Walk every entry below folder_, descending into subdirectories.
125 for (wrecursive_directory_iterator iter(folder_); iter != wrecursive_directory_iterator(); ++iter)
\r
127 if (should_abort())
\r
130 auto& path = iter->path();
\r
// NOTE(review): the statement controlled by this check is not visible;
// presumably directories are skipped so that only files are tracked.
132 if (is_directory(path))
\r
135 auto now = std::time(nullptr);
\r
136 std::time_t current_mtime;
\r
// NOTE(review): judging by the original comment below, this read is
// presumably guarded by a try/catch for files that vanish mid-scan; the guard
// itself is not visible here.
140 current_mtime = last_write_time(path);
\r
144 // Probably removed, will be captured the next round.
\r
// Age of the last write, compared with the settle threshold.
148 auto time_since_written_to = now - current_mtime;
\r
149 bool no_longer_being_written_to = time_since_written_to >= NO_LONGER_WRITING_AGE;
\r
150 auto previous_it = files_.find(path);
\r
151 bool already_known = previous_it != files_.end();
\r
// Known, settled file: it counts as modified when its write time differs from
// the remembered one and it can be opened for reading.
153 if (already_known && no_longer_being_written_to)
\r
155 bool modified = previous_it->second != current_mtime;
\r
157 if (modified && can_read_file(path))
\r
159 if (interested_in_modified)
\r
160 handler_(MODIFIED, path);
\r
// Remember the new write time for the next comparison.
162 files_[path] = current_mtime;
\r
163 being_written_sizes_.erase(path);
\r
// Settled, readable file on the other branch of the check above.
166 else if (no_longer_being_written_to && can_read_file(path))
\r
// CREATED is withheld on the first scan unless report_already_existing_ is
// set.
168 if (interested_in_created && (report_already_existing_ || !first_scan_))
\r
169 handler_(CREATED, path);
\r
// NOTE(review): any condition guarding this insert is not visible in this
// excerpt.
172 initial_files.insert(path);
\r
174 files_.insert(std::make_pair(path, current_mtime));
\r
175 being_written_sizes_.erase(path);
\r
// The path was found on disk, so it is not a removal candidate.
178 removed_files.erase(path);
\r
// Every path still in removed_files is forgotten and, if requested, reported
// as REMOVED.
181 BOOST_FOREACH(auto& path, removed_files)
\r
183 files_.erase(path);
\r
184 being_written_sizes_.erase(path);
\r
186 if (interested_in_removed)
\r
187 handler_(REMOVED, path);
\r
// Hand the collected paths to the initial-files callback.
// NOTE(review): any guard around this call (e.g. first scan only) is not
// visible in this excerpt.
191 initial_files_handler_(initial_files);
\r
// From here on, later scans treat new files as created rather than
// pre-existing (see the CREATED condition above).
193 first_scan_ = false;
\r
// Returns true when the file can be opened as an input stream. scan() uses
// this as a precondition for reporting MODIFIED and CREATED.
196 bool can_read_file(const boost::filesystem::wpath& file)
\r
198 boost::filesystem::wifstream stream(file);
\r
200 return stream.is_open();
\r
// filesystem_monitor implementation that drives a directory_monitor from a
// boost::asio deadline timer and an executor. Re-emit requests are recorded in
// an atomic flag and a concurrent queue, and are consumed in the code that
// precedes the call to root_monitor_.scan().
204 class polling_filesystem_monitor : public filesystem_monitor
\r
// Performs the actual directory scanning and event reporting.
206 directory_monitor root_monitor_;
\r
// Executor constructed with the name L"polling_filesystem_monitor"; the
// constructor queues work on it via begin_invoke.
207 executor executor_;
\r
// io_service passed in by the caller; also used to construct timer_.
208 boost::asio::io_service& scheduler_;
\r
// Timer armed by schedule_next().
209 boost::asio::deadline_timer timer_;
\r
// Read by the abort predicate handed to root_monitor_.scan().
// NOTE(review): the places where this flag is written are not visible in this
// excerpt.
210 tbb::atomic<bool> running_;
\r
// Delay, in milliseconds, used when arming timer_.
211 int scan_interval_millis_;
\r
// Fulfilled in the constructor's queued task; exposed to callers through
// initial_files_processed().
212 boost::promise<void> initial_scan_completion_;
\r
// Files for which reemmit() was requested, pending processing.
213 tbb::concurrent_queue<boost::filesystem::wpath> to_reemmit_;
\r
// Set by reemmit_all(); read and cleared with fetch_and_store(false).
214 tbb::atomic<bool> reemmit_all_;
\r
// Builds the monitor. The visible arguments report_already_existing,
// events_of_interest_mask and initial_files_handler are forwarded to
// root_monitor_.
// NOTE(review): the remaining root_monitor_ arguments are not visible in this
// excerpt.
216 polling_filesystem_monitor(
\r
217 const boost::filesystem::wpath& folder_to_watch,
\r
218 filesystem_event events_of_interest_mask,
\r
219 bool report_already_existing,
\r
220 int scan_interval_millis,
\r
221 boost::asio::io_service& scheduler,
\r
222 const filesystem_monitor_handler& handler,
\r
223 const initial_files_handler& initial_files_handler)
\r
225 report_already_existing,
\r
227 events_of_interest_mask,
\r
229 initial_files_handler)
\r
230 , executor_(L"polling_filesystem_monitor")
\r
231 , scheduler_(scheduler)
\r
232 , timer_(scheduler)
\r
233 , scan_interval_millis_(scan_interval_millis)
\r
236 reemmit_all_ = false;
\r
// Queue a task on the executor. The only visible statement of that task
// fulfils initial_scan_completion_.
// NOTE(review): presumably the task runs the first scan before doing so; the
// other lines of the lambda are not visible here.
237 executor_.begin_invoke([this]
\r
240 initial_scan_completion_.set_value();
\r
// NOTE(review): only the declaration of an error_code is visible in the
// destructor body; presumably it receives the result of cancelling timer_.
// Confirm against the full file.
245 virtual ~polling_filesystem_monitor()
\r
248 boost::system::error_code e;
\r
// Returns a future tied to initial_scan_completion_.
252 virtual boost::unique_future<void> initial_files_processed()
\r
254 return initial_scan_completion_.get_future();
\r
// Requests a re-emit for all files by raising the atomic flag; the flag is
// consumed before the call to root_monitor_.scan().
257 virtual void reemmit_all()
\r
259 reemmit_all_ = true;
\r
// Requests a re-emit for one file by queueing its path.
262 virtual void reemmit(const boost::filesystem::wpath& file)
\r
264 to_reemmit_.push(file);
\r
// Arms timer_ to expire scan_interval_millis_ milliseconds from now and
// registers an asynchronous wait handler.
// NOTE(review): the body of the wait handler is not visible in this excerpt.
267 void schedule_next()
\r
272 timer_.expires_from_now(
\r
273 boost::posix_time::milliseconds(scan_interval_millis_));
\r
274 timer_.async_wait([this](const boost::system::error_code& e)
\r
// NOTE(review): the lines below appear to belong to a separate member whose
// signature line is not visible in this excerpt.
// fetch_and_store(false) reads the re-emit-all flag and clears it in a single
// atomic step.
288 if (reemmit_all_.fetch_and_store(false))
\r
289 root_monitor_.reemmit_all();
\r
// Drain the queued single-file re-emit requests.
// NOTE(review): whether this runs only when the flag above was not set (an
// else branch) is not visible here.
292 boost::filesystem::wpath file;
\r
294 while (to_reemmit_.try_pop(file))
\r
295 root_monitor_.reemmit(file);
\r
// Run a scan pass. The predicate reports true once running_ is false, which
// is what root_monitor_.scan() polls as should_abort.
298 root_monitor_.scan([=] { return !running_; });
\r
// NOTE(review): presumably inside a catch block covering the statements
// above; the try/catch lines are not visible here.
302 CASPAR_LOG_CURRENT_EXCEPTION();
\r
// Private state of polling_filesystem_monitor_factory: the io_service
// reference and the scan interval given to the factory constructor.
307 struct polling_filesystem_monitor_factory::implementation
\r
// io_service supplied to the factory.
309 boost::asio::io_service& scheduler_;
\r
// Scan interval in milliseconds; create() passes it to each new monitor.
310 int scan_interval_millis;
\r
// Constructor parameters (the line naming the constructor is not visible in
// this excerpt); both values are stored unchanged.
313 boost::asio::io_service& scheduler, int scan_interval_millis)
\r
314 : scheduler_(scheduler), scan_interval_millis(scan_interval_millis)
\r
// Creates the factory by allocating its implementation object from the given
// io_service and scan interval (milliseconds).
319 polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(
\r
320 boost::asio::io_service& scheduler,
\r
321 int scan_interval_millis)
\r
322 : impl_(new implementation(scheduler, scan_interval_millis))
\r
// Destructor of the factory.
// NOTE(review): its body is not visible in this excerpt.
326 polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory()
\r
// Creates a polling_filesystem_monitor via make_safe and returns it as a
// filesystem_monitor::ptr. The visible call arguments are the event mask, the
// report_already_existing flag, the factory's stored scan interval and the
// initial-files handler.
// NOTE(review): the remaining call arguments are not visible in this excerpt.
330 filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
\r
331 const boost::filesystem::wpath& folder_to_watch,
\r
332 filesystem_event events_of_interest_mask,
\r
333 bool report_already_existing,
\r
334 const filesystem_monitor_handler& handler,
\r
335 const initial_files_handler& initial_files_handler)
\r
337 return make_safe<polling_filesystem_monitor>(
\r
339 events_of_interest_mask,
\r
340 report_already_existing,
\r
// The interval comes from the factory's implementation object, not from the
// caller.
341 impl_->scan_interval_millis,
\r
344 initial_files_handler);
\r