git.sesse.net Git - casparcg/blob - common/polling_filesystem_monitor.cpp
Move from boost::thread to std::thread for nearly everything.
[casparcg] / common / polling_filesystem_monitor.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #include "stdafx.h"
23
24 #include "polling_filesystem_monitor.h"
25
26 #include <map>
27 #include <set>
28 #include <iostream>
29 #include <cstdint>
30
31 #include <boost/asio.hpp>
32 #include <boost/filesystem/fstream.hpp>
33 #include <boost/filesystem/convenience.hpp>
34
35 #include <tbb/atomic.h>
36 #include <tbb/concurrent_queue.h>
37
38 #include "executor.h"
39 #include "linq.h"
40
41 namespace caspar {
42
43 class exception_protected_handler
44 {
45         filesystem_monitor_handler handler_;
46 public:
47         exception_protected_handler(const filesystem_monitor_handler& handler)
48                 : handler_(handler)
49         {
50         }
51
52         void operator()(filesystem_event event, const boost::filesystem::path& file)
53         {
54                 try
55                 {
56                         handler_(event, file);
57                 }
58                 catch (...)
59                 {
60                         CASPAR_LOG_CURRENT_EXCEPTION();
61                 }
62         }
63 };
64
65 class directory_monitor
66 {
67         bool                                                                                    report_already_existing_;
68         boost::filesystem::path                                                 folder_;
69         filesystem_event                                                                events_mask_;
70         filesystem_monitor_handler                                              handler_;
71         initial_files_handler                                                   initial_files_handler_;
72         bool                                                                                    first_scan_                                     = true;
73         std::map<boost::filesystem::path, std::time_t>  files_;
74         std::map<boost::filesystem::path, uintmax_t>    being_written_sizes_;
75 public:
76         directory_monitor(
77                         bool report_already_existing,
78                         const boost::filesystem::path& folder,
79                         filesystem_event events_mask,
80                         const filesystem_monitor_handler& handler,
81                         const initial_files_handler& initial_files_handler)
82                 : report_already_existing_(report_already_existing)
83                 , folder_(folder)
84                 , events_mask_(events_mask)
85                 , handler_(exception_protected_handler(handler))
86                 , initial_files_handler_(initial_files_handler)
87         {
88         }
89
90         void reemmit_all(const tbb::atomic<bool>& running)
91         {
92                 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
93                         return;
94
95                 for (auto& file : files_)
96                 {
97                         if (!running)
98                                 return;
99
100                         handler_(filesystem_event::MODIFIED, file.first);
101                 }
102         }
103
104         void reemmit(const boost::filesystem::path& file)
105         {
106                 if (static_cast<int>(events_mask_ & filesystem_event::MODIFIED) == 0)
107                         return;
108
109                 if (files_.find(file) != files_.end() && boost::filesystem::exists(file))
110                         handler_(filesystem_event::MODIFIED, file);
111         }
112
113         void scan(const boost::function<bool ()>& should_abort)
114         {
115                 static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds
116                 using namespace boost::filesystem;
117
118                 bool interested_in_removed = static_cast<int>(events_mask_ & filesystem_event::REMOVED) > 0;
119                 bool interested_in_created = static_cast<int>(events_mask_ & filesystem_event::CREATED) > 0;
120                 bool interested_in_modified = static_cast<int>(events_mask_ & filesystem_event::MODIFIED) > 0;
121
122                 auto filenames = cpplinq::from(files_).select(keys());
123                 std::set<path> removed_files(filenames.begin(), filenames.end());
124                 std::set<path> initial_files;
125
126                 for (boost::filesystem::wrecursive_directory_iterator iter(folder_); iter != boost::filesystem::wrecursive_directory_iterator(); ++iter)
127                 {
128                         if (should_abort())
129                                 return;
130
131                         auto& path = iter->path();
132
133                         if (is_directory(path))
134                                 continue;
135
136                         auto now = std::time(nullptr);
137                         std::time_t current_mtime;
138
139                         try
140                         {
141                                 current_mtime = last_write_time(path);
142                         }
143                         catch (...)
144                         {
145                                 // Probably removed, will be captured the next round.
146                                 continue;
147                         }
148
149                         auto time_since_written_to = now - current_mtime;
150                         bool no_longer_being_written_to = time_since_written_to >= NO_LONGER_WRITING_AGE;
151                         auto previous_it = files_.find(path);
152                         bool already_known = previous_it != files_.end();
153
154                         if (already_known && no_longer_being_written_to)
155                         {
156                                 bool modified = previous_it->second != current_mtime;
157
158                                 if (modified && can_read_file(path))
159                                 {
160                                         if (interested_in_modified)
161                                                 handler_(filesystem_event::MODIFIED, path);
162
163                                         files_[path] = current_mtime;
164                                         being_written_sizes_.erase(path);
165                                 }
166                         }
167                         else if (no_longer_being_written_to && can_read_file(path))
168                         {
169                                 if (interested_in_created && (report_already_existing_ || !first_scan_))
170                                         handler_(filesystem_event::CREATED, path);
171
172                                 if (first_scan_)
173                                         initial_files.insert(path);
174
175                                 files_.insert(std::make_pair(path, current_mtime));
176                                 being_written_sizes_.erase(path);
177                         }
178
179                         removed_files.erase(path);
180                 }
181
182                 for (auto& path : removed_files)
183                 {
184                         files_.erase(path);
185                         being_written_sizes_.erase(path);
186
187                         if (interested_in_removed)
188                                 handler_(filesystem_event::REMOVED, path);
189                 }
190
191                 if (first_scan_)
192                         initial_files_handler_(initial_files);
193
194                 first_scan_ = false;
195         }
196 private:
197         bool can_read_file(const boost::filesystem::path& file)
198         {
199                 boost::filesystem::wifstream stream(file);
200
201                 return stream.is_open();
202         }
203 };
204
205 class polling_filesystem_monitor
206                 : public filesystem_monitor
207                 , public spl::enable_shared_from_this<polling_filesystem_monitor>
208 {
209         tbb::atomic<bool>                                                               running_;
210         std::shared_ptr<boost::asio::io_service>                scheduler_;
211         directory_monitor                                                               root_monitor_;
212         boost::asio::deadline_timer                                             timer_;
213         int                                                                                             scan_interval_millis_;
214         std::promise<void>                                                              initial_scan_completion_;
215         tbb::concurrent_queue<boost::filesystem::path>  to_reemmit_;
216         tbb::atomic<bool>                                                               reemmit_all_;
217         executor                                                                                executor_;
218 public:
219         polling_filesystem_monitor(
220                         const boost::filesystem::path& folder_to_watch,
221                         filesystem_event events_of_interest_mask,
222                         bool report_already_existing,
223                         int scan_interval_millis,
224                         std::shared_ptr<boost::asio::io_service> scheduler,
225                         const filesystem_monitor_handler& handler,
226                         const initial_files_handler& initial_files_handler)
227                 : scheduler_(std::move(scheduler))
228                 , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
229                 , timer_(*scheduler_)
230                 , scan_interval_millis_(scan_interval_millis)
231                 , executor_(L"polling_filesystem_monitor")
232         {
233                 running_ = true;
234                 reemmit_all_ = false;
235         }
236
237         void start()
238         {
239                 executor_.begin_invoke([this]
240                 {
241                         scan();
242                         initial_scan_completion_.set_value();
243                         schedule_next();
244                 });
245         }
246
247         ~polling_filesystem_monitor()
248         {
249                 running_ = false;
250                 boost::system::error_code e;
251                 timer_.cancel(e); // Can still have be queued for execution by asio, therefore the task has a weak_ptr to this
252         }
253
254         std::future<void> initial_files_processed() override
255         {
256                 return initial_scan_completion_.get_future();
257         }
258
259         void reemmit_all() override
260         {
261                 reemmit_all_ = true;
262         }
263
264         void reemmit(const boost::filesystem::path& file) override
265         {
266                 to_reemmit_.push(file);
267         }
268 private:
269         void schedule_next()
270         {
271                 if (!running_)
272                         return;
273
274                 std::weak_ptr<polling_filesystem_monitor> weak_self = shared_from_this();
275
276                 timer_.expires_from_now(boost::posix_time::milliseconds(scan_interval_millis_));
277                 timer_.async_wait([weak_self](const boost::system::error_code& e)
278                 {
279                         auto strong_self = weak_self.lock();
280
281                         if (strong_self)
282                                 strong_self->begin_scan();
283                 });
284         }
285
286         void begin_scan()
287         {
288                 if (!running_)
289                         return;
290
291                 executor_.begin_invoke([this]()
292                 {
293                         scan();
294                         schedule_next();
295                 });
296         }
297
298         void scan()
299         {
300                 if (!running_)
301                         return;
302
303                 try
304                 {
305                         if (reemmit_all_.fetch_and_store(false))
306                                 root_monitor_.reemmit_all(running_);
307                         else
308                         {
309                                 boost::filesystem::path file;
310
311                                 while (to_reemmit_.try_pop(file))
312                                         root_monitor_.reemmit(file);
313                         }
314
315                         root_monitor_.scan([=] { return !running_; });
316                 }
317                 catch (...)
318                 {
319                         CASPAR_LOG_CURRENT_EXCEPTION();
320                 }
321         }
322 };
323
324 struct polling_filesystem_monitor_factory::impl
325 {
326         std::shared_ptr<boost::asio::io_service> scheduler_;
327         int scan_interval_millis;
328
329         impl(
330                         std::shared_ptr<boost::asio::io_service> scheduler,
331                         int scan_interval_millis)
332                 : scheduler_(std::move(scheduler))
333                 , scan_interval_millis(scan_interval_millis)
334         {
335         }
336 };
337
338 polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(
339                 std::shared_ptr<boost::asio::io_service> scheduler,
340                 int scan_interval_millis)
341         : impl_(new impl(std::move(scheduler), scan_interval_millis))
342 {
343 }
344
345 polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory()
346 {
347 }
348
349 filesystem_monitor::ptr polling_filesystem_monitor_factory::create(
350                 const boost::filesystem::path& folder_to_watch,
351                 filesystem_event events_of_interest_mask,
352                 bool report_already_existing,
353                 const filesystem_monitor_handler& handler,
354                 const initial_files_handler& initial_files_handler)
355 {
356         auto monitor = spl::make_shared<polling_filesystem_monitor>(
357                         folder_to_watch,
358                         events_of_interest_mask,
359                         report_already_existing,
360                         impl_->scan_interval_millis,
361                         impl_->scheduler_,
362                         handler,
363                         initial_files_handler);
364
365         monitor->start();
366
367         return monitor;
368 }
369
370 }