Skip to content

Instantly share code, notes, and snippets.

@ed-davies
Created September 16, 2024 10:31
Show Gist options
  • Save ed-davies/e083a956299725b29363d79905d9d421 to your computer and use it in GitHub Desktop.
Save ed-davies/e083a956299725b29363d79905d9d421 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
# snapshot
"""
Backup a set of directories on to a new sub-directory of a specified
destination directory, hard linking files which are duplicates of files
with the same path and contents in a previous such backup.
"""
import sys
import os
from os import path
import subprocess
import platform
import argparse
import datetime
import json
class SnapshotError(Exception):
    """ Expected failure of the snapshot run, carrying a message for the user.

    The script's entry point catches this exception, prints its message
    on stderr and exits with status 1.
    """
def massage_mountpoint_lists(devices):
    """ Give every device in the tree a 'mountpoints' list of strings.

    Some versions of lsblk only return a 'mountpoint' field holding a
    single (possibly null) mountpoint; that field is removed and replaced
    by a 'mountpoints' list for consistency.

    Later versions return a 'mountpoints' field.  When a device is not
    mounted they return a list containing a null/None value rather than
    an empty list; null entries are dropped so an unmounted device always
    ends up with an empty list.  A device carrying neither field, or a
    null 'mountpoints' value, is also treated as unmounted.

    The device dictionaries are modified in place, recursively through
    any 'children' lists.  Nothing is returned.
    """
    for device in devices:
        if 'mountpoint' in device:
            # Older lsblk: exactly one value, which may be None.
            raw = [device.pop('mountpoint')]
        else:
            # Newer lsblk: a list; tolerate it being absent or null.
            raw = device.get('mountpoints') or []
        device['mountpoints'] = [mp for mp in raw if mp is not None]
        # Internal sanity check on what lsblk handed us.
        assert all(isinstance(mp, str) and (len(mp) > 0) for mp in device['mountpoints'])
        if 'children' in device:
            massage_mountpoint_lists(device['children'])
def lsblk(options, label_fields, uuid_fields):
    """ Run lsblk and return its 'blockdevices' tree as a list of dicts.

    The columns requested are name, path, the given label and uuid
    fields, and a mountpoint column.  The plural 'mountpoints' column is
    tried first; if lsblk exits with status 1 and mentions 'mountpoints'
    on stderr, the command is retried with the singular 'mountpoint'
    column.  Either way the result is normalised by
    massage_mountpoint_lists() before being returned.

    With options.verbose set, each command line is echoed on stderr.

    Raises SnapshotError (after printing lsblk's stderr) on any other
    lsblk failure.
    """
    candidates = ['mountpoints', 'mountpoint']
    fallback = candidates[-1]
    for column in candidates:
        columns = ['name', 'path'] + list(label_fields) + list(uuid_fields) + [column]
        cmd = ['lsblk', '--json', '--output', ','.join(columns)]
        if options.verbose:
            print(' '.join(cmd), file=sys.stderr)
        cp = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        if cp.returncode == 0:
            devices = json.loads(cp.stdout)['blockdevices']
            assert isinstance(devices, list)
            massage_mountpoint_lists(devices)
            return devices

        stderr = cp.stderr.decode()
        # Assume a version of lsblk which doesn't support mountpoints
        # (plural) when it complains about that column by name.
        can_retry = (
            cp.returncode == 1
            and column != fallback
            and 'mountpoints' in stderr
        )
        if not can_retry:
            print(stderr, file=sys.stderr)
            raise SnapshotError(
                f'Command {" ".join(cmd)} failed with return code {cp.returncode}')

    # The last candidate always returns or raises above.
    assert False
class DeviceMatch:
    """ Records that one field of one lsblk device matched a --label
        or --uuid option.
    """

    def __init__(self, device, field):
        # device: the device dictionary from the lsblk tree.
        # field: name of the key in that dictionary whose value matched.
        self.device, self.field = device, field
