Skip to content

Instantly share code, notes, and snippets.

@Techcable
Last active February 9, 2025 21:47
Show Gist options
  • Save Techcable/b516e3367185059b609aed4341538864 to your computer and use it in GitHub Desktop.
Save Techcable/b516e3367185059b609aed4341538864 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# A nice little script to hash the contents of tarfiles
# Works in parallel
import shutil
import os
import re
import sys
import subprocess
from math import ceil
from threading import Lock
import concurrent.futures
from pathlib import Path
import click
class HashFailedError(RuntimeError):
pass
HASH_OUTPUT_PATTERN = re.compile(r"^(?P<hash>\w+) (?P<entry>.*)$")
def parse_hash_output(text: str, *, checksum_command: str) -> tuple[str, str]:
m = HASH_OUTPUT_PATTERN.fullmatch(text.rstrip())
if m is None:
raise HashFailedError(f"Invalid output from {checksum_command}: {text!r}")
return m.group("hash"), m.group("entry")
class HashContext:
tar_file: Path
hash_format: str
checksum_command: str
output_lock: Lock
def __init__(self, *, tar_file: Path, hash_format: str) -> None:
self.tar_file = tar_file
self.hash_format = hash
self.checksum_command = self.determine_checksum_command(hash_format=hash_format)
self.output_lock = Lock()
def hash_entry(self, entry: str, /):
tar_proc = subprocess.Popen(
[
'bsdtar',
'--to-stdout',
'-xf',
self.tar_file,
'--',
entry,
],
stdout=subprocess.PIPE,
)
hash_proc = subprocess.Popen(
[
self.checksum_command,
],
stdin=tar_proc.stdout,
stdout=subprocess.PIPE,
encoding='utf-8',
)
tar_proc.stdout.close() # docs say we need to do this in case of SIGPIPE
hash_out, _errs = hash_proc.communicate()
if tar_proc.wait() != 0:
raise HashFailedError(f"Failed to read entry: {entry!r}")
if hash_proc.returncode != 0:
raise HashFailedError(f"Failed to run {self.checksum_command} on entry: {entry!r}")
checksum, _entry = parse_hash_output(hash_out, checksum_command=self.checksum_command)
with self.output_lock:
print(checksum + ' ' + entry)
def list_entries(self) -> list[str]:
try:
proc = subprocess.run(["bsdtar", "-tf", self.tar_file], stdout=subprocess.PIPE, check=True, encoding='utf-8')
except subprocess.CalledProcessError:
raise HashFailedError(f"Failed to read entries from {tar_file}")
return [entry for entry in proc.stdout.splitlines() if entry and not entry.endswith("/")]
@classmethod
def determine_checksum_command(cls, hash_format: str) -> str:
checksum_command: str | None = None
potential_commands = []
# macos has `sha1` and `md5` commands which do not follow the standard format.
# in that case try sha1sum and md5sum first
if sys.platform == "darwin" and hash_format in ("md5", "sha1", "sha256", "sha512", "sha224", "sha384"):
potential_commands = [
f"{hash_format}sum",
hash_format,
]
else:
# otherwise, try the command name before adding 'sum' suffix
potential_commands = [
hash_format,
f"{hash_format}sum"
]
for potential_command in (hash_format, f"{hash_format}sum"):
if shutil.which(potential_command):
checksum_command = potential_command
break
if checksum_command is None:
raise HashFailedError(f"Unable to find checksum command for `--hash={hash_format}`")
try:
raw_hash_output = subprocess.run(
[checksum_command, "/dev/null"],
encoding='utf-8',
stdout=subprocess.PIPE,
check=True
).stdout
except subprocess.CalledProcessError:
raise HashFailedError(f"Failed to run checksum command {checksum_command!r}")
actual_hash_output = parse_hash_output(raw_hash_output, checksum_command=checksum_command)
if actual_hash_output[1] != "/dev/null":
raise HashFailedError(f"Expected `/dev/null` for entry, but got {actual_hash_output[1]} ")
return checksum_command
@click.command('hash-tarfile-entries')
@click.option('--hash', '-h', 'hash_format', required=True)
@click.option('--file', '-f', 'tar_file', type=click.Path(exists=True, dir_okay=False, file_okay=True, path_type=Path), required=True)
@click.option('--jobs', '-j', type=click.INT)
@click.argument('target_entries', nargs=-1)
def hash_tarfile_entries(
hash_format: str,
tar_file: Path,
target_entries: list[str],
jobs: int | None,
):
context = HashContext(hash_format=hash_format, tar_file=tar_file)
# Parallelism deosn't make much sense for tarfiles,
# because decompressing from them is inherently sequential.
#
# What we could do is unpack files to temp storage in one thread,
# then compute the hashes in parallel
# zipfiles are a different story because they can be extracted in parallel
if jobs is None:
if '.zip' in tar_file.suffixes:
# use full parallelism for zipfiles
jobs = max(3, ceil(os.cpu_count() * 1.2))
else:
# otherwise assume we're dealing with a tarfile and limit it
jobs = 2
if not target_entries:
target_entries = context.list_entries()
if jobs > 1:
with concurrent.futures.ThreadPoolExecutor(max_workers=jobs) as executor:
for entry in target_entries:
executor.submit(context.hash_entry, entry)
else:
# execute sequentially
for entry in target_entries:
context.hash_entry(entry)
if __name__ == "__main__":
hash_tarfile_entries()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment