git.sesse.net Git - casparcg/blob - common/polling_filesystem_monitor.cpp
Refactored to use non-static data member initializers where it makes sense. Mixes...
[casparcg] / common / polling_filesystem_monitor.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #include "stdafx.h"
23
24 #include "polling_filesystem_monitor.h"
25
26 #include <map>
27 #include <set>
28 #include <iostream>
29 #include <cstdint>
30
31 #include <boost/thread.hpp>
32 #include <boost/range/adaptor/map.hpp>
33 #include <boost/range/algorithm/copy.hpp>
34 #include <boost/filesystem/fstream.hpp>
35 #include <boost/filesystem/convenience.hpp>
36
37 #include <tbb/atomic.h>
38 #include <tbb/concurrent_queue.h>
39
40 #include "executor.h"
41
42 namespace caspar {
43
44 class exception_protected_handler
45 {
46         filesystem_monitor_handler handler_;
47 public:
48         exception_protected_handler(const filesystem_monitor_handler& handler)
49                 : handler_(handler)
50         {
51         }
52
53         void operator()(filesystem_event event, const boost::filesystem::wpath& file)
54         {
55                 try
56                 {
57                         boost::this_thread::interruption_point();
58                         handler_(event, file);
59                         boost::this_thread::interruption_point();
60                 }
61                 catch (const boost::thread_interrupted&)
62                 {
63                         throw;
64                 }
65                 catch (...)
66                 {
67                         CASPAR_LOG_CURRENT_EXCEPTION();
68                 }
69         }
70 };
71
72 class directory_monitor
73 {
74         bool                                                                                    report_already_existing_;
75         boost::filesystem::wpath                                                folder_;
76         filesystem_event                                                                events_mask_;
77         filesystem_monitor_handler                                              handler_;
78         initial_files_handler                                                   initial_files_handler_;
79         bool                                                                                    first_scan_                                     = true;
80         std::map<boost::filesystem::wpath, std::time_t> files_;
81         std::map<boost::filesystem::wpath, uintmax_t>   being_written_sizes_;
82 public:
83         directory_monitor(
84                         bool report_already_existing,
85                         const boost::filesystem::wpath& folder,
86                         filesystem_event events_mask,
87                         const filesystem_monitor_handler& handler,
88                         const initial_files_handler& initial_files_handler)
89                 : report_already_existing_(report_already_existing)
90                 , folder_(folder)
91                 , events_mask_(events_mask)
92                 , handler_(exception_protected_handler(handler))
93                 , initial_files_handler_(initial_files_handler)
94         {
95         }
96
97         void reemmit_all()
98         {
99                 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
100                         return;
101
102                 for (auto& file : files_)
103                         handler_(filesystem_event::MODIFIED, file.first);
104         }
105
106         void reemmit(const boost::filesystem::wpath& file)
107         {
108                 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
109                         return;
110
111                 if (files_.find(file) != files_.end() && boost::filesystem::exists(file))
112                         handler_(filesystem_event::MODIFIED, file);
113         }
114
115         void scan(const boost::function<bool ()>& should_abort)
116         {
117                 static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds
118                 using namespace boost::filesystem;
119
120                 bool interested_in_removed = static_cast<int>(events_mask_ & filesystem_event::REMOVED) > 0;
121                 bool interested_in_created = static_cast<int>(events_mask_ & filesystem_event::CREATED) > 0;
122                 bool interested_in_modified = static_cast<int>(events_mask_ & filesystem_event::MODIFIED) > 0;
123
124                 std::set<wpath> removed_files;
125                 boost::copy(
126                                 files_ | boost::adaptors::map_keys,
127                                 std::insert_iterator<decltype(removed_files)>(removed_files, removed_files.end()));
128
129                 std::set<wpath> initial_files;
130
131                 for (boost::filesystem::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem::wrecursive_directory_iterator(); ++iter)
132                 {
133                         if (should_abort())
134                                 return;
135
136                         auto& path = iter->path();
137
138                         if (is_directory(path))
139                                 continue;
140
141                         auto now = std::time(nullptr);
142                         std::time_t current_mtime;
143
144                         try
145                         {
146                                 current_mtime = last_write_time(path);
147                         }
148                         catch (...)
149                         {
150                                 // Probably removed, will be captured the next round.
151                                 continue;
152                         }
153
154                         auto time_since_written_to = now - current_mtime;
155                         bool no_longer_being_written_to = time_since_written_to >= NO_LONGER_WRITING_AGE;
156                         auto previous_it = files_.find(path);
157                         bool already_known = previous_it != files_.end();
158
159                         if (already_known && no_longer_being_written_to)
160                         {
161                                 bool modified = previous_it->second != current_mtime;
162
163                                 if (modified && can_read_file(path))
164                                 {
165                                         if (interested_in_modified)
166                                                 handler_(filesystem_event::MODIFIED, path);
167
168                                         files_[path] = current_mtime;
169                                         being_written_sizes_.erase(path);
170                                 }
171                         }
172                         else if (no_longer_being_written_to && can_read_file(path))
173                         {
174                                 if (interested_in_created && (report_already_existing_ || !first_scan_))
175                                         handler_(filesystem_event::CREATED, path);
176
177                                 if (first_scan_)
178                                         initial_files.insert(path);
179
180                                 files_.insert(std::make_pair(path, current_mtime));
181                                 being_written_sizes_.erase(path);
182                         }
183
184                         removed_files.erase(path);
185                 }
186
187                 for (auto& path : removed_files)
188                 {
189                         files_.erase(path);
190                         being_written_sizes_.erase(path);
191
192                         if (interested_in_removed)
193                                 handler_(filesystem_event::REMOVED, path);
194                 }
195
196                 if (first_scan_)
197                         initial_files_handler_(initial_files);
198
199                 first_scan_ = false;
200         }
201 private:
202         bool can_read_file(const boost::filesystem::wpath& file)
203         {
204                 boost::filesystem::wifstream stream(file);
205
206                 return stream.is_open();
207         }
208 };
209
210 class polling_filesystem_monitor : public filesystem_monitor
211 {
212         directory_monitor root_monitor_;
213         boost::thread scanning_thread_;
214         tbb::atomic<bool> running_;
215         int scan_interval_millis_;
216         std::promise<void> initial_scan_completion_;
217         tbb::concurrent_queue<boost::filesystem::wpath> to_reemmit_;
218         tbb::atomic<bool> reemmit_all_;
219 public:
220         polling_filesystem_monitor(
221                         const boost::filesystem::wpath& folder_to_watch,
222                         filesystem_event events_of_interest_mask,
223                         bool report_already_existing,
224                         int scan_interval_millis,
225                         const filesystem_monitor_handler& handler,
226                         const initial_files_handler& initial_files_handler)
227                 : root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
228                 , scan_interval_millis_(scan_interval_millis)
229         {
230                 running_ = true;
231                 reemmit_all_ = false;
232                 scanning_thread_ = boost::thread([this] { scanner(); });
233         }
234
235         virtual ~polling_filesystem_monitor()
236         {
237                 running_ = false;
238                 scanning_thread_.interrupt();
239                 scanning_thread_.join();
240         }
241
242         virtual std::future<void> initial_files_processed()
243         {
244                 return initial_scan_completion_.get_future();
245         }
246
247         virtual void reemmit_all()
248         {
249                 reemmit_all_ = true;
250         }
251
252         virtual void reemmit(const boost::filesystem::wpath& file)
253         {
254                 to_reemmit_.push(file);
255         }
256 private:
257         void scanner()
258         {
259                 win32_exception::install_handler();
260
261                 //detail::SetThreadName(GetCurrentThreadId(), "polling_filesystem_monitor");
262
263                 bool running = scan(false);
264                 initial_scan_completion_.set_value();
265
266                 if (running)
267                         while (scan(true));
268         }
269
270         bool scan(bool sleep)
271         {
272                 try
273                 {
274                         if (sleep)
275                                 boost::this_thread::sleep(boost::posix_time::milliseconds(scan_interval_millis_));
276
277                         if (reemmit_all_.fetch_and_store(false))
278                                 root_monitor_.reemmit_all();
279                         else
280                         {
281                                 boost::filesystem::wpath file;
282
283                                 while (to_reemmit_.try_pop(file))
284                                         root_monitor_.reemmit(file);
285                         }
286
287                         root_monitor_.scan([=] { return !running_; });
288                 }
289                 catch (const boost::thread_interrupted&)
290                 {
291                 }
292                 catch (...)
293                 {
294                         CASPAR_LOG_CURRENT_EXCEPTION();
295                 }
296
297                 return running_;
298         }
299 };
300
301 struct polling_filesystem_monitor_factory::impl
302 {
303         int scan_interval_millis;
304
305         impl(int scan_interval_millis)
306                 : scan_interval_millis(scan_interval_millis)
307         {
308         }
309 };
310
311 polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(int scan_interval_millis)
312         : impl_(new impl(scan_interval_millis))
313 {
314 }
315
316 polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory()
317 {
318 }
319
320 filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
321                 const boost::filesystem::wpath& folder_to_watch,
322                 filesystem_event events_of_interest_mask,
323                 bool report_already_existing,
324                 const filesystem_monitor_handler& handler,
325                 const initial_files_handler& initial_files_handler)
326 {
327         return spl::make_shared<polling_filesystem_monitor>(
328                         folder_to_watch,
329                         events_of_interest_mask,
330                         report_already_existing,
331                         impl_->scan_interval_millis,
332                         handler,
333                         initial_files_handler);
334 }
335
336 }