Skip to content

Instantly share code, notes, and snippets.

@fgregg
Created September 11, 2025 18:02
Show Gist options
  • Select an option

  • Save fgregg/ff91cb6d8d2c63db519137189ce055cd to your computer and use it in GitHub Desktop.
import tempfile
from pathlib import Path
from urllib.parse import urlparse
import boto3
import click
import environ
import opensearchpy
import pymupdf
import pytesseract
import requests_aws4auth
import tqdm
environ.Env.read_env()
env = environ.Env()
class S3Bucket:
"""Iterable wrapper over an S3 bucket that yields object keys.
Usage:
for key in S3Bucket('my-bucket'):
print(key)
"""
def __init__(self, bucket_name: str):
self.bucket_name = bucket_name
self.client = boto3.client("s3")
def __iter__(self):
paginator = self.client.get_paginator("list_objects_v2")
kwargs = {"Bucket": self.bucket_name}
for page in paginator.paginate(**kwargs):
for obj in page.get("Contents", []):
key = obj["Key"]
assert isinstance(key, str)
yield key
def get(self, key: str) -> dict:
"""Get an object from S3 by key."""
resp = self.client.get_object(Bucket=self.bucket_name, Key=key)
return resp
def __len__(self):
"""Return total number of objects in the bucket/prefix using the paginator."""
paginator = self.client.get_paginator("list_objects_v2")
kwargs = {"Bucket": self.bucket_name}
total = 0
for page in paginator.paginate(**kwargs):
total += len(page.get("Contents", []))
return total
def get_aws_credentials():
session = boto3.Session()
creds = session.get_credentials()
if creds is None:
raise RuntimeError("No AWS credentials found in environment/profile.")
return creds.get_frozen_credentials()
def get_opensearch_client(endpoint: str):
parsed = urlparse(endpoint)
host = parsed.hostname
port = parsed.port or (443 if parsed.scheme == "https" else 80)
use_ssl = parsed.scheme == "https"
creds = get_aws_credentials()
auth = requests_aws4auth.AWS4Auth(
creds.access_key, creds.secret_key, "us-east-1", "es", session_token=creds.token
)
client = opensearchpy.OpenSearch(
hosts=[{"host": host, "port": port}],
http_auth=auth,
use_ssl=use_ssl,
verify_certs=True,
connection_class=opensearchpy.RequestsHttpConnection,
)
client.cluster.health() # test connection
return client
def pdf_to_text(data: bytes) -> str:
# Render pages with PyMuPDF, write PNGs to a temporary dir,
# write an images.txt file listing the image paths (one per line),
# and invoke pytesseract on that list file so tesseract can batch-process.
doc = pymupdf.open(stream=data, filetype="pdf")
with tempfile.TemporaryDirectory() as td_name:
td = Path(td_name)
image_paths = []
for i, page in enumerate(doc): # type: ignore[arg-type]
pix = page.get_pixmap(dpi=300)
img_bytes = pix.tobytes("png")
img_path = td / f"page_{i+1:04d}.png"
img_path.write_bytes(img_bytes)
image_paths.append(img_path)
list_path = td / "images.txt"
# write image paths, one per line
list_path.write_text("\n".join(str(p) for p in image_paths))
# call tesseract via pytesseract on the list file (string path)
text = pytesseract.image_to_string(str(list_path))
return text
def detect_data_type(key: str, resp: dict, data: bytes):
content_type: str = resp["ContentType"]
if (
content_type.lower() == "application/pdf"
or key.lower().endswith(".pdf")
or data.startswith(b"%PDF-")
):
return "application/pdf"
def to_text(key: str, resp: dict) -> str:
"""Consume a get_object response. If PDF, extract text and return it."""
data = resp["Body"].read()
detected_content_type = detect_data_type(key, resp, data)
if detected_content_type == "application/pdf":
return pdf_to_text(data)
else:
raise NotImplementedError(
f"Unhandled content type: {detected_content_type}. Key: {key}"
)
@click.command()
@click.option(
"--endpoint",
type=str,
help="OpenSearch endpoint",
required=True,
)
@click.option(
"--bucket",
type=str,
help="S3 bucket name",
required=True,
)
@click.option(
"--index",
type=str,
help="OpenSearch index name to write documents into",
default="documents",
)
@click.option(
"--update",
is_flag=True,
help="Update existing documents in OpenSearch",
)
def main(endpoint: str, bucket: str, index: str, update: bool):
"""
Fill an OpenSearch index from a S3 bucket.
"""
try:
client = get_opensearch_client(endpoint)
except opensearchpy.exceptions.ConnectionError:
raise click.UsageError(f"Failed to connect to OpenSearch endpoint: {endpoint}")
if not client.indices.exists(index=index):
client.indices.create(
index=index,
body={
"mappings": {
"properties": {
"s3_key": {"type": "keyword", "index": False},
"s3_bucket": {"type": "keyword", "index": False},
"content_type": {"type": "keyword", "index": False},
"text": {"type": "text"},
}
}
},
)
click.echo(f"Created index: {index}", err=True)
objects = S3Bucket(bucket)
for key in tqdm.tqdm(S3Bucket(bucket)):
if not update and client.exists(index=index, id=key):
continue
resp = objects.get(key)
text = to_text(key, resp)
doc = {
"s3_key": key,
"s3_bucket": bucket,
"content_type": resp.get("ContentType"),
"text": text,
}
resp = client.index(index=index, id=key, body=doc)
if __name__ == "__main__":
main()
boto3
requests-aws4auth
django-environ
opensearch-py
click
PyMuPDF
pytesseract
tqdm
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment