]> git.sesse.net Git - bcachefs-tools-debian/blob - fsck/bcachefsck_all.in
use upstream bindgen; fix packed and aligned types
[bcachefs-tools-debian] / fsck / bcachefsck_all.in
1 #!/usr/bin/python3
2
3 # SPDX-License-Identifier: GPL-2.0-or-later
4 # Copyright (C) 2023-2024 Oracle.  All rights reserved.
5 #
6 # Author: Darrick J. Wong <djwong@kernel.org>
7
8 # Run bcachefsck in parallel, but avoid thrashing.
9
10 import subprocess
11 import json
12 import threading
13 import time
14 import sys
15 import os
16 import argparse
17 import signal
18 import dbus
19 from io import TextIOWrapper
20 from pathlib import Path
21 from datetime import timedelta
22 from datetime import datetime
23 from datetime import timezone
24
25 retcode = 0
26 terminate = False
27 debug = False
28
29 def DEVNULL():
30         '''Return /dev/null in subprocess writable format.'''
31         try:
32                 from subprocess import DEVNULL
33                 return DEVNULL
34         except ImportError:
35                 return open(os.devnull, 'wb')
36
37 def find_mounts():
38         '''Map mountpoints to physical disks.'''
39         def find_bcachefs_mounts(bdev, fs, lastdisk):
40                 '''Attach all lastdisk to each fs found under bdev.'''
41                 if bdev['fstype'] == 'bcachefs' and bdev['mountpoint'] is not None:
42                         mnt = bdev['mountpoint']
43                         if mnt in fs:
44                                 fs[mnt].add(lastdisk.split(':'))
45                         else:
46                                 fs[mnt] = set(lastdisk.split(':'))
47                 if 'children' not in bdev:
48                         return
49                 for child in bdev['children']:
50                         find_bcachefs_mounts(child, fs, lastdisk)
51
52         fs = {}
53         cmd=['lsblk', '-o', 'NAME,KNAME,TYPE,FSTYPE,MOUNTPOINT', '-J']
54         result = subprocess.Popen(cmd, stdout=subprocess.PIPE)
55         result.wait()
56         if result.returncode != 0:
57                 return fs
58         sarray = [x.decode(sys.stdout.encoding) for x in result.stdout.readlines()]
59         output = ' '.join(sarray)
60         bdevdata = json.loads(output)
61
62         # The lsblk output had better be in disks-then-partitions order
63         for bdev in bdevdata['blockdevices']:
64                 lastdisk = bdev['kname']
65                 find_bcachefs_mounts(bdev, fs, lastdisk)
66
67         return fs
68
69 def backtick(cmd):
70         '''Generator function that yields lines of a program's stdout.'''
71         p = subprocess.Popen(cmd, stdout = subprocess.PIPE)
72         for line in TextIOWrapper(p.stdout, encoding="utf-8"):
73                 yield line.strip()
74
75 def remove_killfunc(killfuncs, fn):
76         '''Ensure fn is not in killfuncs.'''
77         try:
78                 killfuncs.remove(fn)
79         except:
80                 pass
81
82 class scrub_control(object):
83         '''Control object for bcachefsck.'''
84         def __init__(self):
85                 pass
86
87         def start(self):
88                 '''Start scrub and wait for it to complete.  Returns -1 if the
89                 service was not started, 0 if it succeeded, or 1 if it
90                 failed.'''
91                 assert False
92
93         def stop(self):
94                 '''Stop scrub.'''
95                 assert False
96
97 class scrub_subprocess(scrub_control):
98         '''Control object for bcachefsck subprocesses.'''
99         def __init__(self, mnt):
100                 cmd = ['bcachefs', 'fsck']
101                 cmd += '@bcachefsck_args@'.split()
102                 cmd += [mnt]
103                 self.cmdline = cmd
104                 self.proc = None
105
106         def start(self):
107                 '''Start bcachefsck and wait for it to complete.  Returns -1 if
108                 the service was not started, 0 if it succeeded, or 1 if it
109                 failed.'''
110                 global debug
111
112                 if debug:
113                         print('run ', ' '.join(self.cmdline))
114
115                 try:
116                         self.proc = subprocess.Popen(self.cmdline)
117                         self.proc.wait()
118                 except:
119                         return -1
120
121                 proc = self.proc
122                 self.proc = None
123                 return proc.returncode
124
125         def stop(self):
126                 '''Stop bcachefsck.'''
127                 global debug
128
129                 if debug:
130                         print('kill ', ' '.join(self.cmdline))
131                 if self.proc is not None:
132                         self.proc.terminate()
133
134 def run_subprocess(mnt, killfuncs):
135         '''Run a killable program.  Returns program retcode or -1 if we can't
136         start it.'''
137         try:
138                 p = scrub_subprocess(mnt)
139                 killfuncs.add(p.stop)
140                 ret = p.start()
141                 remove_killfunc(killfuncs, p.stop)
142                 return ret
143         except:
144                 return -1
145
146 # systemd doesn't like unit instance names with slashes in them, so it
147 # replaces them with dashes when it invokes the service.  Filesystem paths
148 # need a special --path argument so that dashes do not get mangled.
149 def path_to_serviceunit(path):
150         '''Convert a pathname into a systemd service unit name.'''
151
152         svcname = 'bcachefsck@.service'
153         cmd = ['systemd-escape', '--template', svcname, '--path', path]
154
155         proc = subprocess.Popen(cmd, stdout = subprocess.PIPE)
156         proc.wait()
157         for line in proc.stdout:
158                 return line.decode(sys.stdout.encoding).strip()
159
160 def fibonacci(max_ret):
161         '''Yield fibonacci sequence up to but not including max_ret.'''
162         if max_ret < 1:
163                 return
164
165         x = 0
166         y = 1
167         yield 1
168
169         z = x + y
170         while z <= max_ret:
171                 yield z
172                 x = y
173                 y = z
174                 z = x + y
175
176 class scrub_service(scrub_control):
177         '''Control object for bcachefsck systemd service.'''
178         def __init__(self, mnt):
179                 self.unitname = path_to_serviceunit(mnt)
180                 self.prop = None
181                 self.unit = None
182                 self.bind()
183
184         def bind(self):
185                 '''Bind to the dbus proxy object for this service.'''
186                 sysbus = dbus.SystemBus()
187                 systemd1 = sysbus.get_object('org.freedesktop.systemd1',
188                                             '/org/freedesktop/systemd1')
189                 manager = dbus.Interface(systemd1,
190                                 'org.freedesktop.systemd1.Manager')
191                 path = manager.LoadUnit(self.unitname)
192
193                 svc_obj = sysbus.get_object('org.freedesktop.systemd1', path)
194                 self.prop = dbus.Interface(svc_obj,
195                                 'org.freedesktop.DBus.Properties')
196                 self.unit = dbus.Interface(svc_obj,
197                                 'org.freedesktop.systemd1.Unit')
198
199         def __dbusrun(self, lambda_fn):
200                 '''Call the lambda function to execute something on dbus.  dbus
201                 exceptions result in retries with Fibonacci backoff, and the
202                 bindings will be rebuilt every time.'''
203                 global debug
204
205                 fatal_ex = None
206
207                 for i in fibonacci(30):
208                         try:
209                                 return lambda_fn()
210                         except dbus.exceptions.DBusException as e:
211                                 if debug:
212                                         print(e)
213                                 fatal_ex = e
214                                 time.sleep(i)
215                                 self.bind()
216                 raise fatal_ex
217
218         def state(self):
219                 '''Retrieve the active state for a systemd service.  As of
220                 systemd 249, this is supposed to be one of the following:
221                 "active", "reloading", "inactive", "failed", "activating",
222                 or "deactivating".  These strings are not localized.'''
223                 global debug
224
225                 l = lambda: self.prop.Get('org.freedesktop.systemd1.Unit',
226                                 'ActiveState')
227                 try:
228                         return self.__dbusrun(l)
229                 except Exception as e:
230                         if debug:
231                                 print(e, file = sys.stderr)
232                         return 'failed'
233
234         def wait(self, interval = 1):
235                 '''Wait until the service finishes.'''
236                 global debug
237
238                 # Use a poll/sleep loop to wait for the service to finish.
239                 # Avoid adding a dependency on python3 glib, which is required
240                 # to use an event loop to receive a dbus signal.
241                 s = self.state()
242                 while s not in ['failed', 'inactive']:
243                         if debug:
244                                 print('waiting %s %s' % (self.unitname, s))
245                         time.sleep(interval)
246                         s = self.state()
247                 if debug:
248                         print('waited %s %s' % (self.unitname, s))
249                 if s == 'failed':
250                         return 1
251                 return 0
252
253         def start(self):
254                 '''Start the service and wait for it to complete.  Returns -1
255                 if the service was not started, 0 if it succeeded, or 1 if it
256                 failed.'''
257                 global debug
258
259                 if debug:
260                         print('starting %s' % self.unitname)
261
262                 try:
263                         self.__dbusrun(lambda: self.unit.Start('replace'))
264                         return self.wait()
265                 except Exception as e:
266                         print(e, file = sys.stderr)
267                         return -1
268
269         def stop(self):
270                 '''Stop the service.'''
271                 global debug
272
273                 if debug:
274                         print('stopping %s' % self.unitname)
275
276                 try:
277                         self.__dbusrun(lambda: self.unit.Stop('replace'))
278                         return self.wait()
279                 except Exception as e:
280                         print(e, file = sys.stderr)
281                         return -1
282
283 def run_service(mnt, killfuncs):
284         '''Run scrub as a service.'''
285         try:
286                 svc = scrub_service(mnt)
287         except:
288                 return -1
289
290         killfuncs.add(svc.stop)
291         retcode = svc.start()
292         remove_killfunc(killfuncs, svc.stop)
293         return retcode
294
295 def run_scrub(mnt, cond, running_devs, mntdevs, killfuncs):
296         '''Run a scrub process.'''
297         global retcode, terminate
298
299         print("Scrubbing %s..." % mnt)
300         sys.stdout.flush()
301
302         try:
303                 if terminate:
304                         return
305
306                 # Run per-mount systemd bcachefsck service only if we ourselves
307                 # are running as a systemd service.
308                 if 'SERVICE_MODE' in os.environ:
309                         ret = run_service(mnt, killfuncs)
310                         if ret == 0 or ret == 1:
311                                 print("Scrubbing %s done, (err=%d)" % (mnt, ret))
312                                 sys.stdout.flush()
313                                 retcode |= ret
314                                 return
315
316                         if terminate:
317                                 return
318
319                 # Invoke bcachefsck manually if we're running in the foreground.
320                 # We also permit this if we're running as a cronjob where
321                 # systemd services are unavailable.
322                 ret = run_subprocess(mnt, killfuncs)
323                 if ret >= 0:
324                         print("Scrubbing %s done, (err=%d)" % (mnt, ret))
325                         sys.stdout.flush()
326                         retcode |= ret
327                         return
328
329                 if terminate:
330                         return
331
332                 print("Unable to start scrub tool.")
333                 sys.stdout.flush()
334         finally:
335                 running_devs -= mntdevs
336                 cond.acquire()
337                 cond.notify()
338                 cond.release()
339
340 def signal_scrubs(signum, cond):
341         '''Handle termination signals by killing bcachefsck children.'''
342         global debug, terminate
343
344         if debug:
345                 print('Signal handler called with signal', signum)
346                 sys.stdout.flush()
347
348         terminate = True
349         cond.acquire()
350         cond.notify()
351         cond.release()
352
353 def wait_for_termination(cond, killfuncs):
354         '''Wait for a child thread to terminate.  Returns True if we should
355         abort the program, False otherwise.'''
356         global debug, terminate
357
358         if debug:
359                 print('waiting for threads to terminate')
360                 sys.stdout.flush()
361
362         cond.acquire()
363         try:
364                 cond.wait()
365         except KeyboardInterrupt:
366                 terminate = True
367         cond.release()
368
369         if not terminate:
370                 return False
371
372         print("Terminating...")
373         sys.stdout.flush()
374         while len(killfuncs) > 0:
375                 fn = killfuncs.pop()
376                 fn()
377         return True
378
379 def scan_interval(string):
380         '''Convert a textual scan interval argument into a time delta.'''
381
382         if string.endswith('y'):
383                 year = timedelta(seconds = 31556952)
384                 return year * float(string[:-1])
385         if string.endswith('q'):
386                 return timedelta(days = 90 * float(string[:-1]))
387         if string.endswith('mo'):
388                 return timedelta(days = 30 * float(string[:-2]))
389         if string.endswith('w'):
390                 return timedelta(weeks = float(string[:-1]))
391         if string.endswith('d'):
392                 return timedelta(days = float(string[:-1]))
393         if string.endswith('h'):
394                 return timedelta(hours = float(string[:-1]))
395         if string.endswith('m'):
396                 return timedelta(minutes = float(string[:-1]))
397         if string.endswith('s'):
398                 return timedelta(seconds = float(string[:-1]))
399         return timedelta(seconds = int(string))
400
401 def utcnow():
402         '''Create a representation of the time right now, in UTC.'''
403
404         dt = datetime.utcnow()
405         return dt.replace(tzinfo = timezone.utc)
406
407 def main():
408         '''Find mounts, schedule bcachefsck runs.'''
409         def thr(mnt, devs):
410                 a = (mnt, cond, running_devs, devs, killfuncs)
411                 thr = threading.Thread(target = run_scrub, args = a)
412                 thr.start()
413         global retcode, terminate, debug
414
415         parser = argparse.ArgumentParser( \
416                         description = "Scrub all mounted bcachefs filesystems.")
417         parser.add_argument("--debug", help = "Enabling debugging messages.", \
418                         action = "store_true")
419         args = parser.parse_args()
420
421         if args.debug:
422                 debug = True
423
424         fs = find_mounts()
425
426         # Schedule scrub jobs...
427         running_devs = set()
428         killfuncs = set()
429         cond = threading.Condition()
430
431         signal.signal(signal.SIGINT, lambda s, f: signal_scrubs(s, cond))
432         signal.signal(signal.SIGTERM, lambda s, f: signal_scrubs(s, cond))
433
434         while len(fs) > 0:
435                 if len(running_devs) == 0:
436                         mnt, devs = fs.popitem()
437                         running_devs.update(devs)
438                         thr(mnt, devs)
439                 poppers = set()
440                 for mnt in fs:
441                         devs = fs[mnt]
442                         can_run = True
443                         for dev in devs:
444                                 if dev in running_devs:
445                                         can_run = False
446                                         break
447                         if can_run:
448                                 running_devs.update(devs)
449                                 poppers.add(mnt)
450                                 thr(mnt, devs)
451                 for p in poppers:
452                         fs.pop(p)
453
454                 # Wait for one thread to finish
455                 if wait_for_termination(cond, killfuncs):
456                         break
457
458         # Wait for the rest of the threads to finish
459         while len(killfuncs) > 0:
460                 wait_for_termination(cond, killfuncs)
461
462         # If we're being run as a service, the return code must fit the LSB
463         # init script action error guidelines, which is to say that we compress
464         # all errors to 1 ("generic or unspecified error", LSB 5.0 section
465         # 22.2) and hope the admin will scan the log for what actually
466         # happened.
467         #
468         # We have to sleep 2 seconds here because journald uses the pid to
469         # connect our log messages to the systemd service.  This is critical
470         # for capturing all the log messages if the scrub fails, because the
471         # fail service uses the service name to gather log messages for the
472         # error report.
473         if 'SERVICE_MODE' in os.environ:
474                 time.sleep(2)
475                 if retcode != 0:
476                         retcode = 1
477
478         sys.exit(retcode)
479
480 if __name__ == '__main__':
481         main()