]> git.sesse.net Git - casparcg/blob - common/polling_filesystem_monitor.cpp
Merged fix for asio::io_service lifetime race condition (sometimes destroyed too...
[casparcg] / common / polling_filesystem_monitor.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #include "stdafx.h"
23
24 #include "polling_filesystem_monitor.h"
25
26 #include <map>
27 #include <set>
28 #include <iostream>
29 #include <cstdint>
30
31 #include <boost/asio.hpp>
32 #include <boost/thread.hpp>
33 #include <boost/filesystem/fstream.hpp>
34 #include <boost/filesystem/convenience.hpp>
35
36 #include <tbb/atomic.h>
37 #include <tbb/concurrent_queue.h>
38
39 #include "executor.h"
40 #include "linq.h"
41
42 namespace caspar {
43
44 class exception_protected_handler
45 {
46         filesystem_monitor_handler handler_;
47 public:
48         exception_protected_handler(const filesystem_monitor_handler& handler)
49                 : handler_(handler)
50         {
51         }
52
53         void operator()(filesystem_event event, const boost::filesystem::path& file)
54         {
55                 try
56                 {
57                         handler_(event, file);
58                 }
59                 catch (...)
60                 {
61                         CASPAR_LOG_CURRENT_EXCEPTION();
62                 }
63         }
64 };
65
66 class directory_monitor
67 {
68         bool                                                                                    report_already_existing_;
69         boost::filesystem::path                                                 folder_;
70         filesystem_event                                                                events_mask_;
71         filesystem_monitor_handler                                              handler_;
72         initial_files_handler                                                   initial_files_handler_;
73         bool                                                                                    first_scan_                                     = true;
74         std::map<boost::filesystem::path, std::time_t>  files_;
75         std::map<boost::filesystem::path, uintmax_t>    being_written_sizes_;
76 public:
77         directory_monitor(
78                         bool report_already_existing,
79                         const boost::filesystem::path& folder,
80                         filesystem_event events_mask,
81                         const filesystem_monitor_handler& handler,
82                         const initial_files_handler& initial_files_handler)
83                 : report_already_existing_(report_already_existing)
84                 , folder_(folder)
85                 , events_mask_(events_mask)
86                 , handler_(exception_protected_handler(handler))
87                 , initial_files_handler_(initial_files_handler)
88         {
89         }
90
91         void reemmit_all()
92         {
93                 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
94                         return;
95
96                 for (auto& file : files_)
97                         handler_(filesystem_event::MODIFIED, file.first);
98         }
99
100         void reemmit(const boost::filesystem::path& file)
101         {
102                 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
103                         return;
104
105                 if (files_.find(file) != files_.end() && boost::filesystem::exists(file))
106                         handler_(filesystem_event::MODIFIED, file);
107         }
108
109         void scan(const boost::function<bool ()>& should_abort)
110         {
111                 static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds
112                 using namespace boost::filesystem;
113
114                 bool interested_in_removed = static_cast<int>(events_mask_ & filesystem_event::REMOVED) > 0;
115                 bool interested_in_created = static_cast<int>(events_mask_ & filesystem_event::CREATED) > 0;
116                 bool interested_in_modified = static_cast<int>(events_mask_ & filesystem_event::MODIFIED) > 0;
117
118                 auto filenames = cpplinq::from(files_).select(keys());
119                 std::set<path> removed_files(filenames.begin(), filenames.end());
120                 std::set<path> initial_files;
121
122                 for (boost::filesystem::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem::wrecursive_directory_iterator(); ++iter)
123                 {
124                         if (should_abort())
125                                 return;
126
127                         auto& path = iter->path();
128
129                         if (is_directory(path))
130                                 continue;
131
132                         auto now = std::time(nullptr);
133                         std::time_t current_mtime;
134
135                         try
136                         {
137                                 current_mtime = last_write_time(path);
138                         }
139                         catch (...)
140                         {
141                                 // Probably removed, will be captured the next round.
142                                 continue;
143                         }
144
145                         auto time_since_written_to = now - current_mtime;
146                         bool no_longer_being_written_to = time_since_written_to >= NO_LONGER_WRITING_AGE;
147                         auto previous_it = files_.find(path);
148                         bool already_known = previous_it != files_.end();
149
150                         if (already_known && no_longer_being_written_to)
151                         {
152                                 bool modified = previous_it->second != current_mtime;
153
154                                 if (modified && can_read_file(path))
155                                 {
156                                         if (interested_in_modified)
157                                                 handler_(filesystem_event::MODIFIED, path);
158
159                                         files_[path] = current_mtime;
160                                         being_written_sizes_.erase(path);
161                                 }
162                         }
163                         else if (no_longer_being_written_to && can_read_file(path))
164                         {
165                                 if (interested_in_created && (report_already_existing_ || !first_scan_))
166                                         handler_(filesystem_event::CREATED, path);
167
168                                 if (first_scan_)
169                                         initial_files.insert(path);
170
171                                 files_.insert(std::make_pair(path, current_mtime));
172                                 being_written_sizes_.erase(path);
173                         }
174
175                         removed_files.erase(path);
176                 }
177
178                 for (auto& path : removed_files)
179                 {
180                         files_.erase(path);
181                         being_written_sizes_.erase(path);
182
183                         if (interested_in_removed)
184                                 handler_(filesystem_event::REMOVED, path);
185                 }
186
187                 if (first_scan_)
188                         initial_files_handler_(initial_files);
189
190                 first_scan_ = false;
191         }
192 private:
193         bool can_read_file(const boost::filesystem::path& file)
194         {
195                 boost::filesystem::wifstream stream(file);
196
197                 return stream.is_open();
198         }
199 };
200
201 class polling_filesystem_monitor : public filesystem_monitor
202 {
203         std::shared_ptr<boost::asio::io_service> scheduler_;
204         directory_monitor root_monitor_;
205         executor executor_;
206         boost::asio::deadline_timer timer_;
207         tbb::atomic<bool> running_;
208         int scan_interval_millis_;
209         std::promise<void> initial_scan_completion_;
210         tbb::concurrent_queue<boost::filesystem::path> to_reemmit_;
211         tbb::atomic<bool> reemmit_all_;
212 public:
213         polling_filesystem_monitor(
214                         const boost::filesystem::path& folder_to_watch,
215                         filesystem_event events_of_interest_mask,
216                         bool report_already_existing,
217                         int scan_interval_millis,
218                         std::shared_ptr<boost::asio::io_service> scheduler,
219                         const filesystem_monitor_handler& handler,
220                         const initial_files_handler& initial_files_handler)
221                 : scheduler_(std::move(scheduler))
222                 , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
223                 , executor_(L"polling_filesystem_monitor")
224                 , timer_(*scheduler_)
225                 , scan_interval_millis_(scan_interval_millis)
226         {
227                 running_ = true;
228                 reemmit_all_ = false;
229                 executor_.begin_invoke([this]
230                 {
231                         scan();
232                         initial_scan_completion_.set_value();
233                         schedule_next();
234                 });
235         }
236
237         ~polling_filesystem_monitor()
238         {
239                 running_ = false;
240                 boost::system::error_code e;
241                 timer_.cancel(e);
242         }
243
244         std::future<void> initial_files_processed() override
245         {
246                 return initial_scan_completion_.get_future();
247         }
248
249         void reemmit_all() override
250         {
251                 reemmit_all_ = true;
252         }
253
254         void reemmit(const boost::filesystem::path& file) override
255         {
256                 to_reemmit_.push(file);
257         }
258 private:
259         void schedule_next()
260         {
261                 if (!running_)
262                         return;
263
264                 timer_.expires_from_now(
265                         boost::posix_time::milliseconds(scan_interval_millis_));
266                 timer_.async_wait([this](const boost::system::error_code& e)
267                 {
268                         begin_scan();
269                 });
270         }
271
272         void begin_scan()
273         {
274                 if (!running_)
275                         return;
276
277                 executor_.begin_invoke([this]()
278                 {
279                         scan();
280                         schedule_next();
281                 });
282         }
283
284         void scan()
285         {
286                 if (!running_)
287                         return;
288
289                 try
290                 {
291                         if (reemmit_all_.fetch_and_store(false))
292                                 root_monitor_.reemmit_all();
293                         else
294                         {
295                                 boost::filesystem::wpath file;
296
297                                 while (to_reemmit_.try_pop(file))
298                                         root_monitor_.reemmit(file);
299                         }
300
301                         root_monitor_.scan([=] { return !running_; });
302                 }
303                 catch (...)
304                 {
305                         CASPAR_LOG_CURRENT_EXCEPTION();
306                 }
307         }
308 };
309
310 struct polling_filesystem_monitor_factory::impl
311 {
312         std::shared_ptr<boost::asio::io_service> scheduler_;
313         int scan_interval_millis;
314
315         impl(
316                         std::shared_ptr<boost::asio::io_service> scheduler,
317                         int scan_interval_millis)
318                 : scheduler_(std::move(scheduler))
319                 , scan_interval_millis(scan_interval_millis)
320         {
321         }
322 };
323
324 polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(
325                 std::shared_ptr<boost::asio::io_service> scheduler,
326                 int scan_interval_millis)
327         : impl_(new impl(std::move(scheduler), scan_interval_millis))
328 {
329 }
330
331 polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory()
332 {
333 }
334
335 filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
336                 const boost::filesystem::path& folder_to_watch,
337                 filesystem_event events_of_interest_mask,
338                 bool report_already_existing,
339                 const filesystem_monitor_handler& handler,
340                 const initial_files_handler& initial_files_handler)
341 {
342         return spl::make_shared<polling_filesystem_monitor>(
343                         folder_to_watch,
344                         events_of_interest_mask,
345                         report_already_existing,
346                         impl_->scan_interval_millis,
347                         impl_->scheduler_,
348                         handler,
349                         initial_files_handler);
350 }
351
352 }