Last active
January 25, 2025 21:33
-
-
Save evansd/e4adb3063de4418512b3f5e5d80599ea to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Stream the first file out of a ZIP file supplied as a stream of bytes.
This is a violation of the ZIP spec in that the canonical contents of a ZIP file are
given in the "Central Directory" which only appears at the end of the file. In theory,
ZIP files can have any arbitrary junk prepended to them and still be valid.
In practice, for the ZIP files we deal with, the file we want to extract is always the
very first thing in the file and this allows us to decompress the contents in a
streaming fashion as we would with, e.g., a gzipped file.
""" | |
import struct | |
import zipfile | |
def file_from_zip_stream(stream): | |
# Below based on the header-reading code from the `zipfile` module: | |
# https://github.com/python/cpython/blob/f2016280/Lib/zipfile/__init__.py#L1649-L1699 | |
# Read and parse the header | |
header_bytes = stream.read(zipfile.sizeFileHeader) | |
header = struct.unpack(zipfile.structFileHeader, header_bytes) | |
if header[zipfile._FH_SIGNATURE] != zipfile.stringFileHeader: | |
raise zipfile.BadZipFile("Bad magic number for file header") | |
# Construct a ZipInfo object based on the contents of the header | |
zinfo = zipfile.ZipInfo() | |
zinfo.compress_type = header[zipfile._FH_COMPRESSION_METHOD] | |
zinfo.compress_size = header[zipfile._FH_COMPRESSED_SIZE] | |
zinfo.file_size = header[zipfile._FH_UNCOMPRESSED_SIZE] | |
zinfo.flag_bits = header[zipfile._FH_GENERAL_PURPOSE_FLAG_BITS] | |
# Check for unsupported features | |
if zinfo.flag_bits & zipfile._MASK_COMPRESSED_PATCH: | |
raise zipfile.BadZipFile("Unsupported feature: compressed patched data") | |
if zinfo.flag_bits & zipfile._MASK_ENCRYPTED: | |
raise zipfile.BadZipFile("Unsupported feature: encryption") | |
# Skip any additional metadata after the header | |
extra_metadata_size = ( | |
header[zipfile._FH_FILENAME_LENGTH] + header[zipfile._FH_EXTRA_FIELD_LENGTH] | |
) | |
stream.read(extra_metadata_size) | |
return zipfile.ZipExtFile(stream, "rb", zinfo) | |
if __name__ == "__main__":
    import argparse
    import shutil
    import sys

    # Command-line entry point: ZIP bytes arrive on stdin and the decompressed
    # contents of the first member are written to stdout.
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--chunk-size", type=int, default=1024 * 1024)
    options = cli.parse_args()

    # Copy in chunks of --chunk-size bytes so the whole file is never held in memory.
    shutil.copyfileobj(
        file_from_zip_stream(sys.stdin.buffer),
        sys.stdout.buffer,
        length=options.chunk_size,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment