]> git.sesse.net Git - casparcg/blob - common/polling_filesystem_monitor.cpp
Merge branch '2.1.0' of https://github.com/CasparCG/Server into 2.1.0
[casparcg] / common / polling_filesystem_monitor.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #include "stdafx.h"
23
24 #include "polling_filesystem_monitor.h"
25
26 #include <map>
27 #include <set>
28 #include <iostream>
29 #include <cstdint>
30
31 #include <boost/thread.hpp>
32 #include <boost/foreach.hpp>
33 #include <boost/range/adaptor/map.hpp>
34 #include <boost/range/algorithm/copy.hpp>
35 #include <boost/filesystem/fstream.hpp>
36 #include <boost/filesystem/convenience.hpp>
37
38 #include <tbb/atomic.h>
39 #include <tbb/concurrent_queue.h>
40
41 #include "executor.h"
42
43 namespace caspar {
44
45 class exception_protected_handler
46 {
47         filesystem_monitor_handler handler_;
48 public:
49         exception_protected_handler(const filesystem_monitor_handler& handler)
50                 : handler_(handler)
51         {
52         }
53
54         void operator()(filesystem_event event, const boost::filesystem::wpath& file)
55         {
56                 try
57                 {
58                         boost::this_thread::interruption_point();
59                         handler_(event, file);
60                         boost::this_thread::interruption_point();
61                 }
62                 catch (const boost::thread_interrupted&)
63                 {
64                         throw;
65                 }
66                 catch (...)
67                 {
68                         CASPAR_LOG_CURRENT_EXCEPTION();
69                 }
70         }
71 };
72
73 class directory_monitor
74 {
75         bool report_already_existing_;
76         boost::filesystem::wpath folder_;
77         filesystem_event events_mask_;
78         filesystem_monitor_handler handler_;
79         initial_files_handler initial_files_handler_;
80         bool first_scan_;
81         std::map<boost::filesystem::wpath, std::time_t> files_;
82         std::map<boost::filesystem::wpath, uintmax_t> being_written_sizes_;
83 public:
84         directory_monitor(
85                         bool report_already_existing,
86                         const boost::filesystem::wpath& folder,
87                         filesystem_event events_mask,
88                         const filesystem_monitor_handler& handler,
89                         const initial_files_handler& initial_files_handler)
90                 : report_already_existing_(report_already_existing)
91                 , folder_(folder)
92                 , events_mask_(events_mask)
93                 , handler_(exception_protected_handler(handler))
94                 , initial_files_handler_(initial_files_handler)
95                 , first_scan_(true)
96         {
97         }
98
99         void reemmit_all()
100         {
101                 if ((events_mask_ & MODIFIED) == 0)
102                         return;
103
104                 BOOST_FOREACH(auto& file, files_)
105                         handler_(MODIFIED, file.first);
106         }
107
108         void reemmit(const boost::filesystem::wpath& file)
109         {
110                 if ((events_mask_ & MODIFIED) == 0)
111                         return;
112
113                 if (files_.find(file) != files_.end() && boost::filesystem::exists(file))
114                         handler_(MODIFIED, file);
115         }
116
117         void scan(const boost::function<bool ()>& should_abort)
118         {
119                 static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds
120                 using namespace boost::filesystem;
121
122                 bool interested_in_removed = (events_mask_ & REMOVED) > 0;
123                 bool interested_in_created = (events_mask_ & CREATED) > 0;
124                 bool interested_in_modified = (events_mask_ & MODIFIED) > 0;
125
126                 std::set<wpath> removed_files;
127                 boost::copy(
128                                 files_ | boost::adaptors::map_keys,
129                                 std::insert_iterator<decltype(removed_files)>(removed_files, removed_files.end()));
130
131                 std::set<wpath> initial_files;
132
133                 for (boost::filesystem3::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem3::wrecursive_directory_iterator(); ++iter)
134                 {
135                         if (should_abort())
136                                 return;
137
138                         auto& path = iter->path();
139
140                         if (is_directory(path))
141                                 continue;
142
143                         auto now = std::time(nullptr);
144                         std::time_t current_mtime;
145
146                         try
147                         {
148                                 current_mtime = last_write_time(path);
149                         }
150                         catch (...)
151                         {
152                                 // Probably removed, will be captured the next round.
153                                 continue;
154                         }
155
156                         auto time_since_written_to = now - current_mtime;
157                         bool no_longer_being_written_to = time_since_written_to >= NO_LONGER_WRITING_AGE;
158                         auto previous_it = files_.find(path);
159                         bool already_known = previous_it != files_.end();
160
161                         if (already_known && no_longer_being_written_to)
162                         {
163                                 bool modified = previous_it->second != current_mtime;
164
165                                 if (modified && can_read_file(path))
166                                 {
167                                         if (interested_in_modified)
168                                                 handler_(MODIFIED, path);
169
170                                         files_[path] = current_mtime;
171                                         being_written_sizes_.erase(path);
172                                 }
173                         }
174                         else if (no_longer_being_written_to && can_read_file(path))
175                         {
176                                 if (interested_in_created && (report_already_existing_ || !first_scan_))
177                                         handler_(CREATED, path);
178
179                                 if (first_scan_)
180                                         initial_files.insert(path);
181
182                                 files_.insert(std::make_pair(path, current_mtime));
183                                 being_written_sizes_.erase(path);
184                         }
185
186                         removed_files.erase(path);
187                 }
188
189                 BOOST_FOREACH(auto& path, removed_files)
190                 {
191                         files_.erase(path);
192                         being_written_sizes_.erase(path);
193
194                         if (interested_in_removed)
195                                 handler_(REMOVED, path);
196                 }
197
198                 if (first_scan_)
199                         initial_files_handler_(initial_files);
200
201                 first_scan_ = false;
202         }
203 private:
204         bool can_read_file(const boost::filesystem::wpath& file)
205         {
206                 boost::filesystem::wifstream stream(file);
207
208                 return stream.is_open();
209         }
210 };
211
212 class polling_filesystem_monitor : public filesystem_monitor
213 {
214         directory_monitor root_monitor_;
215         boost::thread scanning_thread_;
216         tbb::atomic<bool> running_;
217         int scan_interval_millis_;
218         boost::promise<void> initial_scan_completion_;
219         tbb::concurrent_queue<boost::filesystem::wpath> to_reemmit_;
220         tbb::atomic<bool> reemmit_all_;
221 public:
222         polling_filesystem_monitor(
223                         const boost::filesystem::wpath& folder_to_watch,
224                         filesystem_event events_of_interest_mask,
225                         bool report_already_existing,
226                         int scan_interval_millis,
227                         const filesystem_monitor_handler& handler,
228                         const initial_files_handler& initial_files_handler)
229                 : root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
230                 , scan_interval_millis_(scan_interval_millis)
231         {
232                 running_ = true;
233                 reemmit_all_ = false;
234                 scanning_thread_ = boost::thread([this] { scanner(); });
235         }
236
237         virtual ~polling_filesystem_monitor()
238         {
239                 running_ = false;
240                 scanning_thread_.interrupt();
241                 scanning_thread_.join();
242         }
243
244         virtual boost::unique_future<void> initial_files_processed()
245         {
246                 return initial_scan_completion_.get_future();
247         }
248
249         virtual void reemmit_all()
250         {
251                 reemmit_all_ = true;
252         }
253
254         virtual void reemmit(const boost::filesystem::wpath& file)
255         {
256                 to_reemmit_.push(file);
257         }
258 private:
259         void scanner()
260         {
261                 win32_exception::install_handler();
262
263                 //detail::SetThreadName(GetCurrentThreadId(), "polling_filesystem_monitor");
264
265                 bool running = scan(false);
266                 initial_scan_completion_.set_value();
267
268                 if (running)
269                         while (scan(true));
270         }
271
272         bool scan(bool sleep)
273         {
274                 try
275                 {
276                         if (sleep)
277                                 boost::this_thread::sleep(boost::posix_time::milliseconds(scan_interval_millis_));
278
279                         if (reemmit_all_.fetch_and_store(false))
280                                 root_monitor_.reemmit_all();
281                         else
282                         {
283                                 boost::filesystem::wpath file;
284
285                                 while (to_reemmit_.try_pop(file))
286                                         root_monitor_.reemmit(file);
287                         }
288
289                         root_monitor_.scan([=] { return !running_; });
290                 }
291                 catch (const boost::thread_interrupted&)
292                 {
293                 }
294                 catch (...)
295                 {
296                         CASPAR_LOG_CURRENT_EXCEPTION();
297                 }
298
299                 return running_;
300         }
301 };
302
303 struct polling_filesystem_monitor_factory::impl
304 {
305         int scan_interval_millis;
306
307         impl(int scan_interval_millis)
308                 : scan_interval_millis(scan_interval_millis)
309         {
310         }
311 };
312
313 polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(int scan_interval_millis)
314         : impl_(new impl(scan_interval_millis))
315 {
316 }
317
318 polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory()
319 {
320 }
321
322 filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
323                 const boost::filesystem::wpath& folder_to_watch,
324                 filesystem_event events_of_interest_mask,
325                 bool report_already_existing,
326                 const filesystem_monitor_handler& handler,
327                 const initial_files_handler& initial_files_handler)
328 {
329         return spl::make_shared<polling_filesystem_monitor>(
330                         folder_to_watch,
331                         events_of_interest_mask,
332                         report_already_existing,
333                         impl_->scan_interval_millis,
334                         handler,
335                         initial_files_handler);
336 }
337
338 }