def device_mountpoints(device):
    """ Generate every mountpoint of the given device followed,
        depth first, by those of each of its children.
    """
    yield from device['mountpoints']
    children = device['children'] if 'children' in device else ()
    for child in children:
        yield from device_mountpoints(child)
def find_device_matches(device_tree, value, field_list):
    """ Walk the device tree and generate a DeviceMatch for each
        (device, field) pair where the device has that field and its
        value equals the one given.  A device is examined before its
        children, and may produce one match per matching field.
    """
    for node in device_tree:
        matching_fields = (
            name for name in field_list
            if name in node and node[name] == value
        )
        for name in matching_fields:
            yield DeviceMatch(node, name)
        if 'children' not in node:
            continue
        yield from find_device_matches(node['children'], value, field_list)
def find_mountpoint(options):
    """ Find the mountpoint specified by the --label or --uuid options.

    Multiple instances of these options can be specified but, from
    the devices actually mounted, they should imply a single
    unambiguous mount point.

    E.g., you could have two disks with the labels 'snapshot1' and
    'snapshot2'. Specifying both these labels on the command line
    would be OK as long as only one of them is mounted. Typically
    this might be used by an overnight cronjob which backs up to
    whichever of these is left plugged in that night with the other
    being left in a safe place elsewhere.

    The labels and uuids can be on the device, the partition table
    or the partition in question so long as there's a single
    mountpoint implied.

    Reads options.labels and options.uuids (and options.verbose, via
    lsblk()).  Returns the mountpoint as a string.

    Raises SnapshotError if nothing matches, or if the matching devices
    have no mountpoint or more than one distinct mountpoint; in the
    latter two cases a per-device report is first written to stderr.
    """
    label_fields = ['label', 'partlabel']
    uuid_fields = ['uuid', 'partuuid', 'ptuuid']
    device_tree = lsblk(options, label_fields, uuid_fields)

    device_matches = []
    for label in options.labels:
        device_matches.extend(find_device_matches(device_tree, label, label_fields))
    for uuid in options.uuids:
        device_matches.extend(find_device_matches(device_tree, uuid, uuid_fields))
    if not device_matches:
        raise SnapshotError('No devices match specified --label or --uuid options.')

    mountpoints = {mp for dm in device_matches for mp in device_mountpoints(dm.device)}
    if len(mountpoints) == 1:
        return mountpoints.pop()

    # Zero or several mountpoints: explain what was matched.  Group the
    # matches by device path so that a device hit by more than one field
    # or option is reported once, with each matching field listed once.
    devices_by_path = {}
    fields_by_path = {}
    for dm in device_matches:
        dev_path = dm.device['path']
        devices_by_path.setdefault(dev_path, dm.device)
        fields = fields_by_path.setdefault(dev_path, [])
        if dm.field not in fields:
            fields.append(dm.field)

    for dev_path in sorted(devices_by_path):
        device = devices_by_path[dev_path]
        print(dev_path, file=sys.stderr)
        for field in fields_by_path[dev_path]:
            print(' Match:', field, '=', device[field], file=sys.stderr)
        mps = sorted(set(device_mountpoints(device)))
        if not mps:
            print(' No mount points', file=sys.stderr)
        for mp in mps:
            print(' Mount point:', mp, file=sys.stderr)

    if not mountpoints:
        raise SnapshotError('No mount points for matching devices')
    raise SnapshotError('Multiple distinct mount points for matching devices')
