Cross-Cloud Image Inspection Pipeline with Kedro Node + DVC + Airflow Integration
FROM python:3.10-slim
# Install system dependencies for PIL, cloud SDKs, git, DVC, Airflow
RUN apt-get update && apt-get install -y \
    build-essential \
    git \
    libgl1-mesa-glx \
    libssl-dev \
    && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --upgrade pip \
    && pip install 'dvc[all]' \
    && pip install apache-airflow \
    && pip install kedro \
    && pip install -r requirements.txt
# Optionally, install Airflow providers for cloud (if needed)
RUN pip install 'apache-airflow-providers-amazon' \
    'apache-airflow-providers-google' \
    'apache-airflow-providers-microsoft-azure'
COPY . .
# Entrypoint can be overridden for Kedro, Airflow, etc.
CMD ["python", "pipeline_image_inspection_kedro_dvc_airflow.py"]
"""
📦 Cross-Cloud Image Inspection Pipeline
Kedro Node + DVC + Airflow Integration

Processes a fixed batch of images (IMAGES_PER_HOUR, default 500) per hourly run.
Supports AWS S3, GCP GCS, and Azure Blob Storage.

- Metadata extraction
- Quality & corruption checks
- Structure validation
- Low-information filtering
- Auto-cleaning
- DVC logging
- MLflow tracking
- Airflow hourly scheduling
"""
import os
import io
import logging
import pandas as pd
from PIL import Image, UnidentifiedImageError, ImageStat
import imagehash
from tqdm import tqdm
# Cloud SDKs
import boto3
from google.cloud import storage as gcs_storage
from azure.storage.blob import BlobServiceClient
import mlflow
# Kedro node decorator
from kedro.pipeline import node
# For DVC logging
import subprocess
# Airflow imports for DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# Setup
logging.basicConfig(level=logging.INFO)
tqdm.pandas()
# Global config
VALID_IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
LOW_ENTROPY_THRESHOLD = 5.0 # configurable
IMAGES_PER_HOUR = 500  # default number of images processed per hourly run
# ---------------------------------------------------------------------------
# ☁️ Cloud Adapters (reuse client objects)
# ---------------------------------------------------------------------------
def aws_s3_client():
    return boto3.client('s3')


def list_files_aws(bucket_name, prefix='', s3=None):
    s3 = s3 or aws_s3_client()
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for obj in page.get('Contents', []):
            yield obj['Key'], obj['Size']


def read_file_aws(bucket_name, key, s3=None):
    s3 = s3 or aws_s3_client()
    response = s3.get_object(Bucket=bucket_name, Key=key)
    return io.BytesIO(response['Body'].read())


def gcp_client():
    return gcs_storage.Client()


def list_files_gcp(bucket_name, prefix='', client=None):
    client = client or gcp_client()
    bucket = client.bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=prefix)
    for blob in blobs:
        yield blob.name, blob.size


def read_file_gcp(bucket_name, blob_name, client=None):
    client = client or gcp_client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    return io.BytesIO(blob.download_as_bytes())


def azure_service(connection_string):
    return BlobServiceClient.from_connection_string(connection_string)


def list_files_azure(container, prefix='', connection_string=None, service=None):
    service = service or azure_service(connection_string)
    container_client = service.get_container_client(container)
    blobs = container_client.list_blobs(name_starts_with=prefix)
    for blob in blobs:
        yield blob.name, blob.size


def read_file_azure(container, blob_name, connection_string=None, service=None):
    service = service or azure_service(connection_string)
    blob_client = service.get_blob_client(container, blob_name)
    return io.BytesIO(blob_client.download_blob().readall())
# ---------------------------------------------------------------------------
# 🧠 Core Image Checks & Cleaning
# ---------------------------------------------------------------------------
def calculate_entropy(img):
    # Information proxy: sum of per-band standard deviations; low values
    # indicate flat, low-information images.
    stat = ImageStat.Stat(img)
    return sum(stat.stddev)


def inspect_image(file_bytes):
    base = {
        "format": None,
        "mode": None,
        "width": None,
        "height": None,
        "hash": None,
        "entropy": None,
        "low_info": None,
        "error": None
    }
    try:
        # First pass: verify() catches truncated or corrupt files.
        with Image.open(file_bytes) as img:
            img.verify()
        # verify() leaves the image unusable, so reopen for metadata extraction.
        file_bytes.seek(0)
        with Image.open(file_bytes) as img2:
            entropy = calculate_entropy(img2)
            base.update({
                "format": img2.format,
                "mode": img2.mode,
                "width": img2.width,
                "height": img2.height,
                "hash": str(imagehash.average_hash(img2)),
                "entropy": entropy,
                "low_info": entropy < LOW_ENTROPY_THRESHOLD,
            })
    except UnidentifiedImageError:
        base["error"] = "Unreadable"
    except Exception as e:
        base["error"] = str(e)
    return base
# ---------------------------------------------------------------------------
# 🏭 Pipeline Runner as Kedro Node
# ---------------------------------------------------------------------------
def run_image_inspection_pipeline(
    provider,
    bucket_name,
    prefix='',
    max_files=IMAGES_PER_HOUR,
    azure_conn_str=None,
    dvc_output='results/image_inspection.csv'
):
    # Set up cloud clients once and reuse them for listing and reading
    if provider == 'aws':
        aws_client = aws_s3_client()
        list_fn = lambda b, p: list_files_aws(b, p, s3=aws_client)
        read_fn = lambda b, k: read_file_aws(b, k, s3=aws_client)
    elif provider == 'gcp':
        gcp_client_obj = gcp_client()
        list_fn = lambda b, p: list_files_gcp(b, p, client=gcp_client_obj)
        read_fn = lambda b, k: read_file_gcp(b, k, client=gcp_client_obj)
    elif provider == 'azure':
        azure_service_obj = azure_service(azure_conn_str)
        list_fn = lambda c, p: list_files_azure(c, p, connection_string=azure_conn_str, service=azure_service_obj)
        read_fn = lambda c, k: read_file_azure(c, k, connection_string=azure_conn_str, service=azure_service_obj)
    else:
        raise ValueError(f"Unsupported provider: {provider}")

    records = []
    mlflow.start_run()
    processed = 0  # only files with valid image extensions count toward max_files
    for key, size in tqdm(list_fn(bucket_name, prefix)):
        if processed >= max_files:
            break
        ext = os.path.splitext(key)[1].lower()
        if ext not in VALID_IMAGE_EXTENSIONS:
            continue
        processed += 1
        try:
            file_bytes = read_fn(bucket_name, key)
            metadata = inspect_image(file_bytes)
        except Exception as e:
            # Record a placeholder row so download/read failures are still tracked
            metadata = inspect_image(io.BytesIO(b""))
            metadata["error"] = str(e)
        metadata["filename"] = key
        metadata["size_bytes"] = size
        records.append(metadata)

    df = pd.DataFrame(records)
    logging.info(f"✅ Processed {len(df)} image files")

    # Ensure expected columns exist even if no images were processed
    for col in ["error", "low_info"]:
        if col not in df.columns:
            df[col] = None

    # Auto-cleaning: remove unreadable or low-information images
    df_clean = df[df["error"].isna() & ~df["low_info"].fillna(False).astype(bool)].copy()

    # Save output (handle the case where dvc_output is just a filename)
    output_dir = os.path.dirname(dvc_output)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    df_clean.to_csv(dvc_output, index=False)

    # DVC add and git commit
    subprocess.run(["dvc", "add", dvc_output])
    subprocess.run(["git", "add", f"{dvc_output}.dvc"])
    subprocess.run(["git", "commit", "-m", f"DVC tracked: {dvc_output}"])

    # MLflow logging
    mlflow.log_param("provider", provider)
    mlflow.log_param("bucket_name", bucket_name)
    mlflow.log_param("prefix", prefix)
    mlflow.log_param("max_files", max_files)
    mlflow.log_artifact(dvc_output)
    mlflow.log_metric("num_clean_images", len(df_clean))
    mlflow.end_run()
    return df_clean


# Register as a Kedro node; "params" is expected as a catalog entry holding a dict
# of the keyword arguments accepted by run_image_inspection_pipeline.
def pipeline_node(params):
    return run_image_inspection_pipeline(**params)


kedro_pipeline_node = node(
    func=pipeline_node,
    inputs="params",
    outputs="df_clean",
    name="image_inspection_node"
)
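# Wiring sketch (an assumption, not part of the original gist): the node can be
# collected into a Kedro Pipeline, e.g. from a project's pipeline registry. The
# name "image_inspection_pipeline" is illustrative; "params" must exist in the
# data catalog as a dictionary dataset.
from kedro.pipeline import Pipeline

image_inspection_pipeline = Pipeline([kedro_pipeline_node])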
# ---------------------------------------------------------------------------
# Airflow DAG for hourly scheduling
# ---------------------------------------------------------------------------
def airflow_run_image_inspection(**kwargs):
    params = kwargs['params']
    run_image_inspection_pipeline(**params)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 7, 21),
    'retries': 1,
    'retry_delay': timedelta(minutes=10),
}

with DAG(
    dag_id='image_inspection_hourly',
    default_args=default_args,
    schedule_interval='@hourly',  # Hourly schedule
    catchup=False,
) as dag:
    inspect_images_task = PythonOperator(
        task_id='inspect_images',
        python_callable=airflow_run_image_inspection,
        op_kwargs={
            'params': {
                'provider': 'aws',  # Change as needed
                'bucket_name': 'your-bucket',
                'prefix': 'images/',
                'max_files': IMAGES_PER_HOUR,
                'azure_conn_str': None,
                'dvc_output': 'results/image_inspection.csv'
            }
        }
    )
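# Direct-run entry point (a sketch, not part of the original gist), matching the
# Dockerfile's CMD so the container can execute the pipeline without Airflow or
# Kedro. The bucket name and prefix are placeholders.
if __name__ == "__main__":
    run_image_inspection_pipeline(
        provider='aws',
        bucket_name='your-bucket',
        prefix='images/',
        max_files=IMAGES_PER_HOUR,
        dvc_output='results/image_inspection.csv',
    )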
pandas
tqdm
Pillow
imagehash
boto3
google-cloud-storage
azure-storage-blob
mlflow