]> git.sesse.net Git - casparcg/blob - common/polling_filesystem_monitor.cpp
[polling_filesystem_monitor] Fixed bug where unallocated memory could be read by...
[casparcg] / common / polling_filesystem_monitor.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #include "stdafx.h"
23
24 #include "polling_filesystem_monitor.h"
25
26 #include <map>
27 #include <set>
28 #include <iostream>
29 #include <cstdint>
30
31 #include <boost/asio.hpp>
32 #include <boost/thread.hpp>
33 #include <boost/filesystem/fstream.hpp>
34 #include <boost/filesystem/convenience.hpp>
35
36 #include <tbb/atomic.h>
37 #include <tbb/concurrent_queue.h>
38
39 #include "executor.h"
40 #include "linq.h"
41
42 namespace caspar {
43
44 class exception_protected_handler
45 {
46         filesystem_monitor_handler handler_;
47 public:
48         exception_protected_handler(const filesystem_monitor_handler& handler)
49                 : handler_(handler)
50         {
51         }
52
53         void operator()(filesystem_event event, const boost::filesystem::path& file)
54         {
55                 try
56                 {
57                         handler_(event, file);
58                 }
59                 catch (...)
60                 {
61                         CASPAR_LOG_CURRENT_EXCEPTION();
62                 }
63         }
64 };
65
66 class directory_monitor
67 {
68         bool                                                                                    report_already_existing_;
69         boost::filesystem::path                                                 folder_;
70         filesystem_event                                                                events_mask_;
71         filesystem_monitor_handler                                              handler_;
72         initial_files_handler                                                   initial_files_handler_;
73         bool                                                                                    first_scan_                                     = true;
74         std::map<boost::filesystem::path, std::time_t>  files_;
75         std::map<boost::filesystem::path, uintmax_t>    being_written_sizes_;
76 public:
77         directory_monitor(
78                         bool report_already_existing,
79                         const boost::filesystem::path& folder,
80                         filesystem_event events_mask,
81                         const filesystem_monitor_handler& handler,
82                         const initial_files_handler& initial_files_handler)
83                 : report_already_existing_(report_already_existing)
84                 , folder_(folder)
85                 , events_mask_(events_mask)
86                 , handler_(exception_protected_handler(handler))
87                 , initial_files_handler_(initial_files_handler)
88         {
89         }
90
91         void reemmit_all(const tbb::atomic<bool>& running)
92         {
93                 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
94                         return;
95
96                 for (auto& file : files_)
97                 {
98                         if (!running)
99                                 return;
100
101                         handler_(filesystem_event::MODIFIED, file.first);
102                 }
103         }
104
105         void reemmit(const boost::filesystem::path& file)
106         {
107                 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
108                         return;
109
110                 if (files_.find(file) != files_.end() && boost::filesystem::exists(file))
111                         handler_(filesystem_event::MODIFIED, file);
112         }
113
114         void scan(const boost::function<bool ()>& should_abort)
115         {
116                 static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds
117                 using namespace boost::filesystem;
118
119                 bool interested_in_removed = static_cast<int>(events_mask_ & filesystem_event::REMOVED) > 0;
120                 bool interested_in_created = static_cast<int>(events_mask_ & filesystem_event::CREATED) > 0;
121                 bool interested_in_modified = static_cast<int>(events_mask_ & filesystem_event::MODIFIED) > 0;
122
123                 auto filenames = cpplinq::from(files_).select(keys());
124                 std::set<path> removed_files(filenames.begin(), filenames.end());
125                 std::set<path> initial_files;
126
127                 for (boost::filesystem::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem::wrecursive_directory_iterator(); ++iter)
128                 {
129                         if (should_abort())
130                                 return;
131
132                         auto& path = iter->path();
133
134                         if (is_directory(path))
135                                 continue;
136
137                         auto now = std::time(nullptr);
138                         std::time_t current_mtime;
139
140                         try
141                         {
142                                 current_mtime = last_write_time(path);
143                         }
144                         catch (...)
145                         {
146                                 // Probably removed, will be captured the next round.
147                                 continue;
148                         }
149
150                         auto time_since_written_to = now - current_mtime;
151                         bool no_longer_being_written_to = time_since_written_to >= NO_LONGER_WRITING_AGE;
152                         auto previous_it = files_.find(path);
153                         bool already_known = previous_it != files_.end();
154
155                         if (already_known && no_longer_being_written_to)
156                         {
157                                 bool modified = previous_it->second != current_mtime;
158
159                                 if (modified && can_read_file(path))
160                                 {
161                                         if (interested_in_modified)
162                                                 handler_(filesystem_event::MODIFIED, path);
163
164                                         files_[path] = current_mtime;
165                                         being_written_sizes_.erase(path);
166                                 }
167                         }
168                         else if (no_longer_being_written_to && can_read_file(path))
169                         {
170                                 if (interested_in_created && (report_already_existing_ || !first_scan_))
171                                         handler_(filesystem_event::CREATED, path);
172
173                                 if (first_scan_)
174                                         initial_files.insert(path);
175
176                                 files_.insert(std::make_pair(path, current_mtime));
177                                 being_written_sizes_.erase(path);
178                         }
179
180                         removed_files.erase(path);
181                 }
182
183                 for (auto& path : removed_files)
184                 {
185                         files_.erase(path);
186                         being_written_sizes_.erase(path);
187
188                         if (interested_in_removed)
189                                 handler_(filesystem_event::REMOVED, path);
190                 }
191
192                 if (first_scan_)
193                         initial_files_handler_(initial_files);
194
195                 first_scan_ = false;
196         }
197 private:
198         bool can_read_file(const boost::filesystem::path& file)
199         {
200                 boost::filesystem::wifstream stream(file);
201
202                 return stream.is_open();
203         }
204 };
205
206 class polling_filesystem_monitor
207                 : public filesystem_monitor
208                 , public spl::enable_shared_from_this<polling_filesystem_monitor>
209 {
210         tbb::atomic<bool>                                                               running_;
211         std::shared_ptr<boost::asio::io_service>                scheduler_;
212         directory_monitor                                                               root_monitor_;
213         boost::asio::deadline_timer                                             timer_;
214         int                                                                                             scan_interval_millis_;
215         std::promise<void>                                                              initial_scan_completion_;
216         tbb::concurrent_queue<boost::filesystem::path>  to_reemmit_;
217         tbb::atomic<bool>                                                               reemmit_all_;
218         executor                                                                                executor_;
219 public:
220         polling_filesystem_monitor(
221                         const boost::filesystem::path& folder_to_watch,
222                         filesystem_event events_of_interest_mask,
223                         bool report_already_existing,
224                         int scan_interval_millis,
225                         std::shared_ptr<boost::asio::io_service> scheduler,
226                         const filesystem_monitor_handler& handler,
227                         const initial_files_handler& initial_files_handler)
228                 : scheduler_(std::move(scheduler))
229                 , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
230                 , timer_(*scheduler_)
231                 , scan_interval_millis_(scan_interval_millis)
232                 , executor_(L"polling_filesystem_monitor")
233         {
234                 running_ = true;
235                 reemmit_all_ = false;
236         }
237
238         void start()
239         {
240                 executor_.begin_invoke([this]
241                 {
242                         scan();
243                         initial_scan_completion_.set_value();
244                         schedule_next();
245                 });
246         }
247
248         ~polling_filesystem_monitor()
249         {
250                 running_ = false;
251                 boost::system::error_code e;
252                 timer_.cancel(e); // Can still have be queued for execution by asio, therefore the task has a weak_ptr to this
253         }
254
255         std::future<void> initial_files_processed() override
256         {
257                 return initial_scan_completion_.get_future();
258         }
259
260         void reemmit_all() override
261         {
262                 reemmit_all_ = true;
263         }
264
265         void reemmit(const boost::filesystem::path& file) override
266         {
267                 to_reemmit_.push(file);
268         }
269 private:
270         void schedule_next()
271         {
272                 if (!running_)
273                         return;
274
275                 std::weak_ptr<polling_filesystem_monitor> weak_self = shared_from_this();
276
277                 timer_.expires_from_now(boost::posix_time::milliseconds(scan_interval_millis_));
278                 timer_.async_wait([weak_self](const boost::system::error_code& e)
279                 {
280                         auto strong_self = weak_self.lock();
281
282                         if (strong_self)
283                                 strong_self->begin_scan();
284                 });
285         }
286
287         void begin_scan()
288         {
289                 if (!running_)
290                         return;
291
292                 executor_.begin_invoke([this]()
293                 {
294                         scan();
295                         schedule_next();
296                 });
297         }
298
299         void scan()
300         {
301                 if (!running_)
302                         return;
303
304                 try
305                 {
306                         if (reemmit_all_.fetch_and_store(false))
307                                 root_monitor_.reemmit_all(running_);
308                         else
309                         {
310                                 boost::filesystem::path file;
311
312                                 while (to_reemmit_.try_pop(file))
313                                         root_monitor_.reemmit(file);
314                         }
315
316                         root_monitor_.scan([=] { return !running_; });
317                 }
318                 catch (...)
319                 {
320                         CASPAR_LOG_CURRENT_EXCEPTION();
321                 }
322         }
323 };
324
325 struct polling_filesystem_monitor_factory::impl
326 {
327         std::shared_ptr<boost::asio::io_service> scheduler_;
328         int scan_interval_millis;
329
330         impl(
331                         std::shared_ptr<boost::asio::io_service> scheduler,
332                         int scan_interval_millis)
333                 : scheduler_(std::move(scheduler))
334                 , scan_interval_millis(scan_interval_millis)
335         {
336         }
337 };
338
339 polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(
340                 std::shared_ptr<boost::asio::io_service> scheduler,
341                 int scan_interval_millis)
342         : impl_(new impl(std::move(scheduler), scan_interval_millis))
343 {
344 }
345
346 polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory()
347 {
348 }
349
350 filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
351                 const boost::filesystem::path& folder_to_watch,
352                 filesystem_event events_of_interest_mask,
353                 bool report_already_existing,
354                 const filesystem_monitor_handler& handler,
355                 const initial_files_handler& initial_files_handler)
356 {
357         auto monitor = spl::make_shared<polling_filesystem_monitor>(
358                         folder_to_watch,
359                         events_of_interest_mask,
360                         report_already_existing,
361                         impl_->scan_interval_millis,
362                         impl_->scheduler_,
363                         handler,
364                         initial_files_handler);
365
366         monitor->start();
367
368         return monitor;
369 }
370
371 }