def parse_options(args):
    """ Parse the command line arguments and return the validated options.

    args is the argument list without the program name (e.g.
    sys.argv[1:]).  The returned argparse namespace has:

      destination  real path of the directory to snapshot into, either
                   given by --destination or found from --label/--uuid
                   via find_mountpoint()
      labels, uuids  the raw --label / --uuid values
      sources      absolute real paths of the source directories, each
                   ending in '/'; defaults to /home, /etc and /usr/local
      machine      machine name, defaulting to this host's node name
      stamp        snapshot timestamp, defaulting to the current UTC time
                   as YYYY-MM-DDTHH:MM:SSZ
      unmount, dry_run, verbose  flags; --dry-run forces verbose on

    Raises SnapshotError if --destination is combined with --label or
    --uuid, if no destination is given at all, or if a source is not a
    directory.  argparse itself exits the process on malformed arguments.
    """
    default_sources = ['/home', '/etc', '/usr/local']
    # datetime.utcnow() is deprecated since Python 3.12; take an aware UTC
    # time and format it to the same second-resolution 'Z' form.
    default_stamp = (
        datetime.datetime.now(datetime.timezone.utc)
        .strftime('%Y-%m-%dT%H:%M:%SZ')
    )
    default_machine = platform.uname().node

    parser = argparse.ArgumentParser(
        description=__doc__
    )
    parser.add_argument(
        '--destination', '-d',
        help='directory where this snapshot is to be placed')
    parser.add_argument(
        '--label', '-l',
        action='append',
        dest='labels',
        default=[],
        help='label(s) of disk or partition to copy to')
    parser.add_argument(
        '--uuid',
        action='append',
        dest='uuids',
        default=[],
        help='uuid(s) of disk or partition to copy to')
    parser.add_argument(
        '--source', '-s',
        metavar='DIR',
        action='append',
        dest='sources',
        default=[],
        help=f'directory to be copied ({", ".join(default_sources)})')
    parser.add_argument(
        '--machine',
        default=default_machine,
        help=f'machine name ({default_machine})')
    parser.add_argument(
        '--stamp',
        default=default_stamp,
        help=f'timestamp to name the directory copied to ({default_stamp})')
    parser.add_argument(
        '--unmount', '--umount', '-u',
        action='store_true',
        help='unmount destination when done')
    parser.add_argument(
        '--dry-run', '--dryrun', '-n',
        action='store_true',
        help="print the actions which would be performed but don't actually do them")
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help="list what we're upto as we're about to do it")
    options = parser.parse_args(args)

    # The destination comes either from --destination or from looking up
    # the mounted device named by --label/--uuid, never both.
    if options.labels or options.uuids:
        if options.destination is not None:
            raise SnapshotError('--destination specified with --label or --uuid')
        options.destination = find_mountpoint(options)
    if options.destination is None:
        raise SnapshotError('No destination specified, needs --destination, --label or --uuid')
    options.destination = path.realpath(path.expanduser(options.destination))

    if not options.sources:
        options.sources = list(default_sources)
    # Joining with '' appends the trailing '/' to each source path.
    options.sources = [path.join(path.abspath(path.realpath(path.expanduser(s))), '')
                       for s in options.sources]
    for src in options.sources:
        # Only absolute POSIX-style paths are supported.
        if not src.startswith('/'):
            raise SnapshotError(f"Source {src} doesn't start with a '/'")
        if not src.endswith('/'):
            raise SnapshotError(f"Source {src} doesn't end with a '/'")
        if not path.isdir(src):
            raise SnapshotError(f'Source {src} is not a directory')

    # A dry run is only useful if it says what it would have done.
    if options.dry_run:
        options.verbose = True
    return options
