]> git.sesse.net Git - casparcg/blob - common/polling_filesystem_monitor.cpp
* Added RxCpp library for LINQ api, replacing Boost.Range based iterations where...
[casparcg] / common / polling_filesystem_monitor.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #include "stdafx.h"
23
24 #include "polling_filesystem_monitor.h"
25
26 #include <map>
27 #include <set>
28 #include <iostream>
29 #include <cstdint>
30
31 #include <boost/thread.hpp>
32 #include <boost/filesystem/fstream.hpp>
33 #include <boost/filesystem/convenience.hpp>
34
35 #include <tbb/atomic.h>
36 #include <tbb/concurrent_queue.h>
37
38 #include "executor.h"
39 #include "linq.h"
40
41 namespace caspar {
42
43 class exception_protected_handler
44 {
45         filesystem_monitor_handler handler_;
46 public:
47         exception_protected_handler(const filesystem_monitor_handler& handler)
48                 : handler_(handler)
49         {
50         }
51
52         void operator()(filesystem_event event, const boost::filesystem::wpath& file)
53         {
54                 try
55                 {
56                         boost::this_thread::interruption_point();
57                         handler_(event, file);
58                         boost::this_thread::interruption_point();
59                 }
60                 catch (const boost::thread_interrupted&)
61                 {
62                         throw;
63                 }
64                 catch (...)
65                 {
66                         CASPAR_LOG_CURRENT_EXCEPTION();
67                 }
68         }
69 };
70
71 class directory_monitor
72 {
73         bool                                                                                    report_already_existing_;
74         boost::filesystem::wpath                                                folder_;
75         filesystem_event                                                                events_mask_;
76         filesystem_monitor_handler                                              handler_;
77         initial_files_handler                                                   initial_files_handler_;
78         bool                                                                                    first_scan_                                     = true;
79         std::map<boost::filesystem::wpath, std::time_t> files_;
80         std::map<boost::filesystem::wpath, uintmax_t>   being_written_sizes_;
81 public:
82         directory_monitor(
83                         bool report_already_existing,
84                         const boost::filesystem::wpath& folder,
85                         filesystem_event events_mask,
86                         const filesystem_monitor_handler& handler,
87                         const initial_files_handler& initial_files_handler)
88                 : report_already_existing_(report_already_existing)
89                 , folder_(folder)
90                 , events_mask_(events_mask)
91                 , handler_(exception_protected_handler(handler))
92                 , initial_files_handler_(initial_files_handler)
93         {
94         }
95
96         void reemmit_all()
97         {
98                 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
99                         return;
100
101                 for (auto& file : files_)
102                         handler_(filesystem_event::MODIFIED, file.first);
103         }
104
105         void reemmit(const boost::filesystem::wpath& file)
106         {
107                 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
108                         return;
109
110                 if (files_.find(file) != files_.end() && boost::filesystem::exists(file))
111                         handler_(filesystem_event::MODIFIED, file);
112         }
113
114         void scan(const boost::function<bool ()>& should_abort)
115         {
116                 static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds
117                 using namespace boost::filesystem;
118
119                 bool interested_in_removed = static_cast<int>(events_mask_ & filesystem_event::REMOVED) > 0;
120                 bool interested_in_created = static_cast<int>(events_mask_ & filesystem_event::CREATED) > 0;
121                 bool interested_in_modified = static_cast<int>(events_mask_ & filesystem_event::MODIFIED) > 0;
122
123                 auto filenames = cpplinq::from(files_).select(keys());
124                 std::set<wpath> removed_files(filenames.begin(), filenames.end());
125                 std::set<wpath> initial_files;
126
127                 for (boost::filesystem::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem::wrecursive_directory_iterator(); ++iter)
128                 {
129                         if (should_abort())
130                                 return;
131
132                         auto& path = iter->path();
133
134                         if (is_directory(path))
135                                 continue;
136
137                         auto now = std::time(nullptr);
138                         std::time_t current_mtime;
139
140                         try
141                         {
142                                 current_mtime = last_write_time(path);
143                         }
144                         catch (...)
145                         {
146                                 // Probably removed, will be captured the next round.
147                                 continue;
148                         }
149
150                         auto time_since_written_to = now - current_mtime;
151                         bool no_longer_being_written_to = time_since_written_to >= NO_LONGER_WRITING_AGE;
152                         auto previous_it = files_.find(path);
153                         bool already_known = previous_it != files_.end();
154
155                         if (already_known && no_longer_being_written_to)
156                         {
157                                 bool modified = previous_it->second != current_mtime;
158
159                                 if (modified && can_read_file(path))
160                                 {
161                                         if (interested_in_modified)
162                                                 handler_(filesystem_event::MODIFIED, path);
163
164                                         files_[path] = current_mtime;
165                                         being_written_sizes_.erase(path);
166                                 }
167                         }
168                         else if (no_longer_being_written_to && can_read_file(path))
169                         {
170                                 if (interested_in_created && (report_already_existing_ || !first_scan_))
171                                         handler_(filesystem_event::CREATED, path);
172
173                                 if (first_scan_)
174                                         initial_files.insert(path);
175
176                                 files_.insert(std::make_pair(path, current_mtime));
177                                 being_written_sizes_.erase(path);
178                         }
179
180                         removed_files.erase(path);
181                 }
182
183                 for (auto& path : removed_files)
184                 {
185                         files_.erase(path);
186                         being_written_sizes_.erase(path);
187
188                         if (interested_in_removed)
189                                 handler_(filesystem_event::REMOVED, path);
190                 }
191
192                 if (first_scan_)
193                         initial_files_handler_(initial_files);
194
195                 first_scan_ = false;
196         }
197 private:
198         bool can_read_file(const boost::filesystem::wpath& file)
199         {
200                 boost::filesystem::wifstream stream(file);
201
202                 return stream.is_open();
203         }
204 };
205
206 class polling_filesystem_monitor : public filesystem_monitor
207 {
208         directory_monitor root_monitor_;
209         boost::thread scanning_thread_;
210         tbb::atomic<bool> running_;
211         int scan_interval_millis_;
212         std::promise<void> initial_scan_completion_;
213         tbb::concurrent_queue<boost::filesystem::wpath> to_reemmit_;
214         tbb::atomic<bool> reemmit_all_;
215 public:
216         polling_filesystem_monitor(
217                         const boost::filesystem::wpath& folder_to_watch,
218                         filesystem_event events_of_interest_mask,
219                         bool report_already_existing,
220                         int scan_interval_millis,
221                         const filesystem_monitor_handler& handler,
222                         const initial_files_handler& initial_files_handler)
223                 : root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
224                 , scan_interval_millis_(scan_interval_millis)
225         {
226                 running_ = true;
227                 reemmit_all_ = false;
228                 scanning_thread_ = boost::thread([this] { scanner(); });
229         }
230
231         virtual ~polling_filesystem_monitor()
232         {
233                 running_ = false;
234                 scanning_thread_.interrupt();
235                 scanning_thread_.join();
236         }
237
238         virtual std::future<void> initial_files_processed()
239         {
240                 return initial_scan_completion_.get_future();
241         }
242
243         virtual void reemmit_all()
244         {
245                 reemmit_all_ = true;
246         }
247
248         virtual void reemmit(const boost::filesystem::wpath& file)
249         {
250                 to_reemmit_.push(file);
251         }
252 private:
253         void scanner()
254         {
255                 win32_exception::install_handler();
256
257                 //detail::SetThreadName(GetCurrentThreadId(), "polling_filesystem_monitor");
258
259                 bool running = scan(false);
260                 initial_scan_completion_.set_value();
261
262                 if (running)
263                         while (scan(true));
264         }
265
266         bool scan(bool sleep)
267         {
268                 try
269                 {
270                         if (sleep)
271                                 boost::this_thread::sleep(boost::posix_time::milliseconds(scan_interval_millis_));
272
273                         if (reemmit_all_.fetch_and_store(false))
274                                 root_monitor_.reemmit_all();
275                         else
276                         {
277                                 boost::filesystem::wpath file;
278
279                                 while (to_reemmit_.try_pop(file))
280                                         root_monitor_.reemmit(file);
281                         }
282
283                         root_monitor_.scan([=] { return !running_; });
284                 }
285                 catch (const boost::thread_interrupted&)
286                 {
287                 }
288                 catch (...)
289                 {
290                         CASPAR_LOG_CURRENT_EXCEPTION();
291                 }
292
293                 return running_;
294         }
295 };
296
297 struct polling_filesystem_monitor_factory::impl
298 {
299         int scan_interval_millis;
300
301         impl(int scan_interval_millis)
302                 : scan_interval_millis(scan_interval_millis)
303         {
304         }
305 };
306
307 polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(int scan_interval_millis)
308         : impl_(new impl(scan_interval_millis))
309 {
310 }
311
312 polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory()
313 {
314 }
315
316 filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
317                 const boost::filesystem::wpath& folder_to_watch,
318                 filesystem_event events_of_interest_mask,
319                 bool report_already_existing,
320                 const filesystem_monitor_handler& handler,
321                 const initial_files_handler& initial_files_handler)
322 {
323         return spl::make_shared<polling_filesystem_monitor>(
324                         folder_to_watch,
325                         events_of_interest_mask,
326                         report_already_existing,
327                         impl_->scan_interval_millis,
328                         handler,
329                         initial_files_handler);
330 }
331
332 }