]> git.sesse.net Git - casparcg/blob - common/filesystem/polling_filesystem_monitor.cpp
Improved read_fps.
[casparcg] / common / filesystem / polling_filesystem_monitor.cpp
1 /*\r
2 * Copyright 2013 Sveriges Television AB http://casparcg.com/\r
3 *\r
4 * This file is part of CasparCG (www.casparcg.com).\r
5 *\r
6 * CasparCG is free software: you can redistribute it and/or modify\r
7 * it under the terms of the GNU General Public License as published by\r
8 * the Free Software Foundation, either version 3 of the License, or\r
9 * (at your option) any later version.\r
10 *\r
11 * CasparCG is distributed in the hope that it will be useful,\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
14 * GNU General Public License for more details.\r
15 *\r
16 * You should have received a copy of the GNU General Public License\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
18 *\r
19 * Author: Helge Norberg, helge.norberg@svt.se\r
20 */\r
21 \r
22 #include "../stdafx.h"\r
23 \r
24 #include "polling_filesystem_monitor.h"\r
25 \r
26 #include <map>\r
27 #include <set>\r
28 #include <iostream>\r
29 \r
30 #include <boost/asio.hpp>\r
31 #include <boost/foreach.hpp>\r
32 #include <boost/range/adaptor/map.hpp>\r
33 #include <boost/range/algorithm/copy.hpp>\r
34 #include <boost/filesystem/fstream.hpp>\r
35 \r
36 #include <tbb/atomic.h>\r
37 #include <tbb/concurrent_queue.h>\r
38 \r
39 #include "../concurrency/executor.h"\r
40 \r
41 namespace caspar {\r
42 \r
43 class exception_protected_handler\r
44 {\r
45         filesystem_monitor_handler handler_;\r
46 public:\r
47         exception_protected_handler(const filesystem_monitor_handler& handler)\r
48                 : handler_(handler)\r
49         {\r
50         }\r
51 \r
52         void operator()(filesystem_event event, const boost::filesystem::wpath& file)\r
53         {\r
54                 try\r
55                 {\r
56                         handler_(event, file);\r
57                 }\r
58                 catch (...)\r
59                 {\r
60                         CASPAR_LOG_CURRENT_EXCEPTION();\r
61                 }\r
62         }\r
63 };\r
64 \r
65 class directory_monitor\r
66 {\r
67         bool report_already_existing_;\r
68         boost::filesystem::wpath folder_;\r
69         filesystem_event events_mask_;\r
70         filesystem_monitor_handler handler_;\r
71         initial_files_handler initial_files_handler_;\r
72         bool first_scan_;\r
73         std::map<boost::filesystem::wpath, std::time_t> files_;\r
74         std::map<boost::filesystem::wpath, uintmax_t> being_written_sizes_;\r
75 public:\r
76         directory_monitor(\r
77                         bool report_already_existing,\r
78                         const boost::filesystem::wpath& folder,\r
79                         filesystem_event events_mask,\r
80                         const filesystem_monitor_handler& handler,\r
81                         const initial_files_handler& initial_files_handler)\r
82                 : report_already_existing_(report_already_existing)\r
83                 , folder_(folder)\r
84                 , events_mask_(events_mask)\r
85                 , handler_(exception_protected_handler(handler))\r
86                 , initial_files_handler_(initial_files_handler)\r
87                 , first_scan_(true)\r
88         {\r
89         }\r
90 \r
91         void reemmit_all()\r
92         {\r
93                 if ((events_mask_ & MODIFIED) == 0)\r
94                         return;\r
95 \r
96                 BOOST_FOREACH(auto& file, files_)\r
97                         handler_(MODIFIED, file.first);\r
98         }\r
99 \r
100         void reemmit(const boost::filesystem::wpath& file)\r
101         {\r
102                 if ((events_mask_ & MODIFIED) == 0)\r
103                         return;\r
104 \r
105                 if (files_.find(file) != files_.end() && boost::filesystem::exists(file))\r
106                         handler_(MODIFIED, file);\r
107         }\r
108 \r
109         void scan(const boost::function<bool ()>& should_abort)\r
110         {\r
111                 static const std::time_t NO_LONGER_WRITING_AGE = 3; // Assume std::time_t is expressed in seconds\r
112                 using namespace boost::filesystem;\r
113 \r
114                 bool interested_in_removed = (events_mask_ & REMOVED) > 0;\r
115                 bool interested_in_created = (events_mask_ & CREATED) > 0;\r
116                 bool interested_in_modified = (events_mask_ & MODIFIED) > 0;\r
117 \r
118                 std::set<wpath> removed_files;\r
119                 boost::copy(\r
120                                 files_ | boost::adaptors::map_keys,\r
121                                 std::insert_iterator<decltype(removed_files)>(removed_files, removed_files.end()));\r
122 \r
123                 std::set<wpath> initial_files;\r
124 \r
125                 for (wrecursive_directory_iterator iter(folder_); iter != wrecursive_directory_iterator(); ++iter)\r
126                 {\r
127                         if (should_abort())\r
128                                 return;\r
129 \r
130                         auto& path = iter->path();\r
131 \r
132                         if (is_directory(path))\r
133                                 continue;\r
134 \r
135                         auto now = std::time(nullptr);\r
136                         std::time_t current_mtime;\r
137                         \r
138                         try\r
139                         {\r
140                                 current_mtime = last_write_time(path);\r
141                         }\r
142                         catch (...)\r
143                         {\r
144                                 // Probably removed, will be captured the next round.\r
145                                 continue;\r
146                         }\r
147 \r
148                         auto time_since_written_to = now - current_mtime;\r
149                         bool no_longer_being_written_to = time_since_written_to >= NO_LONGER_WRITING_AGE;\r
150                         auto previous_it = files_.find(path);\r
151                         bool already_known = previous_it != files_.end();\r
152 \r
153                         if (already_known && no_longer_being_written_to)\r
154                         {\r
155                                 bool modified = previous_it->second != current_mtime;\r
156 \r
157                                 if (modified && can_read_file(path))\r
158                                 {\r
159                                         if (interested_in_modified)\r
160                                                 handler_(MODIFIED, path);\r
161 \r
162                                         files_[path] = current_mtime;\r
163                                         being_written_sizes_.erase(path);\r
164                                 }\r
165                         }\r
166                         else if (no_longer_being_written_to && can_read_file(path))\r
167                         {\r
168                                 if (interested_in_created && (report_already_existing_ || !first_scan_))\r
169                                         handler_(CREATED, path);\r
170 \r
171                                 if (first_scan_)\r
172                                         initial_files.insert(path);\r
173 \r
174                                 files_.insert(std::make_pair(path, current_mtime));\r
175                                 being_written_sizes_.erase(path);\r
176                         }\r
177 \r
178                         removed_files.erase(path);\r
179                 }\r
180 \r
181                 BOOST_FOREACH(auto& path, removed_files)\r
182                 {\r
183                         files_.erase(path);\r
184                         being_written_sizes_.erase(path);\r
185 \r
186                         if (interested_in_removed)\r
187                                 handler_(REMOVED, path);\r
188                 }\r
189 \r
190                 if (first_scan_)\r
191                         initial_files_handler_(initial_files);\r
192 \r
193                 first_scan_ = false;\r
194         }\r
195 private:\r
196         bool can_read_file(const boost::filesystem::wpath& file)\r
197         {\r
198                 boost::filesystem::wifstream stream(file);\r
199 \r
200                 return stream.is_open();\r
201         }\r
202 };\r
203 \r
204 class polling_filesystem_monitor : public filesystem_monitor\r
205 {\r
206         directory_monitor root_monitor_;\r
207         executor executor_;\r
208         boost::asio::io_service& scheduler_;\r
209         boost::asio::deadline_timer timer_;\r
210         tbb::atomic<bool> running_;\r
211         int scan_interval_millis_;\r
212         boost::promise<void> initial_scan_completion_;\r
213         tbb::concurrent_queue<boost::filesystem::wpath> to_reemmit_;\r
214         tbb::atomic<bool> reemmit_all_;\r
215 public:\r
216         polling_filesystem_monitor(\r
217                         const boost::filesystem::wpath& folder_to_watch,\r
218                         filesystem_event events_of_interest_mask,\r
219                         bool report_already_existing,\r
220                         int scan_interval_millis,\r
221                         boost::asio::io_service& scheduler,\r
222                         const filesystem_monitor_handler& handler,\r
223                         const initial_files_handler& initial_files_handler)\r
224                 : root_monitor_(\r
225                                 report_already_existing,\r
226                                 folder_to_watch,\r
227                                 events_of_interest_mask,\r
228                                 handler,\r
229                                 initial_files_handler)\r
230                 , executor_(L"polling_filesystem_monitor")\r
231                 , scheduler_(scheduler)\r
232                 , timer_(scheduler)\r
233                 , scan_interval_millis_(scan_interval_millis)\r
234         {\r
235                 running_ = true;\r
236                 reemmit_all_ = false;\r
237                 executor_.begin_invoke([this]\r
238                 {\r
239                         scan();\r
240                         initial_scan_completion_.set_value();\r
241                         schedule_next();\r
242                 });\r
243         }\r
244 \r
245         virtual ~polling_filesystem_monitor()\r
246         {\r
247                 running_ = false;\r
248                 boost::system::error_code e;\r
249                 timer_.cancel(e);\r
250         }\r
251 \r
252         virtual boost::unique_future<void> initial_files_processed()\r
253         {\r
254                 return initial_scan_completion_.get_future();\r
255         }\r
256 \r
257         virtual void reemmit_all()\r
258         {\r
259                 reemmit_all_ = true;\r
260         }\r
261 \r
262         virtual void reemmit(const boost::filesystem::wpath& file)\r
263         {\r
264                 to_reemmit_.push(file);\r
265         }\r
266 private:\r
267         void schedule_next()\r
268         {\r
269                 if (!running_)\r
270                         return;\r
271 \r
272                 timer_.expires_from_now(\r
273                         boost::posix_time::milliseconds(scan_interval_millis_));\r
274                 timer_.async_wait([this](const boost::system::error_code& e)\r
275                 {\r
276                         scan();\r
277                         schedule_next();\r
278                 });\r
279         }\r
280 \r
281         void scan()\r
282         {\r
283                 if (!running_)\r
284                         return;\r
285 \r
286                 try\r
287                 {\r
288                         if (reemmit_all_.fetch_and_store(false))\r
289                                 root_monitor_.reemmit_all();\r
290                         else\r
291                         {\r
292                                 boost::filesystem::wpath file;\r
293 \r
294                                 while (to_reemmit_.try_pop(file))\r
295                                         root_monitor_.reemmit(file);\r
296                         }\r
297 \r
298                         root_monitor_.scan([=] { return !running_; });\r
299                 }\r
300                 catch (...)\r
301                 {\r
302                         CASPAR_LOG_CURRENT_EXCEPTION();\r
303                 }\r
304         }\r
305 };\r
306 \r
307 struct polling_filesystem_monitor_factory::implementation\r
308 {\r
309         boost::asio::io_service& scheduler_;\r
310         int scan_interval_millis;\r
311 \r
312         implementation(\r
313                         boost::asio::io_service& scheduler, int scan_interval_millis)\r
314                 : scheduler_(scheduler), scan_interval_millis(scan_interval_millis)\r
315         {\r
316         }\r
317 };\r
318 \r
319 polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(\r
320                 boost::asio::io_service& scheduler,\r
321                 int scan_interval_millis)\r
322         : impl_(new implementation(scheduler, scan_interval_millis))\r
323 {\r
324 }\r
325 \r
326 polling_filesystem_monitor_factory::~polling_filesystem_monitor_factory()\r
327 {\r
328 }\r
329 \r
330 filesystem_monitor::ptr polling_filesystem_monitor_factory::create(\r
331                 const boost::filesystem::wpath& folder_to_watch,\r
332                 filesystem_event events_of_interest_mask,\r
333                 bool report_already_existing,\r
334                 const filesystem_monitor_handler& handler,\r
335                 const initial_files_handler& initial_files_handler)\r
336 {\r
337         return make_safe<polling_filesystem_monitor>(\r
338                         folder_to_watch,\r
339                         events_of_interest_mask,\r
340                         report_already_existing,\r
341                         impl_->scan_interval_millis,\r
342                         impl_->scheduler_,\r
343                         handler,\r
344                         initial_files_handler);\r
345 }\r
346 \r
347 }\r