+#!/usr/bin/python3
+
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2023-2024 Oracle. All rights reserved.
+#
+# Author: Darrick J. Wong <djwong@kernel.org>
+
+# Run bcachefsck in parallel, but avoid thrashing.
+
+import subprocess
+import json
+import threading
+import time
+import sys
+import os
+import argparse
+import signal
+import dbus
+from io import TextIOWrapper
+from pathlib import Path
+from datetime import timedelta
+from datetime import datetime
+from datetime import timezone
+
+retcode = 0
+terminate = False
+debug = False
+
+def DEVNULL():
+ '''Return /dev/null in subprocess writable format.'''
+ try:
+ from subprocess import DEVNULL
+ return DEVNULL
+ except ImportError:
+ return open(os.devnull, 'wb')
+
+def find_mounts():
+ '''Map mountpoints to physical disks.'''
+ def find_bcachefs_mounts(bdev, fs, lastdisk):
+ '''Attach all lastdisk to each fs found under bdev.'''
+ if bdev['fstype'] == 'bcachefs' and bdev['mountpoint'] is not None:
+ mnt = bdev['mountpoint']
+ if mnt in fs:
+ fs[mnt].add(lastdisk.split(':'))
+ else:
+ fs[mnt] = set(lastdisk.split(':'))
+ if 'children' not in bdev:
+ return
+ for child in bdev['children']:
+ find_bcachefs_mounts(child, fs, lastdisk)
+
+ fs = {}
+ cmd=['lsblk', '-o', 'NAME,KNAME,TYPE,FSTYPE,MOUNTPOINT', '-J']
+ result = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+ result.wait()
+ if result.returncode != 0:
+ return fs
+ sarray = [x.decode(sys.stdout.encoding) for x in result.stdout.readlines()]
+ output = ' '.join(sarray)
+ bdevdata = json.loads(output)
+
+ # The lsblk output had better be in disks-then-partitions order
+ for bdev in bdevdata['blockdevices']:
+ lastdisk = bdev['kname']
+ find_bcachefs_mounts(bdev, fs, lastdisk)
+
+ return fs
+
+def backtick(cmd):
+ '''Generator function that yields lines of a program's stdout.'''
+ p = subprocess.Popen(cmd, stdout = subprocess.PIPE)
+ for line in TextIOWrapper(p.stdout, encoding="utf-8"):
+ yield line.strip()
+
+def remove_killfunc(killfuncs, fn):
+ '''Ensure fn is not in killfuncs.'''
+ try:
+ killfuncs.remove(fn)
+ except:
+ pass
+
+class scrub_control(object):
+ '''Control object for bcachefsck.'''
+ def __init__(self):
+ pass
+
+ def start(self):
+ '''Start scrub and wait for it to complete. Returns -1 if the
+ service was not started, 0 if it succeeded, or 1 if it
+ failed.'''
+ assert False
+
+ def stop(self):
+ '''Stop scrub.'''
+ assert False
+
+class scrub_subprocess(scrub_control):
+ '''Control object for bcachefsck subprocesses.'''
+ def __init__(self, mnt):
+ cmd = ['bcachefs', 'fsck']
+ cmd += '@bcachefsck_args@'.split()
+ cmd += [mnt]
+ self.cmdline = cmd
+ self.proc = None
+
+ def start(self):
+ '''Start bcachefsck and wait for it to complete. Returns -1 if
+ the service was not started, 0 if it succeeded, or 1 if it
+ failed.'''
+ global debug
+
+ if debug:
+ print('run ', ' '.join(self.cmdline))
+
+ try:
+ self.proc = subprocess.Popen(self.cmdline)
+ self.proc.wait()
+ except:
+ return -1
+
+ proc = self.proc
+ self.proc = None
+ return proc.returncode
+
+ def stop(self):
+ '''Stop bcachefsck.'''
+ global debug
+
+ if debug:
+ print('kill ', ' '.join(self.cmdline))
+ if self.proc is not None:
+ self.proc.terminate()
+
+def run_subprocess(mnt, killfuncs):
+ '''Run a killable program. Returns program retcode or -1 if we can't
+ start it.'''
+ try:
+ p = scrub_subprocess(mnt)
+ killfuncs.add(p.stop)
+ ret = p.start()
+ remove_killfunc(killfuncs, p.stop)
+ return ret
+ except:
+ return -1
+
+# systemd doesn't like unit instance names with slashes in them, so it
+# replaces them with dashes when it invokes the service. Filesystem paths
+# need a special --path argument so that dashes do not get mangled.
+def path_to_serviceunit(path):
+ '''Convert a pathname into a systemd service unit name.'''
+
+ svcname = 'bcachefsck@.service'
+ cmd = ['systemd-escape', '--template', svcname, '--path', path]
+
+ proc = subprocess.Popen(cmd, stdout = subprocess.PIPE)
+ proc.wait()
+ for line in proc.stdout:
+ return line.decode(sys.stdout.encoding).strip()
+
+def fibonacci(max_ret):
+ '''Yield fibonacci sequence up to but not including max_ret.'''
+ if max_ret < 1:
+ return
+
+ x = 0
+ y = 1
+ yield 1
+
+ z = x + y
+ while z <= max_ret:
+ yield z
+ x = y
+ y = z
+ z = x + y
+
+class scrub_service(scrub_control):
+ '''Control object for bcachefsck systemd service.'''
+ def __init__(self, mnt):
+ self.unitname = path_to_serviceunit(mnt)
+ self.prop = None
+ self.unit = None
+ self.bind()
+
+ def bind(self):
+ '''Bind to the dbus proxy object for this service.'''
+ sysbus = dbus.SystemBus()
+ systemd1 = sysbus.get_object('org.freedesktop.systemd1',
+ '/org/freedesktop/systemd1')
+ manager = dbus.Interface(systemd1,
+ 'org.freedesktop.systemd1.Manager')
+ path = manager.LoadUnit(self.unitname)
+
+ svc_obj = sysbus.get_object('org.freedesktop.systemd1', path)
+ self.prop = dbus.Interface(svc_obj,
+ 'org.freedesktop.DBus.Properties')
+ self.unit = dbus.Interface(svc_obj,
+ 'org.freedesktop.systemd1.Unit')
+
+ def __dbusrun(self, lambda_fn):
+ '''Call the lambda function to execute something on dbus. dbus
+ exceptions result in retries with Fibonacci backoff, and the
+ bindings will be rebuilt every time.'''
+ global debug
+
+ fatal_ex = None
+
+ for i in fibonacci(30):
+ try:
+ return lambda_fn()
+ except dbus.exceptions.DBusException as e:
+ if debug:
+ print(e)
+ fatal_ex = e
+ time.sleep(i)
+ self.bind()
+ raise fatal_ex
+
+ def state(self):
+ '''Retrieve the active state for a systemd service. As of
+ systemd 249, this is supposed to be one of the following:
+ "active", "reloading", "inactive", "failed", "activating",
+ or "deactivating". These strings are not localized.'''
+ global debug
+
+ l = lambda: self.prop.Get('org.freedesktop.systemd1.Unit',
+ 'ActiveState')
+ try:
+ return self.__dbusrun(l)
+ except Exception as e:
+ if debug:
+ print(e, file = sys.stderr)
+ return 'failed'
+
+ def wait(self, interval = 1):
+ '''Wait until the service finishes.'''
+ global debug
+
+ # Use a poll/sleep loop to wait for the service to finish.
+ # Avoid adding a dependency on python3 glib, which is required
+ # to use an event loop to receive a dbus signal.
+ s = self.state()
+ while s not in ['failed', 'inactive']:
+ if debug:
+ print('waiting %s %s' % (self.unitname, s))
+ time.sleep(interval)
+ s = self.state()
+ if debug:
+ print('waited %s %s' % (self.unitname, s))
+ if s == 'failed':
+ return 1
+ return 0
+
+ def start(self):
+ '''Start the service and wait for it to complete. Returns -1
+ if the service was not started, 0 if it succeeded, or 1 if it
+ failed.'''
+ global debug
+
+ if debug:
+ print('starting %s' % self.unitname)
+
+ try:
+ self.__dbusrun(lambda: self.unit.Start('replace'))
+ return self.wait()
+ except Exception as e:
+ print(e, file = sys.stderr)
+ return -1
+
+ def stop(self):
+ '''Stop the service.'''
+ global debug
+
+ if debug:
+ print('stopping %s' % self.unitname)
+
+ try:
+ self.__dbusrun(lambda: self.unit.Stop('replace'))
+ return self.wait()
+ except Exception as e:
+ print(e, file = sys.stderr)
+ return -1
+
+def run_service(mnt, killfuncs):
+ '''Run scrub as a service.'''
+ try:
+ svc = scrub_service(mnt)
+ except:
+ return -1
+
+ killfuncs.add(svc.stop)
+ retcode = svc.start()
+ remove_killfunc(killfuncs, svc.stop)
+ return retcode
+
+def run_scrub(mnt, cond, running_devs, mntdevs, killfuncs):
+ '''Run a scrub process.'''
+ global retcode, terminate
+
+ print("Scrubbing %s..." % mnt)
+ sys.stdout.flush()
+
+ try:
+ if terminate:
+ return
+
+ # Run per-mount systemd bcachefsck service only if we ourselves
+ # are running as a systemd service.
+ if 'SERVICE_MODE' in os.environ:
+ ret = run_service(mnt, killfuncs)
+ if ret == 0 or ret == 1:
+ print("Scrubbing %s done, (err=%d)" % (mnt, ret))
+ sys.stdout.flush()
+ retcode |= ret
+ return
+
+ if terminate:
+ return
+
+ # Invoke bcachefsck manually if we're running in the foreground.
+ # We also permit this if we're running as a cronjob where
+ # systemd services are unavailable.
+ ret = run_subprocess(mnt, killfuncs)
+ if ret >= 0:
+ print("Scrubbing %s done, (err=%d)" % (mnt, ret))
+ sys.stdout.flush()
+ retcode |= ret
+ return
+
+ if terminate:
+ return
+
+ print("Unable to start scrub tool.")
+ sys.stdout.flush()
+ finally:
+ running_devs -= mntdevs
+ cond.acquire()
+ cond.notify()
+ cond.release()
+
+def signal_scrubs(signum, cond):
+ '''Handle termination signals by killing bcachefsck children.'''
+ global debug, terminate
+
+ if debug:
+ print('Signal handler called with signal', signum)
+ sys.stdout.flush()
+
+ terminate = True
+ cond.acquire()
+ cond.notify()
+ cond.release()
+
+def wait_for_termination(cond, killfuncs):
+ '''Wait for a child thread to terminate. Returns True if we should
+ abort the program, False otherwise.'''
+ global debug, terminate
+
+ if debug:
+ print('waiting for threads to terminate')
+ sys.stdout.flush()
+
+ cond.acquire()
+ try:
+ cond.wait()
+ except KeyboardInterrupt:
+ terminate = True
+ cond.release()
+
+ if not terminate:
+ return False
+
+ print("Terminating...")
+ sys.stdout.flush()
+ while len(killfuncs) > 0:
+ fn = killfuncs.pop()
+ fn()
+ return True
+
+def scan_interval(string):
+ '''Convert a textual scan interval argument into a time delta.'''
+
+ if string.endswith('y'):
+ year = timedelta(seconds = 31556952)
+ return year * float(string[:-1])
+ if string.endswith('q'):
+ return timedelta(days = 90 * float(string[:-1]))
+ if string.endswith('mo'):
+ return timedelta(days = 30 * float(string[:-2]))
+ if string.endswith('w'):
+ return timedelta(weeks = float(string[:-1]))
+ if string.endswith('d'):
+ return timedelta(days = float(string[:-1]))
+ if string.endswith('h'):
+ return timedelta(hours = float(string[:-1]))
+ if string.endswith('m'):
+ return timedelta(minutes = float(string[:-1]))
+ if string.endswith('s'):
+ return timedelta(seconds = float(string[:-1]))
+ return timedelta(seconds = int(string))
+
+def utcnow():
+ '''Create a representation of the time right now, in UTC.'''
+
+ dt = datetime.utcnow()
+ return dt.replace(tzinfo = timezone.utc)
+
+def main():
+ '''Find mounts, schedule bcachefsck runs.'''
+ def thr(mnt, devs):
+ a = (mnt, cond, running_devs, devs, killfuncs)
+ thr = threading.Thread(target = run_scrub, args = a)
+ thr.start()
+ global retcode, terminate, debug
+
+ parser = argparse.ArgumentParser( \
+ description = "Scrub all mounted bcachefs filesystems.")
+ parser.add_argument("--debug", help = "Enabling debugging messages.", \
+ action = "store_true")
+ args = parser.parse_args()
+
+ if args.debug:
+ debug = True
+
+ fs = find_mounts()
+
+ # Schedule scrub jobs...
+ running_devs = set()
+ killfuncs = set()
+ cond = threading.Condition()
+
+ signal.signal(signal.SIGINT, lambda s, f: signal_scrubs(s, cond))
+ signal.signal(signal.SIGTERM, lambda s, f: signal_scrubs(s, cond))
+
+ while len(fs) > 0:
+ if len(running_devs) == 0:
+ mnt, devs = fs.popitem()
+ running_devs.update(devs)
+ thr(mnt, devs)
+ poppers = set()
+ for mnt in fs:
+ devs = fs[mnt]
+ can_run = True
+ for dev in devs:
+ if dev in running_devs:
+ can_run = False
+ break
+ if can_run:
+ running_devs.update(devs)
+ poppers.add(mnt)
+ thr(mnt, devs)
+ for p in poppers:
+ fs.pop(p)
+
+ # Wait for one thread to finish
+ if wait_for_termination(cond, killfuncs):
+ break
+
+ # Wait for the rest of the threads to finish
+ while len(killfuncs) > 0:
+ wait_for_termination(cond, killfuncs)
+
+ # If we're being run as a service, the return code must fit the LSB
+ # init script action error guidelines, which is to say that we compress
+ # all errors to 1 ("generic or unspecified error", LSB 5.0 section
+ # 22.2) and hope the admin will scan the log for what actually
+ # happened.
+ #
+ # We have to sleep 2 seconds here because journald uses the pid to
+ # connect our log messages to the systemd service. This is critical
+ # for capturing all the log messages if the scrub fails, because the
+ # fail service uses the service name to gather log messages for the
+ # error report.
+ if 'SERVICE_MODE' in os.environ:
+ time.sleep(2)
+ if retcode != 0:
+ retcode = 1
+
+ sys.exit(retcode)
+
+if __name__ == '__main__':
+ main()