]> git.sesse.net Git - casparcg/blob - common/polling_filesystem_monitor.cpp
c189218ea31ab0d450ef545009477131c33e241f
[casparcg] / common / polling_filesystem_monitor.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #include "stdafx.h"
23
24 #include "polling_filesystem_monitor.h"
25
26 #include <map>
27 #include <set>
28 #include <iostream>
29 #include <cstdint>
30
31 #include <boost/asio.hpp>
32 #include <boost/thread.hpp>
33 #include <boost/filesystem/fstream.hpp>
34 #include <boost/filesystem/convenience.hpp>
35
36 #include <tbb/atomic.h>
37 #include <tbb/concurrent_queue.h>
38
39 #include "executor.h"
40 #include "linq.h"
41
42 namespace caspar {
43
44 class exception_protected_handler
45 {
46         filesystem_monitor_handler handler_;
47 public:
48         exception_protected_handler(const filesystem_monitor_handler& handler)
49                 : handler_(handler)
50         {
51         }
52
53         void operator()(filesystem_event event, const boost::filesystem::path& file)
54         {
55                 try
56                 {
57                         handler_(event, file);
58                 }
59                 catch (...)
60                 {
61                         CASPAR_LOG_CURRENT_EXCEPTION();
62                 }
63         }
64 };
65
66 class directory_monitor
67 {
68         bool                                                                                    report_already_existing_;
69         boost::filesystem::path                                                 folder_;
70         filesystem_event                                                                events_mask_;
71         filesystem_monitor_handler                                              handler_;
72         initial_files_handler                                                   initial_files_handler_;
73         bool                                                                                    first_scan_                                     = true;
74         std::map<boost::filesystem::path, std::time_t>  files_;
75         std::map<boost::filesystem::path, uintmax_t>    being_written_sizes_;
76 public:
77         directory_monitor(
78                         bool report_already_existing,
79                         const boost::filesystem::path& folder,
80                         filesystem_event events_mask,
81                         const filesystem_monitor_handler& handler,
82                         const initial_files_handler& initial_files_handler)
83                 : report_already_existing_(report_already_existing)
84                 , folder_(folder)
85                 , events_mask_(events_mask)
86                 , handler_(exception_protected_handler(handler))
87                 , initial_files_handler_(initial_files_handler)
88         {
89         }
90
91         void reemmit_all(const tbb::atomic<bool>& running)
92         {
93                 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
94                         return;
95
96                 for (auto& file : files_)
97                 {
98                         if (!running)
99                                 return;
100
101                         handler_(filesystem_event::MODIFIED, file.first);
102                 }
103         }
104
105         void reemmit(const boost::filesystem::path& file)
106         {
107                 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
108                         return;
109
110                 if (files_.find(file) != files_.end() && boost::filesystem::exists(file))
111                         handler_(filesystem_event::MODIFIED, file);
112         }
113
114         void scan(const boost::function<bool ()>& should_abort)
115         {
116                 static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds
117                 using namespace boost::filesystem;
118
119                 bool interested_in_removed = static_cast<int>(events_mask_ & filesystem_event::REMOVED) > 0;
120                 bool interested_in_created = static_cast<int>(events_mask_ & filesystem_event::CREATED) > 0;
121                 bool interested_in_modified = static_cast<int>(events_mask_ & filesystem_event::MODIFIED) > 0;
122
123                 auto filenames = cpplinq::from(files_).select(keys());
124                 std::set<path> removed_files(filenames.begin(), filenames.end());
125                 std::set<path> initial_files;
126
127                 for (boost::filesystem::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem::wrecursive_directory_iterator(); ++iter)
128                 {
129                         if (should_abort())
130                                 return;
131
132                         auto& path = iter->path();
133
134                         if (is_directory(path))
135                                 continue;
136
137                         auto now = std::time(nullptr);
138                         std::time_t current_mtime;
139
140                         try
141                         {
142                                 current_mtime = last_write_time(path);
143                         }
144                         catch (...)
145                         {
146                                 // Probably removed, will be captured the next round.
147                                 continue;
148                         }
149
150                         auto time_since_written_to = now - current_mtime;
151                         bool no_longer_being_written_to = time_since_written_to >= NO_LONGER_WRITING_AGE;
152                         auto previous_it = files_.find(path);
153                         bool already_known = previous_it != files_.end();
154
155                         if (already_known && no_longer_being_written_to)
156                         {
157                                 bool modified = previous_it->second != current_mtime;
158
159                                 if (modified && can_read_file(path))
160                                 {
161                                         if (interested_in_modified)
162                                                 handler_(filesystem_event::MODIFIED, path);
163
164                                         files_[path] = current_mtime;
165                                         being_written_sizes_.erase(path);
166                                 }
167                         }
168                         else if (no_longer_being_written_to && can_read_file(path))
169                         {
170                                 if (interested_in_created && (report_already_existing_ || !first_scan_))
171                                         handler_(filesystem_event::CREATED, path);
172
173                                 if (first_scan_)
174                                         initial_files.insert(path);
175
176                                 files_.insert(std::make_pair(path, current_mtime));
177                                 being_written_sizes_.erase(path);
178                         }
179
180                         removed_files.erase(path);
181                 }
182
183                 for (auto& path : removed_files)
184                 {
185                         files_.erase(path);
186                         being_written_sizes_.erase(path);
187
188                         if (interested_in_removed)
189                                 handler_(filesystem_event::REMOVED, path);
190                 }
191
192                 if (first_scan_)
193                         initial_files_handler_(initial_files);
194
195                 first_scan_ = false;
196         }
197 private:
198         bool can_read_file(const boost::filesystem::path& file)
199         {
200                 boost::filesystem::wifstream stream(file);
201
202                 return stream.is_open();
203         }
204 };
205
206 class polling_filesystem_monitor : public filesystem_monitor
207 {
208         tbb::atomic<bool> running_;
209         std::shared_ptr<boost::asio::io_service> scheduler_;
210         directory_monitor root_monitor_;
211         boost::asio::deadline_timer timer_;
212         int scan_interval_millis_;
213         std::promise<void> initial_scan_completion_;
214         tbb::concurrent_queue<boost::filesystem::path> to_reemmit_;
215         tbb::atomic<bool> reemmit_all_;
216         executor executor_;
217 public:
218         polling_filesystem_monitor(
219                         const boost::filesystem::path& folder_to_watch,
220                         filesystem_event events_of_interest_mask,
221                         bool report_already_existing,
222                         int scan_interval_millis,
223                         std::shared_ptr<boost::asio::io_service> scheduler,
224                         const filesystem_monitor_handler& handler,
225                         const initial_files_handler& initial_files_handler)
226                 : scheduler_(std::move(scheduler))
227                 , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
228                 , timer_(*scheduler_)
229                 , scan_interval_millis_(scan_interval_millis)
230                 , executor_(L"polling_filesystem_monitor")
231         {
232                 running_ = true;
233                 reemmit_all_ = false;
234                 executor_.begin_invoke([this]
235                 {
236                         scan();
237                         initial_scan_completion_.set_value();
238                         schedule_next();
239                 });
240         }
241
242         ~polling_filesystem_monitor()
243         {
244                 running_ = false;
245                 boost::system::error_code e;
246                 timer_.cancel(e);
247         }
248
249         std::future<void> initial_files_processed() override
250         {
251                 return initial_scan_completion_.get_future();
252         }
253
254         void reemmit_all() override
255         {
256                 reemmit_all_ = true;
257         }
258
259         void reemmit(const boost::filesystem::path& file) override
260         {
261                 to_reemmit_.push(file);
262         }
263 private:
264         void schedule_next()
265         {
266                 if (!running_)
267                         return;
268
269                 timer_.expires_from_now(
270                         boost::posix_time::milliseconds(scan_interval_millis_));
271                 timer_.async_wait([this](const boost::system::error_code& e)
272                 {
273                         begin_scan();
274                 });
275         }
276
277         void begin_scan()
278         {
279                 if (!running_)
280                         return;
281
282                 executor_.begin_invoke([this]()
283                 {
284                         scan();
285                         schedule_next();
286                 });
287         }
288
289         void scan()
290         {
291                 if (!running_)
292                         return;
293
294                 try
295                 {
296                         if (reemmit_all_.fetch_and_store(false))
297                                 root_monitor_.reemmit_all(running_);
298                         else
299                         {
300                                 boost::filesystem::path file;
301
302                                 while (to_reemmit_.try_pop(file))
303                                         root_monitor_.reemmit(file);
304                         }
305
306                         root_monitor_.scan([=] { return !running_; });
307                 }
308                 catch (...)
309                 {
310                         CASPAR_LOG_CURRENT_EXCEPTION();
311                 }
312         }
313 };
314
315 struct polling_filesystem_monitor_factory::impl
316 {
317         std::shared_ptr<boost::asio::io_service> scheduler_;
318         int scan_interval_millis;
319
320         impl(
321                         std::shared_ptr<boost::asio::io_service> scheduler,
322                         int scan_interval_millis)
323                 : scheduler_(std::move(scheduler))
324                 , scan_interval_millis(scan_interval_millis)
325         {
326         }
327 };
328
329 polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(
330                 std::shared_ptr<boost::asio::io_service> scheduler,
331                 int scan_interval_millis)
332         : impl_(new impl(std::move(scheduler), scan_interval_millis))
333 {
334 }
335
336 polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory()
337 {
338 }
339
340 filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
341                 const boost::filesystem::path& folder_to_watch,
342                 filesystem_event events_of_interest_mask,
343                 bool report_already_existing,
344                 const filesystem_monitor_handler& handler,
345                 const initial_files_handler& initial_files_handler)
346 {
347         return spl::make_shared<polling_filesystem_monitor>(
348                         folder_to_watch,
349                         events_of_interest_mask,
350                         report_already_existing,
351                         impl_->scan_interval_millis,
352                         impl_->scheduler_,
353                         handler,
354                         initial_files_handler);
355 }
356
357 }