@vslala
Created June 18, 2025 21:10
Resumable multipart upload to S3 Glacier Deep Archive with Python & boto3
import os
import sys
import math
import json
import argparse
import threading
from typing import Dict, List, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from pydantic import BaseModel, Field
import boto3
from botocore.client import BaseClient
from botocore.exceptions import ClientError
class MultipartUploadInput(BaseModel):
    file: str = Field(description="Local file path to upload")
bucket: str = Field(description="Target S3 bucket")
key: str = Field(description="S3 object key (path in bucket)")
    storage_class: str = Field(default="DEEP_ARCHIVE", description="Storage class for the upload, e.g. DEEP_ARCHIVE or STANDARD")
    part_size: int = Field(default=64, description="Part size in MB")
parallel: bool = Field(default=False, description="Allow parallel upload of parts")
    max_workers: int = Field(default=16, description="Maximum number of parallel upload threads")
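# Local JSON file that records, for each S3 key, the in-progress UploadId and the
# parts uploaded so far, so an interrupted upload can be resumed on the next run.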
STATE_FILE: str = ".upload_state.json"
def load_state() -> Dict[str, Any]:
"""
Load upload state from the JSON state file.
Returns an empty dict if the file does not exist.
"""
if os.path.exists(STATE_FILE):
with open(STATE_FILE, 'r') as f:
return json.load(f)
return {}
def save_state(state: Dict[str, Any]) -> None:
"""
Save the upload state to the JSON state file.
"""
with open(STATE_FILE, 'w') as f:
json.dump(state, f)
def initiate_upload(s3: BaseClient, bucket: str, key: str, storage_class: str) -> str:
"""
Initiate a multipart upload and return the UploadId.
"""
response = s3.create_multipart_upload(
Bucket=bucket,
Key=key,
StorageClass=storage_class,
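        # Ask boto3 to compute, and S3 to verify, an additional SHA-256
        # checksum for every part uploaded under this UploadId.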
ChecksumAlgorithm='SHA256'
)
return response['UploadId']
def upload_part(
file_path: str,
s3: BaseClient,
bucket: str,
key: str,
upload_id: str,
part_number: int,
offset: int,
part_size: int
) -> Dict[str, Any]:
"""
    Upload a single part and return its part number, ETag, and SHA-256 checksum.
"""
with open(file_path, 'rb') as f:
f.seek(offset)
data = f.read(part_size)
response = s3.upload_part(
Bucket=bucket,
Key=key,
PartNumber=part_number,
UploadId=upload_id,
Body=data,
ChecksumAlgorithm='SHA256'
)
        # Return the per-part checksum as well: S3 requires it when completing an
        # upload that was initiated with a checksum algorithm.
        return {'PartNumber': part_number, 'ETag': response['ETag'], 'ChecksumSHA256': response['ChecksumSHA256']}
def list_uploaded_parts(
s3: BaseClient,
bucket: str,
key: str,
upload_id: str
) -> List[Dict[str, Any]]:
"""
List parts already uploaded for a given UploadId.
"""
parts: List[Dict[str, Any]] = []
paginator = s3.get_paginator('list_parts')
for page in paginator.paginate(Bucket=bucket, Key=key, UploadId=upload_id):
for part in page.get('Parts', []):
parts.append({'PartNumber': part['PartNumber'], 'ETag': part['ETag']})
return parts
def complete_upload(
s3: BaseClient,
bucket: str,
key: str,
upload_id: str,
parts: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Complete the multipart upload with all uploaded parts.
Automatically sorts parts by PartNumber.
"""
parts_sorted = sorted(parts, key=lambda x: x['PartNumber'])
return s3.complete_multipart_upload(
Bucket=bucket,
Key=key,
UploadId=upload_id,
MultipartUpload={'Parts': parts_sorted}
)
def abort_upload(
s3: BaseClient,
bucket: str,
key: str,
upload_id: str
) -> Dict[str, Any]:
"""
Abort the multipart upload.
Returns the response from S3.
"""
return s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
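# Note: list_uploaded_parts and abort_upload are standalone helpers for manual
# recovery; nothing in the upload flow below calls them.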
def parallel_multipart_upload(input: MultipartUploadInput) -> None:
"""
    Perform a resumable multipart upload of a local file to S3 Glacier Deep Archive,
    uploading parts in parallel with a thread pool.
    :param input: MultipartUploadInput describing the local file path, target bucket,
        object key, storage class, part size in MB, and number of worker threads.
    """
file_path = input.file
bucket = input.bucket
key = input.key
storage_class: str = input.storage_class
part_size_mb: int = input.part_size
max_workers: int = input.max_workers
s3: BaseClient = boto3.client('s3')
state: Dict[str, Any] = load_state()
file_size: int = os.path.getsize(file_path)
part_size: int = part_size_mb * 1024 * 1024
total_parts: int = math.ceil(file_size / part_size)
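    # S3 multipart limits: at most 10,000 parts per upload, and every part except
    # the last must be at least 5 MiB, so choose part_size accordingly for large files.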
# Resume or start upload
if key in state:
upload_id: str = state[key]['UploadId']
parts_info: List[Dict[str, Any]] = state[key]['parts']
uploaded_parts = {p['PartNumber'] for p in parts_info}
else:
upload_id = initiate_upload(s3, bucket, key, storage_class)
parts_info = []
state[key] = {'UploadId': upload_id, 'parts': parts_info}
uploaded_parts = set()
save_state(state)
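    # A lock serialises updates to parts_info and the state file across worker threads.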
lock = threading.Lock()
def upload_task(part_number: int) -> None:
print(f"Uploading part {part_number}/{total_parts}...")
offset = (part_number - 1) * part_size
try:
result = upload_part(file_path, s3, bucket, key, upload_id, part_number, offset, part_size)
with lock:
parts_info.append(result)
state[key]['parts'] = parts_info
save_state(state)
print(f"Part {part_number}/{total_parts} uploaded.")
except Exception as e:
print(f"Error uploading part {part_number}: {e}")
raise
# Submit tasks for missing parts
parts_to_upload = [i for i in range(1, total_parts + 1) if i not in uploaded_parts]
print(f"Uploading {len(parts_to_upload)} parts in parallel using {max_workers} workers...")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(upload_task, part) for part in parts_to_upload]
for future in as_completed(futures):
future.result()
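            # result() re-raises any worker exception, so a failed part aborts the run
            # before completion; the saved state still allows the upload to be resumed.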
# Complete the upload
print("Completing upload...", flush=True)
try:
result = complete_upload(s3, bucket, key, upload_id, parts_info)
print("Upload completed successfully.", result)
del state[key]
save_state(state)
except ClientError as e:
print(f"Error completing upload: {e}")
sys.exit(1)
def multipart_upload(input: MultipartUploadInput) -> None:
"""
    Perform a resumable multipart upload of a local file to S3 Glacier Deep Archive,
    uploading parts sequentially (or delegating to the parallel path when requested).
    :param input: MultipartUploadInput describing the local file path, target bucket,
        object key, storage class, and part size in MB.
    """
if input.parallel:
parallel_multipart_upload(input)
return
file_path = input.file
bucket = input.bucket
key = input.key
storage_class: str = input.storage_class
part_size_mb: int = input.part_size
s3: BaseClient = boto3.client('s3')
state: Dict[str, Any] = load_state()
file_size: int = os.path.getsize(file_path)
part_size: int = part_size_mb * 1024 * 1024
total_parts: int = math.ceil(file_size / part_size)
# Resume or start upload
if key in state:
upload_id: str = state[key]['UploadId']
uploaded_parts = {p['PartNumber'] for p in state[key].get('parts', [])}
else:
upload_id = initiate_upload(s3, bucket, key, storage_class)
state[key] = {'UploadId': upload_id, 'parts': []}
uploaded_parts = set()
save_state(state)
parts_info: List[Dict[str, Any]] = state[key]['parts']
    # upload_part opens the file and seeks to each part's offset itself, so no
    # file handle is needed here.
    for part_number in range(1, total_parts + 1):
        if part_number in uploaded_parts:
            continue
        offset = (part_number - 1) * part_size
        print(f"Uploading part {part_number}/{total_parts}...", flush=True)
        result = upload_part(file_path, s3, bucket, key, upload_id, part_number, offset, part_size)
        parts_info.append(result)
        state[key]['parts'] = parts_info
        save_state(state)
# Complete the upload
print("Completing upload...", flush=True)
try:
result: Dict[str, Any] = complete_upload(s3, bucket, key, upload_id, parts_info)
print("Upload completed successfully.", result)
# Clean up state
del state[key]
save_state(state)
except ClientError as e:
print(f"Error completing upload: {e}")
sys.exit(1)
def main() -> None:
    parser = argparse.ArgumentParser(
        description='Multipart upload to S3 Glacier Deep Archive with resume support'
    )
    parser.add_argument('file', type=str, help='Local file path to upload')
    parser.add_argument('bucket', type=str, help='Target S3 bucket')
    parser.add_argument('key', type=str, help='S3 object key (path in bucket)')
    parser.add_argument('--storage-class', type=str, default='DEEP_ARCHIVE', help='S3 storage class')
    parser.add_argument('--part-size', type=int, default=64, help='Part size in MB')
    parser.add_argument('--parallel', action='store_true', help='Upload parts in parallel')
    parser.add_argument('--max-workers', type=int, default=16, help='Maximum number of parallel upload threads')
    args = parser.parse_args()
    multipart_upload(
        MultipartUploadInput(
            file=args.file,
            bucket=args.bucket,
            key=args.key,
            storage_class=args.storage_class,
            part_size=args.part_size,
            parallel=args.parallel,
            max_workers=args.max_workers
        )
    )
if __name__ == '__main__':
main()
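
Example usage (a minimal sketch: the script name, local file, bucket, and key below
are placeholders rather than values taken from the gist):

    # sequential upload with 64 MB parts (storage class defaults to DEEP_ARCHIVE)
    python multipart_upload.py ./backup.tar my-backup-bucket archives/backup.tar --part-size 64

    # threaded upload using 8 workers
    python multipart_upload.py ./backup.tar my-backup-bucket archives/backup.tar --parallel --max-workers 8

If a run is interrupted, re-running the same command resumes from the parts recorded
in .upload_state.json.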