def main(options):
    """ Take one snapshot as described by options and return 0 on success.

    Under <destination>/<machine> the layout is: one directory per
    snapshot named 'back-<stamp>', plus a symlink 'current' naming the
    latest one.  On first use an empty directory '0' is created and
    'current' points at it.

    Steps, in order:
      1. create or check the machine directory and the 'current' symlink;
      2. create 'incomplete-<stamp>';
      3. for each source, rsync it into the incomplete directory, passing
         --link-dest for the matching tree under 'current' if it exists;
      4. rename the incomplete directory to 'back-<stamp>';
      5. replace the 'current' symlink so it names the new snapshot;
      6. run 'df -h' on the destination;
      7. optionally unmount the destination.

    With options.dry_run nothing is created, copied, renamed or
    unmounted (df is still run); the actions are only listed.  All
    progress and warning messages are written to stderr.

    Raises SnapshotError if the existing layout is not as expected or if
    rsync, df or umount exits with a non-zero status.
    """
    def note(message):
        # Progress messages are shown only when verbose, always on stderr.
        if options.verbose:
            print(message, file=sys.stderr)

    target = path.join(options.destination, options.machine)
    zero_dir = path.join(target, '0')
    current = path.join(target, 'current')

    # Create/check destination directory structure as required.
    pretend_created_current = False
    if path.lexists(target):
        if not path.isdir(target):
            raise SnapshotError(f'File system object {target} exists but is not a directory')
    else:
        note(f'Creating empty base directory {zero_dir}')
        if not options.dry_run:
            os.makedirs(zero_dir)
        note(f'Creating symlink {current}')
        if options.dry_run:
            # Nothing was really created, so don't insist on it below.
            pretend_created_current = True
        else:
            os.symlink('0', current)

    if not path.lexists(current):
        if not pretend_created_current:
            raise SnapshotError(f"Symlink {current} doesn't exist")
    elif not path.islink(current):
        raise SnapshotError(f"File system object {current} exists but isn't a symlink")

    # Create the new snapshot directory.
    incomplete = path.join(target, 'incomplete-' + options.stamp)
    if path.lexists(incomplete):
        raise SnapshotError(f'File system object {incomplete} already exists')
    note(f'Creating snapshot directory {incomplete}')
    if not options.dry_run:
        os.makedirs(incomplete)

    # Copy across the individual source directory trees.
    for src in options.sources:
        # src is absolute; dropping its leading '/' re-roots it under the
        # snapshot directory.
        dest = path.join(incomplete, src[1:])
        link_dest = path.join(current, src[1:])
        if path.lexists(link_dest):
            if not path.isdir(link_dest):
                raise SnapshotError(f'Link destination {link_dest} exists but is not a directory')
            link_dest = ['--link-dest', link_dest]
        else:
            # Not an error (e.g. first snapshot or a newly added source),
            # but nothing can be hard linked, so always mention it.
            print(f"Link destination {link_dest} doesn't exist", file=sys.stderr)
            link_dest = []

        note(f'Creating destination directory {dest}')
        if not options.dry_run:
            # A source nested inside an earlier one (typically a separate
            # file system, which rsync -x does not descend into) already
            # has its directory created by the earlier copy.
            os.makedirs(dest, exist_ok=True)

        cmd = ['rsync', '-axv', *link_dest, src, dest]
        note(' '.join(cmd))
        if not options.dry_run:
            cp = subprocess.run(cmd)
            if cp.returncode != 0:
                print('Failed command:', ' '.join(cmd), file=sys.stderr)
                raise SnapshotError(f'rsync failed with return code {cp.returncode}')

    # Make the incomplete snapshot into the new complete snapshot.
    backname = 'back-' + options.stamp
    complete = path.join(target, backname)
    note(f'Renaming new incomplete snapshot to {backname}')
    if not options.dry_run:
        os.rename(incomplete, complete)

    # Make it the current backup by replacing the symlink.
    note(f'Deleting existing symlink {current}')
    if not options.dry_run:
        os.remove(current)
    note(f'Creating new symlink {current}')
    if not options.dry_run:
        os.symlink(backname, current)

    # Show free space left on destination.
    cmd = ['df', '-h', options.destination]
    cp = subprocess.run(cmd)
    if cp.returncode != 0:
        raise SnapshotError(f'Command {" ".join(cmd)} failed with return code {cp.returncode}')

    # Unmount the destination if requested.
    if options.unmount:
        cmd = ['umount', options.destination]
        note(' '.join(cmd))
        if not options.dry_run:
            cp = subprocess.run(cmd)
            if cp.returncode != 0:
                raise SnapshotError(f'Command {" ".join(cmd)} failed with return code {cp.returncode}')
    return 0
if __name__ == '__main__':
    # Script entry point: parse the command line, take the snapshot, and
    # turn any SnapshotError into a message on stderr plus exit status 1.
    try:
        exit_status = main(parse_options(sys.argv[1:]))
    except SnapshotError as err:
        print(err, file=sys.stderr)
        exit_status = 1
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment