@keithchambers
Created April 15, 2025 19:49
Kafka Batch File Upload System
#!/bin/bash
set -e # Exit on error. Use `set -eux` for debugging to print commands and expand variables.
REPO_NAME="kafka-batch-upload"
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) # Get dir where setup.sh is
KAFKA_IMAGE="bitnami/kafka:3.7" # Define Kafka image once
echo "Setting up the Kafka Batch Upload project in ./${REPO_NAME}"
echo "This will create all necessary files and directories."
echo "Attempting to automate KRaft Cluster ID generation..."
# --- Pre-flight Checks ---
echo "Checking for required commands (docker, sed)..."
if ! command -v docker &> /dev/null; then
echo "Error: 'docker' command not found. Please install Docker Desktop or Docker Engine."
exit 1
fi
if ! command -v sed &> /dev/null; then
echo "Error: 'sed' command not found. This script requires 'sed' to modify configuration."
exit 1
fi
echo "Required commands found."
# --- Directory Structure ---
echo "Creating directory structure..."
mkdir -p "${REPO_NAME}/api/static"
mkdir -p "${REPO_NAME}/consumer"
# --- Generate KRaft Cluster ID ---
echo "Generating Kafka KRaft Cluster ID using Docker..."
# Check if docker is running
if ! docker info > /dev/null 2>&1; then
echo "Error: Docker daemon does not seem to be running. Please start Docker and try again."
exit 1
fi
# Attempt to generate the ID, capturing output and exit status.
# The '|| EXIT_CODE=$?' guard keeps 'set -e' from aborting before the failure can be reported.
EXIT_CODE=0
KRAFT_CLUSTER_ID=$(docker run --rm "$KAFKA_IMAGE" kafka-storage.sh random-uuid 2> /dev/null) || EXIT_CODE=$?
if [ $EXIT_CODE -ne 0 ] || [ -z "$KRAFT_CLUSTER_ID" ]; then
echo "Error: Failed to generate KRaft Cluster ID using 'docker run --rm $KAFKA_IMAGE kafka-storage.sh random-uuid'."
echo "Exit Code: $EXIT_CODE"
echo "Please ensure the image '$KAFKA_IMAGE' is accessible, Docker is running and configured correctly, and the command works manually."
exit 1
fi
# Trim potential whitespace/newlines just in case
KRAFT_CLUSTER_ID=$(echo "$KRAFT_CLUSTER_ID" | tr -d '[:space:]')
echo "Successfully generated KRaft Cluster ID: $KRAFT_CLUSTER_ID"
# --- Create Files ---
# --- docker-compose.yml ---
echo "Creating docker-compose.yml (with placeholder ID)..."
cat << 'EOF' > "${REPO_NAME}/docker-compose.yml"
# docker-compose.yml using Kafka 3.7+ with KRaft (Zookeeper-less)
# Reviewed and improved configuration
version: '3.8'
services:
# Zookeeper service is removed
kafka:
# Use a recent Bitnami Kafka image supporting KRaft and arm64
# As of early 2025, 3.7 is stable with KRaft. Update if 4.0 is confirmed stable/available on Bitnami for arm64.
image: bitnami/kafka:3.7
container_name: kafka
ports:
# Port for external clients (from your host machine) to connect to the broker
- "9094:9094"
volumes:
# Persist Kafka data (CRITICAL for KRaft metadata)
- kafka_data:/bitnami/kafka
environment:
# === KRaft Configuration ===
# Enable KRaft mode explicitly
- KAFKA_ENABLE_KRAFT=yes
# Define the roles for this node (combined controller and broker for single-node setup)
- KAFKA_CFG_PROCESS_ROLES=broker,controller
# Set a unique node ID for this Kafka instance
- KAFKA_CFG_NODE_ID=1
# Specify the listener name dedicated to controller communication
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# Define the set of controller nodes (Node ID @ Hostname : Controller Port)
# For single node, it points to itself using the internal service name 'kafka' and controller port 9093
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
# Cluster ID - This will be replaced by setup.sh
- KAFKA_KRAFT_CLUSTER_ID=GENERATED_CLUSTER_ID_REPLACE_ME # <--- *** PLACEHOLDER ***
# === Listener Configuration ===
# Define all listeners the broker and controller will use
# PLAINTEXT: Internal communication between services within Docker (api, consumer)
# CONTROLLER: Internal communication for KRaft metadata synchronization
# EXTERNAL: Communication from outside Docker (your host machine)
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
# Define how clients should connect based on the listener they are targeting
# Services inside Docker connect to 'kafka:9092' (PLAINTEXT)
# External clients (e.g., from your Mac) connect to 'localhost:9094' (EXTERNAL)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
# Map listener names to the security protocol used (all PLAINTEXT here)
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
# Specify the listener name used for internal broker-to-broker communication (relevant if >1 broker)
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
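      # Illustrative example (not part of the compose config): a kafka-python client running
      # inside this compose network would connect with
      #   KafkaProducer(bootstrap_servers='kafka:9092')
      # while a client on the host machine would use
      #   KafkaProducer(bootstrap_servers='localhost:9094')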
# === Other Kafka Settings ===
# Replication factor for internal topics (must be 1 in a single-node cluster)
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
# Enable automatic topic creation (convenient for this setup)
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
# Bitnami specific helper env var to simplify allowing plaintext
- ALLOW_PLAINTEXT_LISTENER=yes
# Optional: JVM Heap settings (adjust based on available memory)
# - KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
healthcheck:
# Command to check if Kafka is minimally operational by listing topics via internal listener
test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"]
interval: 15s
timeout: 10s
retries: 5
# Give KRaft more time to initialize before starting healthchecks
start_period: 30s
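    # To inspect the broker manually once it reports healthy, the same command used by the
    # healthcheck can be run from the host (assuming the service is named 'kafka'):
    #   docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --list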
clickhouse:
# Using latest is convenient for demo, consider pinning version for stability
image: clickhouse/clickhouse-server:latest
container_name: clickhouse
ports:
- "8123:8123" # HTTP interface
- "9000:9000" # Native TCP interface
ulimits: # Recommended settings for ClickHouse performance
nofile:
soft: 262144
hard: 262144
volumes:
- clickhouse_data:/var/lib/clickhouse
- clickhouse_logs:/var/log/clickhouse-server
healthcheck:
# Basic check for HTTP interface availability
test: ["CMD", "wget", "--spider", "-q", "localhost:8123/ping"]
interval: 10s
timeout: 5s
retries: 3
api:
build: ./api
container_name: api_server
ports:
- "8000:8000"
volumes:
- ./api:/app # Mount code for development (consider removing for 'production' builds)
depends_on:
kafka:
# Wait for Kafka KRaft node to report healthy based on the healthcheck
condition: service_healthy
clickhouse:
condition: service_healthy
environment:
# Ensure API connects to the INTERNAL Kafka listener
KAFKA_BOOTSTRAP_SERVERS: 'kafka:9092'
CLICKHOUSE_HOST: 'clickhouse'
CLICKHOUSE_PORT: 8123 # Port for non-insert operations if needed
EXPECTED_SCHEMA: '{"col_a": "String", "col_b": "Int32", "col_c": "Float64"}'
MAX_FILE_SIZE_BYTES: 1073741824 # 1 GB
PYTHONUNBUFFERED: 1 # Ensure logs are not buffered by Python
LOG_LEVEL: INFO # Control log level
# Use reload for development, switch for production
command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
# Production command example (no reload, potentially more workers):
# command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
# Ensure Dockerfile USER instruction is added if running as non-root
consumer:
build: ./consumer
container_name: consumer_worker
depends_on:
kafka:
condition: service_healthy
clickhouse:
condition: service_healthy
environment:
# Ensure Consumer connects to the INTERNAL Kafka listener
KAFKA_BOOTSTRAP_SERVERS: 'kafka:9092'
CLICKHOUSE_HOST: 'clickhouse'
CLICKHOUSE_NATIVE_PORT: 9000 # Use native port for high-performance inserts
CLICKHOUSE_DB: 'default'
CLICKHOUSE_USER: 'default'
CLICKHOUSE_PASSWORD: '' # Add password if configured for ClickHouse
CONSUMER_GROUP_ID: 'batch_processor_group'
EXPECTED_SCHEMA: '{"col_a": "String", "col_b": "Int32", "col_c": "Float64"}'
# Use internal Docker DNS name 'api' for the API service
API_URL: 'http://api:8000'
PYTHONUNBUFFERED: 1 # Ensure logs are not buffered by Python
LOG_LEVEL: INFO # Control log level
# command: ["python", "consumer.py"] # Implicit from Dockerfile CMD
# Ensure Dockerfile USER instruction is added if running as non-root
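    # Progress reporting (for reference): the consumer is expected to POST JSON such as
    #   {"processed_count": N, "success_count": N, "failed_count": N, "final_status": "SUCCESS"}
    # to {API_URL}/jobs/<job_id>/update_status, which api/main.py exposes as
    # update_job_status_from_consumer.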
# Define named volumes for persistent storage
volumes:
kafka_data: # Added volume for Kafka data persistence (essential for KRaft)
clickhouse_data:
clickhouse_logs:
EOF
# --- Replace Placeholder ID in docker-compose.yml ---
echo "Replacing placeholder ID in docker-compose.yml with generated ID: ${KRAFT_CLUSTER_ID}..."
# Use sed with '#' as the delimiter so a '/' in the ID cannot break the expression.
# GNU sed (Linux) and BSD sed (macOS) disagree on the in-place flag, so detect which one is installed.
if sed --version >/dev/null 2>&1; then
    sed -i "s#GENERATED_CLUSTER_ID_REPLACE_ME#${KRAFT_CLUSTER_ID}#g" "${REPO_NAME}/docker-compose.yml"
else
    sed -i '' "s#GENERATED_CLUSTER_ID_REPLACE_ME#${KRAFT_CLUSTER_ID}#g" "${REPO_NAME}/docker-compose.yml"
fi
echo "Placeholder replaced."
# --- api/requirements.txt ---
echo "Creating api/requirements.txt..."
cat << EOF > "${REPO_NAME}/api/requirements.txt"
fastapi>=0.100.0
uvicorn[standard]>=0.20.0
kafka-python>=2.0.0
pandas>=1.5.0
pyarrow>=10.0.0
python-multipart>=0.0.5
aiofiles>=22.1.0
requests>=2.28.0
EOF
# --- api/Dockerfile ---
echo "Creating api/Dockerfile..."
cat << 'EOF' > "${REPO_NAME}/api/Dockerfile"
# Use official Python base image
FROM python:3.10-slim
# Set working directory
WORKDIR /app
# Create a non-root user and group
RUN groupadd --system appuser && useradd --system --group appuser appuser
# Install build dependencies, install Python packages, then remove build dependencies
COPY requirements.txt .
RUN apt-get update && apt-get install -y --no-install-recommends build-essential && \
pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt && \
apt-get purge -y --auto-remove build-essential && \
rm -rf /var/lib/apt/lists/*
# Copy application code
COPY . .
# Change ownership to non-root user
RUN chown -R appuser:appuser /app
# Switch to non-root user
USER appuser
# Expose the port FastAPI will run on
EXPOSE 8000
# Default command is specified in docker-compose.yml for easier override during development
# CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
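# For a standalone build outside docker-compose (illustrative; the image tag is arbitrary):
#   docker build -t batch-upload-api ./api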
EOF
# --- api/main.py ---
echo "Creating api/main.py..."
cat << 'EOF' > "${REPO_NAME}/api/main.py"
import os
import uuid
import time
import json
import asyncio
import logging
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Tuple
from contextlib import asynccontextmanager
from fastapi import (
FastAPI, UploadFile, File, HTTPException, Request, BackgroundTasks, status
)
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from kafka import KafkaProducer, KafkaAdminClient, KafkaConsumer
from kafka.admin import NewTopic
from kafka.errors import KafkaError, TopicAlreadyExistsError, NoBrokersAvailable
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import io
# --- Constants ---
# Kafka/ClickHouse Config
KAFKA_BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9094')
CLICKHOUSE_HOST = os.getenv('CLICKHOUSE_HOST', 'localhost')
CLICKHOUSE_PORT = int(os.getenv('CLICKHOUSE_PORT', 8123))
# File Handling Config
MAX_FILE_SIZE_BYTES = int(os.getenv('MAX_FILE_SIZE_BYTES', 1073741824)) # 1GB Default
CHUNK_SIZE = 1000 # Rows per Kafka message batch (adjust based on row size/perf)
ALLOWED_CONTENT_TYPES = {
'text/csv': 'csv',
'application/vnd.ms-excel': 'csv', # Some systems send CSVs like this
'application/csv': 'csv',
'text/plain': 'csv', # Sometimes used for CSV
'application/vnd.apache.parquet': 'parquet',
'application/parquet': 'parquet',
'application/x-parquet': 'parquet',
'application/octet-stream': 'unknown' # Check extension for octet-stream
}
ALLOWED_EXTENSIONS = {'.csv', '.parquet'}
# Job Status Constants
STATUS_PENDING = "PENDING"
STATUS_PROCESSING = "PROCESSING"
STATUS_PUBLISHED = "PUBLISHED_TO_KAFKA"
STATUS_RUNNING = "RUNNING" # Set by consumer update
STATUS_FAILED = "FAILED"
STATUS_SUCCESS = "SUCCESS" # Set by consumer update
STATUS_PARTIAL_SUCCESS = "PARTIAL_SUCCESS" # Set by consumer update
# Schema Config
EXPECTED_SCHEMA_STR = os.getenv('EXPECTED_SCHEMA', '{"col_a": "String", "col_b": "Int32", "col_c": "Float64"}')
try:
EXPECTED_SCHEMA = json.loads(EXPECTED_SCHEMA_STR)
EXPECTED_COLUMNS = list(EXPECTED_SCHEMA.keys())
except json.JSONDecodeError:
logging.error(f"!!! Invalid EXPECTED_SCHEMA environment variable: {EXPECTED_SCHEMA_STR}")
EXPECTED_SCHEMA = {}
EXPECTED_COLUMNS = []
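# With the default value above, EXPECTED_SCHEMA parses to
# {"col_a": "String", "col_b": "Int32", "col_c": "Float64"} and
# EXPECTED_COLUMNS to ["col_a", "col_b", "col_c"].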
# --- Logging ---
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(
level=LOG_LEVEL,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# --- In-memory Job Store (Replace with Redis/DB for production persistence) ---
# WARNING: This data is lost if the API server restarts.
jobs_db: Dict[str, Dict[str, Any]] = {}
# --- Kafka Connection Management ---
# Global variables for Kafka clients to allow reuse
# Consider a more robust connection pooling mechanism for high load
kafka_producer_instance: Optional[KafkaProducer] = None
kafka_admin_client_instance: Optional[KafkaAdminClient] = None
def get_kafka_producer() -> KafkaProducer:
"""Gets the singleton KafkaProducer instance, creating it if necessary."""
global kafka_producer_instance
if kafka_producer_instance is None or kafka_producer_instance.bootstrap_connected() is False:
logger.info(f"Initializing Kafka Producer for {KAFKA_BOOTSTRAP_SERVERS}...")
try:
kafka_producer_instance = KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all', # Wait for leader and ISRs
retries=5,
retry_backoff_ms=100, # Wait 100ms initially, increases exponentially
linger_ms=20 # Batch messages for up to 20ms
)
logger.info("Kafka Producer initialized successfully.")
except NoBrokersAvailable:
logger.error(f"!!! Kafka Producer: No brokers available at {KAFKA_BOOTSTRAP_SERVERS}.")
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Kafka service unavailable.")
except Exception as e:
logger.exception(f"!!! Error initializing Kafka producer: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to initialize Kafka producer.")
return kafka_producer_instance
def get_kafka_admin_client() -> KafkaAdminClient:
"""Gets the singleton KafkaAdminClient instance, creating it if necessary."""
global kafka_admin_client_instance
# Note: kafka-python AdminClient doesn't have a built-in 'is_connected' check easily available
if kafka_admin_client_instance is None:
logger.info(f"Initializing Kafka Admin Client for {KAFKA_BOOTSTRAP_SERVERS}...")
try:
kafka_admin_client_instance = KafkaAdminClient(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
client_id='batch-upload-api-admin',
request_timeout_ms=10000 # 10 seconds timeout
)
# Test connection by listing topics (can be slow, optional)
# kafka_admin_client_instance.list_topics(timeout_ms=5000)
logger.info("Kafka Admin Client initialized successfully.")
except NoBrokersAvailable:
logger.error(f"!!! Kafka Admin Client: No brokers available at {KAFKA_BOOTSTRAP_SERVERS}.")
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Kafka service unavailable.")
except Exception as e:
logger.exception(f"!!! Error initializing Kafka admin client: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to initialize Kafka admin client.")
return kafka_admin_client_instance
@asynccontextmanager
async def lifespan(app: FastAPI):
"""FastAPI lifespan context manager to manage Kafka client connections."""
logger.info("API Starting up...")
# Initialize clients on startup (optional, can be lazy)
# get_kafka_producer()
# get_kafka_admin_client()
yield
# Clean up connections on shutdown
logger.info("API Shutting down...")
global kafka_producer_instance, kafka_admin_client_instance
if kafka_producer_instance:
try:
kafka_producer_instance.close(timeout=10)
logger.info("Kafka Producer closed.")
except Exception as e:
logger.error(f"Error closing Kafka producer: {e}")
finally:
kafka_producer_instance = None
if kafka_admin_client_instance:
try:
kafka_admin_client_instance.close()
logger.info("Kafka Admin Client closed.")
except Exception as e:
logger.error(f"Error closing Kafka admin client: {e}")
finally:
kafka_admin_client_instance = None
# --- FastAPI App Instance ---
app = FastAPI(
title="Kafka Batch Upload API",
description="API for uploading CSV/Parquet files for batch processing via Kafka.",
version="1.0.0",
lifespan=lifespan
)
# Mount static files (HTML, CSS, JS)
try:
app.mount("/static", StaticFiles(directory="static"), name="static")
except RuntimeError:
logger.warning("Static directory not found or not a directory. UI will not be served.")
# --- Helper Functions ---
def _get_file_type(filename: Optional[str], content_type: Optional[str]) -> Optional[str]:
"""Determines file type (csv or parquet) from content type and filename."""
file_ext = os.path.splitext(filename)[1].lower() if filename else None
normalized_type = ALLOWED_CONTENT_TYPES.get(content_type, 'unknown')
if normalized_type == 'csv' or (normalized_type == 'unknown' and file_ext == '.csv'):
return 'csv'
elif normalized_type == 'parquet' or (normalized_type == 'unknown' and file_ext == '.parquet'):
return 'parquet'
else:
logger.warning(f"Could not determine valid type. Content-Type: {content_type}, Filename: {filename}")
return None
async def _ensure_kafka_topics(job_id: str):
"""Creates the necessary Kafka topics for a job if they don't exist."""
job_topic = f"job_{job_id}"
dlq_topic = f"{job_topic}_dlq"
try:
admin_client = get_kafka_admin_client()
topics = [
NewTopic(name=job_topic, num_partitions=3, replication_factor=1), # Adjust partitions as needed
NewTopic(name=dlq_topic, num_partitions=1, replication_factor=1)
]
        # kafka-python's create_topics() raises topic-level errors directly; it does not
        # return per-topic futures the way confluent-kafka's AdminClient does.
        try:
            admin_client.create_topics(new_topics=topics, validate_only=False)
            logger.info(f"Job {job_id}: Ensured Kafka topics '{job_topic}' and '{dlq_topic}' exist.")
        except TopicAlreadyExistsError:
            logger.warning(f"Job {job_id}: Topics for this job already exist.")
        except Exception as create_err:
            # Log the error but don't fail, since auto-creation is enabled on the broker.
            logger.error(f"Job {job_id}: Failed to explicitly create Kafka topics (relying on auto-create): {create_err}")
            # If explicit creation is mandatory, re-raise or handle differently:
            # raise HTTPException(status_code=500, detail="Failed to create Kafka topics") from create_err
except Exception as e:
logger.error(f"Job {job_id}: Error ensuring Kafka topics exist: {e}")
# Depending on requirements, might raise HTTP exception or rely on Kafka auto-creation
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"Failed to ensure Kafka topics for job {job_id}: {e}"
) from e
async def process_uploaded_file(job_id: str, file: UploadFile, file_type: str):
"""
Reads the uploaded file (CSV or Parquet) in chunks or row groups,
serializes rows as JSON, and publishes them to the job-specific Kafka topic.
NOTE: This implementation currently reads the entire file into memory first
(`await file.read()`). For files significantly larger than available RAM,
a true streaming approach directly from the `UploadFile` object is required,
which adds considerable complexity (e.g., using `aiofiles` with careful
chunking and parsing).
"""
job_topic = f"job_{job_id}"
total_rows_sent = 0
estimated_total_rows = 0
try:
# Ensure Kafka topics exist before processing
await _ensure_kafka_topics(job_id)
_producer = get_kafka_producer() # Get producer instance
logger.info(f"Job {job_id}: Starting file processing. Type: {file_type}")
jobs_db[job_id]["status"] = STATUS_PROCESSING
# --- Read file content (Limitation: reads all into memory) ---
start_read_time = time.monotonic()
try:
file_content = await file.read()
logger.info(f"Job {job_id}: Read {len(file_content)} bytes in {time.monotonic() - start_read_time:.2f} seconds.")
except Exception as read_err:
logger.error(f"Job {job_id}: Failed to read file content: {read_err}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to read uploaded file content.") from read_err
file_like_object = io.BytesIO(file_content)
file_content = None # Attempt to free memory
# --- Process based on file type ---
if file_type == 'csv':
# Estimate total rows (optional, can be slow/inaccurate)
try:
file_like_object.seek(0)
# Using pandas count is more reliable than line counting for complex CSVs
# This still reads the whole file again for counting if not careful
# estimated_total_rows = pd.read_csv(io.BytesIO(file_like_object.getvalue()), low_memory=False).shape[0]
                # Simple line count as a fallback estimate. Detach the TextIOWrapper afterwards
                # so that garbage-collecting it does not close the underlying BytesIO.
                text_stream = io.TextIOWrapper(file_like_object, encoding='utf-8', errors='replace')
                num_lines = sum(1 for _ in text_stream)
                text_stream.detach()
estimated_total_rows = max(0, num_lines - 1 if num_lines > 0 else 0) # Assume header
jobs_db[job_id]["total_rows_estimated"] = estimated_total_rows
file_like_object.seek(0) # Reset for processing
except Exception as count_err:
logger.warning(f"Job {job_id}: Could not estimate rows from CSV: {count_err}")
jobs_db[job_id]["total_rows_estimated"] = -1 # Indicate unknown
# Process in chunks using pandas CSV reader iterator
try:
csv_reader = pd.read_csv(
file_like_object,
chunksize=CHUNK_SIZE,
low_memory=False,
on_bad_lines='warn', # Log bad lines
encoding='utf-8',
encoding_errors='replace' # Replace invalid chars
)
for chunk_df in csv_reader:
chunk_rows = chunk_df.to_dict(orient='records')
for row in chunk_rows:
cleaned_row = {k: v for k, v in row.items() if pd.notna(v)}
# Send message - kafka-python handles batching via linger_ms
_producer.send(job_topic, value={'job_id': job_id, 'row_data': cleaned_row})
total_rows_sent += 1
jobs_db[job_id]["rows_sent_to_kafka"] = total_rows_sent
logger.debug(f"Job {job_id}: Sent chunk ({len(chunk_rows)} rows) to Kafka. Total sent: {total_rows_sent}")
await asyncio.sleep(0.01) # Yield control to event loop
except pd.errors.ParserError as pe:
logger.error(f"Job {job_id}: CSV parsing error: {pe}. Some rows might be skipped.")
# Decide how to handle: fail job or continue? Continue for now.
jobs_db[job_id]["error_message"] = f"CSV parsing error: {pe}"
except Exception as csv_err:
logger.error(f"Job {job_id}: Error processing CSV chunks: {csv_err}")
raise ValueError(f"Error processing CSV: {csv_err}") from csv_err
elif file_type == 'parquet':
try:
pq_file = pq.ParquetFile(file_like_object)
estimated_total_rows = pq_file.metadata.num_rows
jobs_db[job_id]["total_rows_estimated"] = estimated_total_rows
logger.info(f"Job {job_id}: Parquet file has {estimated_total_rows} rows.")
# Iterate through row groups (more memory efficient)
for i in range(pq_file.num_row_groups):
row_group_table: pa.Table = pq_file.read_row_group(i)
# Convert pyarrow table to list of dicts directly
chunk_rows = row_group_table.to_pylist()
for row in chunk_rows:
# Parquet often handles NaNs/None better, but check anyway
cleaned_row = {k: v for k, v in row.items() if v is not None}
_producer.send(job_topic, value={'job_id': job_id, 'row_data': cleaned_row})
total_rows_sent += 1
jobs_db[job_id]["rows_sent_to_kafka"] = total_rows_sent
logger.debug(f"Job {job_id}: Sent row group {i} ({len(chunk_rows)} rows) to Kafka. Total sent: {total_rows_sent}")
await asyncio.sleep(0.01) # Yield control
except Exception as pq_err:
logger.error(f"Job {job_id}: Error reading/processing Parquet file: {pq_err}")
raise ValueError(f"Could not process Parquet file: {pq_err}") from pq_err
# --- Finalize ---
_producer.flush(timeout=30) # Wait up to 30s for messages to be sent
logger.info(f"Job {job_id}: Finished sending {total_rows_sent} rows to Kafka topic {job_topic}.")
# Update status - Consumer is responsible for final SUCCESS/PARTIAL/FAILED status
jobs_db[job_id]["status"] = STATUS_PUBLISHED
jobs_db[job_id]["total_rows_estimated"] = total_rows_sent # Update estimate with actual sent count
except HTTPException:
# Re-raise HTTP exceptions from helpers (e.g., Kafka connection issues)
raise
except Exception as e:
logger.exception(f"Job {job_id}: Unhandled error processing file {file.filename}: {e}")
jobs_db[job_id].update({
"status": STATUS_FAILED,
"end_time": time.time(),
"error_message": f"Internal error during file processing: {str(e)}"
})
finally:
# Ensure file handle is closed if it's still open
if hasattr(file, 'close') and callable(file.close):
await file.close()
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
async def read_root(request: Request):
"""Serves the main HTML user interface."""
static_file_path = "static/index.html"
if not os.path.exists(static_file_path):
logger.error(f"{static_file_path} not found.")
return HTMLResponse(content="<h1>Error: UI not found</h1>", status_code=500)
try:
with open(static_file_path, "r") as f:
html_content = f.read()
return HTMLResponse(content=html_content)
except Exception as e:
logger.exception(f"Error reading UI file: {e}")
return HTMLResponse(content="<h1>Internal Server Error</h1>", status_code=500)
@app.post(
"/upload",
summary="Upload a CSV or Parquet file for processing",
response_description="The ID of the created processing job",
status_code=status.HTTP_202_ACCEPTED # Indicate background processing
)
async def upload_file(
background_tasks: BackgroundTasks,
file: UploadFile = File(..., description="CSV or Parquet file to upload (max 1GB).")
) -> Dict[str, str]:
"""
Accepts a file upload, validates it, creates a job entry,
and starts background processing to publish rows to Kafka.
"""
# 1. Basic File Checks
if not file.filename:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Filename cannot be empty.")
# 2. Validate File Size (Check header first if available, fallback to reading)
# Note: file.size might be None for streamed uploads until fully read.
# This check might occur *after* reading if size isn't known upfront.
# A more robust check involves middleware or checking Content-Length header,
# but FastAPI's UploadFile handles large files reasonably well internally.
# We perform the check again after reading if size was initially None.
if file.size is not None and file.size > MAX_FILE_SIZE_BYTES:
logger.warning(f"Upload rejected: File '{file.filename}' size {file.size} exceeds limit {MAX_FILE_SIZE_BYTES}")
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f"File exceeds maximum size of {MAX_FILE_SIZE_BYTES / (1024*1024):.0f} MB"
)
# 3. Validate Content Type and Determine File Type
file_type = _get_file_type(file.filename, file.content_type)
if file_type is None:
logger.warning(f"Upload rejected: Unsupported Content-Type '{file.content_type}' or extension for file '{file.filename}'")
raise HTTPException(
status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
detail="Unsupported file type. Only CSV (.csv) and Parquet (.parquet) files are allowed."
)
logger.info(f"Received upload: {file.filename}, Type: {file.content_type} (determined as {file_type}), Size: {file.size or 'Unknown'}")
# 4. Create Job Entry
job_id = str(uuid.uuid4())
current_time = time.time()
jobs_db[job_id] = {
"job_id": job_id,
"filename": file.filename,
"status": STATUS_PENDING,
"start_time": current_time,
"end_time": None,
"elapsed_time": 0,
"total_rows_estimated": 0, # Will be updated by processing task
"rows_sent_to_kafka": 0,
"processed_count": 0, # Updated by consumer
"success_count": 0, # Updated by consumer
"failed_count": 0, # Updated by consumer
"error_message": None,
"last_update": current_time
}
logger.info(f"Created Job ID: {job_id} for file {file.filename}")
# 5. Start background task for processing
# Pass necessary info, including the determined file_type
background_tasks.add_task(process_uploaded_file, job_id, file, file_type)
return {"job_id": job_id, "message": "File upload accepted. Processing started in background."}
@app.get(
"/jobs",
summary="List all processing jobs",
response_description="A list of job summaries, sorted by start time descending."
)
async def get_jobs() -> List[Dict[str, Any]]:
"""Retrieves a summary of all jobs tracked by the API."""
job_list = []
current_time = time.time()
# Sort by start time descending (newest first)
# Use 0 as default start_time for sorting if somehow missing
sorted_job_ids = sorted(jobs_db.keys(), key=lambda jid: jobs_db.get(jid, {}).get("start_time", 0), reverse=True)
for job_id in sorted_job_ids:
job_data = jobs_db[job_id]
start_time = job_data.get("start_time")
end_time = job_data.get("end_time")
last_update = job_data.get("last_update")
elapsed = (end_time or current_time) - start_time if start_time else 0
job_summary = {
"job_id": job_id,
"filename": job_data.get("filename"),
"status": job_data.get("status"),
"start_time": datetime.fromtimestamp(start_time).isoformat() if start_time else None,
"elapsed_time_seconds": round(elapsed, 2),
"processed_count": job_data.get("processed_count", 0),
"success_count": job_data.get("success_count", 0),
"failed_count": job_data.get("failed_count", 0),
"total_rows_estimated": job_data.get("total_rows_estimated", 0),
"error_message": job_data.get("error_message"),
"last_update": datetime.fromtimestamp(last_update).isoformat() if last_update else None,
}
job_list.append(job_summary)
return job_list
@app.get(
"/jobs/{job_id}/status",
summary="Get the detailed status of a specific job",
response_description="Detailed status information for the requested job."
)
async def get_job_status(job_id: str) -> Dict[str, Any]:
"""Retrieves the current status and statistics for a given job ID."""
job_data = jobs_db.get(job_id)
if not job_data:
logger.warning(f"Status request for unknown job ID: {job_id}")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
current_time = time.time()
start_time = job_data.get("start_time")
end_time = job_data.get("end_time")
last_update = job_data.get("last_update")
elapsed = (end_time or current_time) - start_time if start_time else 0
# Create a copy to avoid modifying the original dict directly
status_response = job_data.copy()
status_response["elapsed_time"] = round(elapsed, 2)
# Convert timestamps for JSON response consistency
if start_time:
status_response["start_time_iso"] = datetime.fromtimestamp(start_time).isoformat()
if end_time:
status_response["end_time_iso"] = datetime.fromtimestamp(end_time).isoformat()
if last_update:
status_response["last_update_iso"] = datetime.fromtimestamp(last_update).isoformat()
return status_response
@app.post(
"/jobs/{job_id}/update_status",
summary="Update job status (used internally by consumer)",
status_code=status.HTTP_200_OK,
include_in_schema=False # Hide from public API docs
)
async def update_job_status_from_consumer(job_id: str, status_update: Dict[str, Any]) -> Dict[str, str]:
"""
Internal endpoint for the Kafka consumer to report progress and final status.
Updates the in-memory job store.
"""
if job_id not in jobs_db:
logger.warning(f"Received status update for unknown or potentially expired job: {job_id}")
# Return 404; the consumer should handle this and potentially stop processing for the job.
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found by API")
logger.info(f"Received status update for job {job_id}: {status_update}")
job = jobs_db[job_id]
current_time = time.time()
# Update counts - use get with default to handle missing keys gracefully
job["processed_count"] = status_update.get("processed_count", job["processed_count"])
job["success_count"] = status_update.get("success_count", job["success_count"])
job["failed_count"] = status_update.get("failed_count", job["failed_count"])
job["last_update"] = current_time # Record time of this update
# Check for final status provided by consumer
final_status = status_update.get("final_status")
if final_status and final_status in [STATUS_SUCCESS, STATUS_PARTIAL_SUCCESS, STATUS_FAILED]:
job["status"] = final_status
if not job.get("end_time"): # Set end_time only once
job["end_time"] = current_time
if "error_message" in status_update: # Update error message if provided
job["error_message"] = status_update["error_message"]
logger.info(f"Job {job_id} marked as {final_status} by consumer.")
elif job["status"] not in [STATUS_FAILED, STATUS_SUCCESS, STATUS_PARTIAL_SUCCESS]:
# If no final status, but we got an update, mark as RUNNING if not already finished
job["status"] = STATUS_RUNNING
return {"status": "updated"}
@app.get(
"/jobs/{job_id}/dlq/sample",
summary="Get a sample of rejected rows for a job",
response_description="A list containing up to 10 rejected rows with error details."
)
async def get_dlq_sample(job_id: str) -> List[Dict[str, Any]]:
"""
Retrieves the first 10 messages from the Dead Letter Queue (DLQ)
topic associated with the specified job ID.
"""
if job_id not in jobs_db:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
dlq_topic = f"job_{job_id}_dlq"
rejected_rows: List[Dict[str, Any]] = []
consumer = None
max_sample_size = 10
try:
# Create a temporary consumer to read from the beginning of the DLQ topic
consumer = KafkaConsumer(
dlq_topic,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
group_id=f"dlq_reader_{job_id}_{uuid.uuid4()}", # Unique group ID
auto_offset_reset='earliest',
consumer_timeout_ms=3000, # Timeout after 3 seconds if no messages
value_deserializer=lambda v: json.loads(v.decode('utf-8', errors='replace')),
# Security settings if needed
)
logger.info(f"Job {job_id}: Reading DLQ sample from topic '{dlq_topic}'")
for message in consumer:
if isinstance(message.value, dict):
rejected_rows.append(message.value)
else:
logger.warning(f"Job {job_id}: Skipping non-dict message in DLQ sample: {message.value}")
if len(rejected_rows) >= max_sample_size:
break
logger.info(f"Job {job_id}: Read {len(rejected_rows)} rows from DLQ sample.")
except NoBrokersAvailable:
logger.error(f"DLQ Sample: Cannot connect to Kafka at {KAFKA_BOOTSTRAP_SERVERS} for job {job_id}")
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Could not connect to Kafka to retrieve DLQ messages.")
except Exception as e:
# Log specific errors like topic not found vs other issues
if "UnknownTopicOrPartitionError" in str(e):
logger.warning(f"Job {job_id}: DLQ topic '{dlq_topic}' not found or empty.")
# Return empty list, not an error, if topic doesn't exist or is empty
return []
elif isinstance(e, json.JSONDecodeError):
logger.error(f"Job {job_id}: JSON decode error reading DLQ sample: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error decoding DLQ message: {e}")
else:
logger.exception(f"Job {job_id}: Error reading DLQ sample: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error reading DLQ sample: {e}")
finally:
if consumer:
try:
consumer.close()
except Exception as ce:
logger.warning(f"Error closing DLQ sample consumer: {ce}")
return rejected_rows
@app.get(
"/jobs/{job_id}/dlq/download",
summary="Download all rejected rows for a job",
response_description="A CSV or Parquet file containing all rows from the job's DLQ.",
    response_class=StreamingResponse # Stream the generated file from an in-memory buffer
)
async def download_dlq(
job_id: str,
format: str = 'csv' # Query parameter for format
) -> StreamingResponse:
"""
Downloads all messages from the job's Dead Letter Queue (DLQ) topic,
formats them as either CSV or Parquet, and returns the file.
"""
if job_id not in jobs_db:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
if format not in ['csv', 'parquet']:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid format. Choose 'csv' or 'parquet'.")
dlq_topic = f"job_{job_id}_dlq"
rejected_rows_data = []
consumer = None
try:
# Create a temporary consumer to read the entire DLQ topic
consumer = KafkaConsumer(
dlq_topic,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
group_id=f"dlq_downloader_{job_id}_{uuid.uuid4()}", # Unique group ID
auto_offset_reset='earliest',
consumer_timeout_ms=15000, # Longer timeout for potentially large download
value_deserializer=lambda v: json.loads(v.decode('utf-8', errors='replace')),
# Security settings if needed
)
logger.info(f"Job {job_id}: Starting DLQ download from topic '{dlq_topic}' (Format: {format})")
for message in consumer:
# Expected DLQ message format: {"original_row": {...}, "error": "...", "suggestion": "..."}
if isinstance(message.value, dict):
row_info = message.value
# Flatten the structure for DataFrame conversion
flat_row = row_info.get("original_row", {})
if not isinstance(flat_row, dict): # Handle cases where original_row isn't a dict
flat_row = {'malformed_original_row_data': str(flat_row)}
flat_row["error_reason"] = row_info.get("error", "N/A")
flat_row["fix_suggestion"] = row_info.get("suggestion", "N/A")
flat_row["rejected_timestamp_utc"] = row_info.get("rejected_at", "N/A") # Include timestamp if available
rejected_rows_data.append(flat_row)
else:
# Handle messages that aren't valid JSON dicts in the DLQ
rejected_rows_data.append({
'malformed_dlq_message_content': str(message.value),
'error_reason': 'Message in DLQ was not a valid JSON object',
'fix_suggestion': 'Investigate producer sending non-JSON messages to DLQ',
'rejected_timestamp_utc': datetime.now(timezone.utc).isoformat()
})
logger.info(f"Job {job_id}: Read {len(rejected_rows_data)} total rows from DLQ topic '{dlq_topic}'.")
# Create DataFrame (even if empty)
df = pd.DataFrame(rejected_rows_data)
# Prepare file in memory
output_buffer = io.BytesIO()
filename = f"job_{job_id}_rejected_rows.{format}"
media_type = ''
if df.empty:
logger.info(f"Job {job_id}: DLQ is empty, returning empty {format} file.")
# Write header only for CSV, empty Parquet is fine
if format == 'csv':
output_buffer.write(b'empty_dlq\n') # Placeholder content or just empty
media_type = 'text/csv'
elif format == 'parquet':
# Write empty dataframe to parquet
df.to_parquet(output_buffer, index=False, engine='pyarrow')
media_type = 'application/octet-stream'
elif format == 'csv':
df.to_csv(output_buffer, index=False, encoding='utf-8')
media_type = 'text/csv'
elif format == 'parquet':
# Convert complex objects to strings before writing Parquet
for col in df.select_dtypes(include=['object']).columns:
if any(isinstance(x, (list, dict)) for x in df[col] if pd.notna(x)):
try:
# Convert lists/dicts to JSON strings
df[col] = df[col].apply(lambda x: json.dumps(x) if isinstance(x, (list, dict)) else x)
except Exception as json_err:
logger.warning(f"Could not JSON stringify column {col} for Parquet: {json_err}. Falling back to str().")
df[col] = df[col].astype(str) # Fallback
# Ensure remaining objects are strings
try:
df[col] = df[col].astype(str)
except Exception as str_err:
logger.warning(f"Could not convert column {col} to string for Parquet: {str_err}. Applying str() element-wise.")
df[col] = df[col].apply(lambda x: str(x) if pd.notna(x) else None) # Final fallback
df.to_parquet(output_buffer, index=False, engine='pyarrow')
media_type = 'application/octet-stream'
        output_buffer.seek(0)
        # FileResponse requires a path on disk, so stream the in-memory buffer instead.
        return StreamingResponse(
            output_buffer,
            media_type=media_type,
            headers={"Content-Disposition": f'attachment; filename="{filename}"'}
        )
except NoBrokersAvailable:
logger.error(f"DLQ Download: Cannot connect to Kafka at {KAFKA_BOOTSTRAP_SERVERS} for job {job_id}")
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Could not connect to Kafka to retrieve DLQ messages.")
except Exception as e:
if "UnknownTopicOrPartitionError" in str(e):
logger.warning(f"Job {job_id}: DLQ topic '{dlq_topic}' not found or empty for download.")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="DLQ topic not found or is empty.")
else:
logger.exception(f"Job {job_id}: Error processing DLQ download: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error during DLQ download: {e}")
finally:
if consumer:
try:
consumer.close()
except Exception as ce:
logger.warning(f"Error closing DLQ download consumer: {ce}")
@app.get(
"/health",
summary="Perform a health check of the API and its connections",
response_description="Status of the API and Kafka connection.",
tags=["Health"] # Tag for API docs organization
)
async def health_check() -> Dict[str, Any]:
"""Checks the status of the API and its connection to Kafka."""
kafka_status = "unavailable"
try:
        # Lightweight check: request metadata for a placeholder topic instead of sending messages.
        # Note: with auto-topic-creation enabled on the broker, this may create the placeholder topic.
producer = get_kafka_producer()
producer.partitions_for('__healthcheck_placeholder') # Raises if disconnected
kafka_status = "connected"
except NoBrokersAvailable:
kafka_status = "unavailable (no brokers)"
except Exception as e:
logger.warning(f"Kafka health check failed: {e}")
kafka_status = f"error ({type(e).__name__})"
# Add ClickHouse check if needed
# clickhouse_status = "unknown"
return {"api_status": "ok", "kafka_status": kafka_status}
# Note: Running via Uvicorn command in docker-compose, so no __main__ block needed here.
EOF
# --- api/static/index.html ---
echo "Creating api/static/index.html..."
cat << 'EOF' > "${REPO_NAME}/api/static/index.html"
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Kafka Batch Upload</title>
<link rel="stylesheet" href="/static/style.css">
</head>
<body>
<h1>Batch File Upload</h1>
<div class="upload-section">
<form id="upload-form">
<label for="file-input">Select CSV or Parquet file (Max 1GB):</label>
<input type="file" id="file-input" name="file" accept=".csv,.parquet,application/vnd.apache.parquet,application/octet-stream" required>
<button type="submit">Upload</button>
</form>
<div id="upload-status"></div>
</div>
<div class="jobs-section">
<h2>Jobs</h2>
<div class="job-controls">
<label for="sort-status">Filter by Status:</label>
<select id="sort-status">
<option value="all">All</option>
<option value="PENDING">Pending</option>
<option value="PROCESSING">Processing</option>
<option value="PUBLISHED_TO_KAFKA">Published</option>
<option value="RUNNING">Running</option>
<option value="SUCCESS">Success</option>
<option value="PARTIAL_SUCCESS">Partial Success</option>
<option value="FAILED">Failed</option>
</select>
<button id="refresh-jobs">Refresh Jobs</button>
</div>
<table id="jobs-table">
<thead>
<tr>
<th>Job ID</th>
<th>Filename</th>
<th>Status</th>
<th>Progress</th>
<th>Succeeded</th>
<th>Failed</th>
<th>Started</th>
<th>Elapsed (s)</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
</tbody>
</table>
</div>
<div id="job-details-modal" class="modal">
<div class="modal-content">
<span class="close-button">&times;</span>
<h2>Job Details (<span id="modal-job-id"></span>)</h2>
<p><strong>Filename:</strong> <span id="modal-filename"></span></p>
<p><strong>Status:</strong> <span id="modal-status"></span></p>
<div id="modal-error-message" style="color: red; margin-top: 5px; white-space: pre-wrap;"></div>
<h3>Rejected Rows (Sample - First 10)</h3>
<div id="dlq-sample-container">
<table id="dlq-sample-table">
<thead>
</thead>
<tbody>
</tbody>
</table>
</div>
<div id="dlq-message"></div>
<button id="download-dlq-csv" class="dlq-download-btn" style="display: none;">Download All Rejected (CSV)</button>
<button id="download-dlq-parquet" class="dlq-download-btn" style="display: none;">Download All Rejected (Parquet)</button>
</div>
</div>
<script src="/static/script.js"></script>
</body>
</html>
EOF
# --- api/static/style.css ---
echo "Creating api/static/style.css..."
cat << 'EOF' > "${REPO_NAME}/api/static/style.css"
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif, "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol";
line-height: 1.6;
margin: 20px;
background-color: #f4f7f9;
color: #333;
}
h1, h2 {
color: #2c3e50;
border-bottom: 2px solid #e0e0e0;
padding-bottom: 8px;
margin-bottom: 20px;
}
h1 {
text-align: center;
}
.upload-section, .jobs-section {
background-color: #ffffff;
padding: 25px;
margin-bottom: 25px;
border-radius: 8px;
box-shadow: 0 4px 12px rgba(0,0,0,0.08);
}
#upload-form label {
display: block;
margin-bottom: 8px;
font-weight: 500;
color: #555;
}
#upload-form input[type="file"] {
margin-bottom: 15px;
display: block;
width: calc(100% - 24px); /* Adjust width */
padding: 10px;
border: 1px solid #ccc;
border-radius: 4px;
background-color: #fdfdfd;
}
#upload-form button, .job-controls button {
padding: 10px 20px;
background-color: #3498db; /* Blue */
color: white;
border: none;
border-radius: 5px;
cursor: pointer;
font-size: 1em;
transition: background-color 0.2s ease;
}
#upload-form button:hover, .job-controls button:hover {
background-color: #2980b9;
}
#upload-status {
margin-top: 15px;
font-weight: bold;
padding: 10px;
border-radius: 4px;
display: none; /* Hidden initially */
}
#upload-status.success { background-color: #e9f7ef; color: #1f7a4a; display: block; }
#upload-status.error { background-color: #fdeded; color: #c0392b; display: block; }
#upload-status.info { background-color: #eaf2f8; color: #2980b9; display: block; }
.job-controls {
margin-bottom: 20px;
display: flex;
align-items: center;
gap: 15px; /* Spacing between elements */
}
.job-controls label {
font-weight: 500;
margin-bottom: 0; /* Reset margin */
}
.job-controls select {
padding: 8px 12px;
border-radius: 4px;
border: 1px solid #ccc;
background-color: #fff;
}
#jobs-table {
width: 100%;
border-collapse: collapse;
margin-top: 15px;
}
#jobs-table th, #jobs-table td {
border: 1px solid #e0e0e0;
padding: 12px 15px; /* More padding */
text-align: left;
vertical-align: middle; /* Align text vertically */
}
#jobs-table th {
background-color: #f2f5f7;
font-weight: 600; /* Bolder headers */
color: #444;
}
#jobs-table tbody tr:nth-child(even) {
background-color: #f9fafb;
}
#jobs-table tbody tr:hover {
background-color: #f0f4f8;
}
.progress-bar-container {
width: 100%;
min-width: 100px; /* Ensure minimum width */
background-color: #e0e0e0;
border-radius: 5px; /* More rounded */
overflow: hidden;
height: 20px; /* Slightly taller */
position: relative;
}
.progress-bar {
height: 100%;
width: 0%; /* Initial width */
background-color: #2ecc71; /* Green */
text-align: center;
line-height: 20px; /* Match container height */
color: white;
font-size: 12px;
font-weight: bold;
white-space: nowrap;
transition: width 0.4s ease-in-out;
background-image: linear-gradient(45deg, rgba(255, 255, 255, .15) 25%, transparent 25%, transparent 50%, rgba(255, 255, 255, .15) 50%, rgba(255, 255, 255, .15) 75%, transparent 75%, transparent);
background-size: 40px 40px;
animation: progress-bar-stripes 1s linear infinite;
}
.progress-bar.failed {
background-color: #e74c3c; /* Red for failed */
animation: none; /* Stop animation on failure */
}
.progress-bar-text {
position: absolute;
width: 100%;
text-align: center;
top: 0;
left: 0;
line-height: 20px; /* Match container height */
font-size: 11px;
color: #333;
font-weight: bold;
text-shadow: 1px 1px 1px rgba(255,255,255,0.5); /* Slight text shadow */
}
@keyframes progress-bar-stripes {
from { background-position: 40px 0; }
to { background-position: 0 0; }
}
/* Status-specific styling */
.status-cell span {
padding: 3px 8px;
border-radius: 12px; /* Pill shape */
font-size: 0.85em;
font-weight: 500;
white-space: nowrap;
}
.status-PENDING { background-color: #f0f0f0; color: #777; }
.status-PROCESSING, .status-PUBLISHED_TO_KAFKA, .status-RUNNING { background-color: #eaf2f8; color: #3498db; }
.status-SUCCESS { background-color: #e9f7ef; color: #2ecc71; }
.status-PARTIAL_SUCCESS { background-color: #fef5e7; color: #f39c12; }
.status-FAILED { background-color: #fdeded; color: #e74c3c; }
.action-button {
padding: 5px 12px;
font-size: 13px;
cursor: pointer;
margin-right: 5px;
border-radius: 4px;
border: 1px solid #ccc;
background-color: #fff;
transition: background-color 0.2s ease, border-color 0.2s ease;
}
.view-details-btn {
background-color: #f0ad4e; /* Orange */
color: white;
border-color: #eea236;
}
.view-details-btn:hover { background-color: #ec971f; border-color: #d58512; }
/* Modal Styles */
.modal {
display: none; /* Hidden by default */
position: fixed; /* Stay in place */
z-index: 1000; /* Sit on top */
left: 0;
top: 0;
width: 100%; /* Full width */
height: 100%; /* Full height */
overflow: auto; /* Enable scroll if needed */
background-color: rgba(0,0,0,0.5); /* Dim background */
backdrop-filter: blur(3px); /* Optional blur */
}
.modal-content {
background-color: #fefefe;
margin: 8% auto; /* Adjusted margin */
padding: 25px 30px;
border: 1px solid #ddd;
width: 85%; /* Wider */
max-width: 900px; /* Max width */
border-radius: 8px;
box-shadow: 0 5px 15px rgba(0,0,0,0.2);
position: relative;
}
.close-button {
color: #aaa;
position: absolute;
top: 15px;
right: 25px;
font-size: 32px;
font-weight: bold;
line-height: 1;
}
.close-button:hover,
.close-button:focus {
color: #333;
text-decoration: none;
cursor: pointer;
}
#dlq-sample-container {
max-height: 300px; /* Limit height and make scrollable */
overflow-y: auto;
border: 1px solid #e0e0e0;
margin-top: 15px;
border-radius: 4px;
}
#dlq-sample-table {
width: 100%;
border-collapse: collapse;
font-size: 13px;
}
#dlq-sample-table th, #dlq-sample-table td {
border: 1px solid #e0e0e0;
padding: 8px 10px;
text-align: left;
max-width: 250px; /* Limit column width */
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
border-bottom: 1px solid #eee; /* Lighter internal borders */
border-left: none;
border-right: none;
}
#dlq-sample-table th {
background-color: #f8f9fa;
position: sticky; /* Make headers stick */
top: 0; /* Stick to the top of the container */
z-index: 1;
border-top: none;
}
#dlq-sample-table td {
border-top: none;
}
#dlq-sample-table tr:last-child td {
border-bottom: none;
}
#dlq-message { margin-top: 15px; margin-bottom: 15px; color: #555;}
.dlq-download-btn {
margin-top: 15px;
margin-right: 10px;
padding: 9px 15px;
background-color: #3498db;
color: white;
border: none;
border-radius: 5px;
cursor: pointer;
font-size: 0.9em;
transition: background-color 0.2s ease;
}
.dlq-download-btn:hover {
background-color: #2980b9;
}
/* Responsive adjustments (optional) */
@media (max-width: 768px) {
.modal-content {
width: 95%;
margin: 5% auto;
padding: 20px;
}
#jobs-table th, #jobs-table td {
padding: 8px;
font-size: 13px;
}
.job-controls {
flex-direction: column;
align-items: flex-start;
}
}
EOF
# --- api/static/script.js ---
echo "Creating api/static/script.js..."
cat << 'EOF' > "${REPO_NAME}/api/static/script.js"
/**
* Frontend JavaScript for Kafka Batch Upload UI
* Handles file uploads, job status polling, and displaying job details/DLQ.
*/
document.addEventListener('DOMContentLoaded', () => {
// --- DOM Elements ---
const uploadForm = document.getElementById('upload-form');
const fileInput = document.getElementById('file-input');
const uploadStatus = document.getElementById('upload-status');
const jobsTableBody = document.getElementById('jobs-table')?.querySelector('tbody');
const refreshButton = document.getElementById('refresh-jobs');
const sortStatusSelect = document.getElementById('sort-status');
const modal = document.getElementById('job-details-modal');
const modalCloseButton = modal?.querySelector('.close-button');
const modalJobId = document.getElementById('modal-job-id');
const modalFilename = document.getElementById('modal-filename');
const modalStatus = document.getElementById('modal-status');
const modalErrorMessage = document.getElementById('modal-error-message');
const dlqTable = document.getElementById('dlq-sample-table');
const dlqTableHead = dlqTable?.querySelector('thead');
const dlqTableBody = dlqTable?.querySelector('tbody');
const dlqMessage = document.getElementById('dlq-message');
const downloadCsvButton = document.getElementById('download-dlq-csv');
const downloadParquetButton = document.getElementById('download-dlq-parquet');
// --- State ---
let currentJobsData = []; // Store fetched jobs data, sorted by start time desc
let activeJobIdForModal = null; // Store job ID for modal downloads
let jobPollingInterval = null; // Interval ID for polling
const POLLING_INTERVAL_MS = 5000; // Poll every 5 seconds
// --- Utility Functions ---
/** Formats an ISO timestamp string into a locale-specific string. */
const formatTimestamp = (isoString) => {
if (!isoString) return 'N/A';
try {
// Use options for clarity
const options = {
year: 'numeric', month: 'short', day: 'numeric',
hour: '2-digit', minute: '2-digit', second: '2-digit', hour12: true
};
return new Date(isoString).toLocaleString(undefined, options);
} catch (e) {
console.error("Error formatting timestamp:", isoString, e);
return 'Invalid Date';
}
};
/** Escapes HTML special characters in a string to prevent XSS. */
const escapeHtml = (unsafe) => {
if (unsafe === null || typeof unsafe === 'undefined') return '';
// Ensure input is a string before replacing
return String(unsafe)
.replace(/&/g, "&amp;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;")
.replace(/"/g, "&quot;")
.replace(/'/g, "&#039;");
};
/** Displays status messages related to file uploads. */
const showUploadStatus = (message, type = 'info') => {
if (!uploadStatus) return;
uploadStatus.textContent = message;
// Use class names for styling success/error/info messages
uploadStatus.className = type; // Removes previous classes
};
// --- Upload Logic ---
/** Handles the file upload form submission. */
const handleUpload = async (event) => {
event.preventDefault(); // Prevent default form submission
if (!fileInput?.files?.length) {
showUploadStatus('Please select a file to upload.', 'error');
return;
}
showUploadStatus('Uploading...', 'info');
const formData = new FormData();
formData.append('file', fileInput.files[0]);
try {
// Make POST request to the /upload endpoint
const response = await fetch('/upload', {
method: 'POST',
body: formData,
});
// Parse the JSON response
const result = await response.json();
// Check if the request was successful
if (!response.ok) {
// Throw an error with details from the API response or a generic message
throw new Error(result.detail || `Upload failed with status: ${response.status}`);
}
// Display success message and job ID
showUploadStatus(`Success! Job ID: ${result.job_id}. Processing started.`, 'success');
if (uploadForm) uploadForm.reset(); // Clear the form input
await fetchJobs(); // Refresh job list immediately
startJobPolling(); // Ensure polling is active
} catch (error) {
console.error('Upload error:', error);
showUploadStatus(`Upload failed: ${error.message}`, 'error');
}
};
// --- Job Fetching and Display ---
/** Fetches the list of jobs from the API. */
const fetchJobs = async () => {
console.log("Fetching jobs...");
try {
const response = await fetch('/jobs');
if (!response.ok) {
throw new Error(`Failed to fetch jobs: ${response.status} ${response.statusText}`);
}
// API should return jobs sorted already, but we store them here
currentJobsData = await response.json();
renderJobs(); // Update the UI table
checkActiveJobs(); // Decide if polling should continue
} catch (error) {
console.error('Error fetching jobs:', error);
if (jobsTableBody) {
// Display error message in the table body
jobsTableBody.innerHTML = `<tr><td colspan="9" style="color: red; text-align: center;">Error loading jobs: ${escapeHtml(error.message)}</td></tr>`;
}
// Consider stopping polling on persistent errors
// stopJobPolling();
}
};
/** Renders the fetched job data into the HTML table. */
const renderJobs = () => {
if (!jobsTableBody || !sortStatusSelect) {
console.error("Jobs table body or sort select not found");
return;
}
const selectedStatus = sortStatusSelect.value;
jobsTableBody.innerHTML = ''; // Clear existing rows
// Filter jobs based on the selected status filter
const filteredJobs = currentJobsData.filter(job =>
selectedStatus === 'all' || job.status === selectedStatus
);
// Display message if no jobs match the filter
if (filteredJobs.length === 0) {
jobsTableBody.innerHTML = `<tr><td colspan="9" style="text-align: center; padding: 20px; color: #777;">No jobs found${selectedStatus !== 'all' ? ' matching status "' + escapeHtml(selectedStatus) + '"' : ''}.</td></tr>`;
return;
}
// Create and append a row for each job
filteredJobs.forEach(job => {
const row = jobsTableBody.insertRow();
row.setAttribute('data-job-id', job.job_id); // Useful for debugging or future features
// --- Calculate Progress ---
const total = job.total_rows_estimated || 0;
const processed = job.processed_count || 0;
let progressPercent = 0;
let progressText = `${processed} / ${total > 0 ? total : '?'}`; // Text for tooltip
let progressBarClass = ''; // Additional class for styling (e.g., 'failed')
// Determine progress percentage and bar style based on job status
if (job.status === 'FAILED') {
progressPercent = 100; // Show full bar but red
progressBarClass = 'failed';
} else if (['SUCCESS', 'PARTIAL_SUCCESS'].includes(job.status)) {
progressPercent = 100;
// Adjust text if total was unknown but processing happened
if (total <= 0 && processed > 0) progressText = `${processed} / ?`;
} else if (total > 0 && processed >= 0) { // Allow processed=0 when total>0
progressPercent = Math.min(100, Math.round((processed / total) * 100));
}
// Note: If status is PENDING/PROCESSING etc. and total=0, percent remains 0
// --- Prepare Cell Content (with escaping) ---
const startTime = formatTimestamp(job.start_time);
const jobIdShort = job.job_id ? escapeHtml(job.job_id.substring(0, 8)) + '...' : 'N/A';
const filename = escapeHtml(job.filename || 'N/A');
const status = escapeHtml(job.status || 'UNKNOWN');
const successCount = job.success_count || 0;
const failedCount = job.failed_count || 0;
const elapsedTime = job.elapsed_time_seconds !== undefined ? job.elapsed_time_seconds.toFixed(1) : 'N/A';
// --- Populate Row ---
row.innerHTML = `
<td title="${escapeHtml(job.job_id)}">${jobIdShort}</td>
<td title="${filename}">${filename}</td>
<td class="status-cell"><span class="status-${status}">${status}</span></td>
<td>
<div class="progress-bar-container" title="${escapeHtml(progressText)}">
<div class="progress-bar ${progressBarClass}" style="width: ${progressPercent}%;"></div>
<div class="progress-bar-text">${progressPercent}%</div>
</div>
</td>
<td>${successCount}</td>
<td>${failedCount}</td>
<td>${startTime}</td>
<td>${elapsedTime}</td>
<td>
<button class="action-button view-details-btn" data-job-id="${job.job_id}">Details</button>
</td>
`;
// --- Add Event Listener for Details Button ---
const detailsButton = row.querySelector('.view-details-btn');
if (detailsButton) {
detailsButton.addEventListener('click', (e) => {
// Get job ID from the button's data attribute
const jobId = e.currentTarget.getAttribute('data-job-id');
if (jobId) showJobDetails(jobId); // Show modal
});
}
});
}
/** Checks if any jobs are still in a non-terminal state and starts polling if needed. */
const checkActiveJobs = () => {
const hasActiveJobs = currentJobsData.some(job =>
!['SUCCESS', 'FAILED', 'PARTIAL_SUCCESS'].includes(job.status)
);
if (hasActiveJobs && !jobPollingInterval) {
startJobPolling();
}
// Optional: Stop polling if no jobs are active anymore
// else if (!hasActiveJobs && jobPollingInterval) {
// console.log("No active jobs detected. Stopping polling.");
// stopJobPolling();
// }
}
/** Starts the periodic polling for job status updates. */
const startJobPolling = () => {
if (jobPollingInterval) return; // Prevent multiple intervals
console.log(`Starting job status polling (every ${POLLING_INTERVAL_MS / 1000} seconds)...`);
fetchJobs(); // Fetch immediately on start
jobPollingInterval = setInterval(fetchJobs, POLLING_INTERVAL_MS);
}
/** Stops the periodic polling. */
const stopJobPolling = () => {
if (jobPollingInterval) {
console.log("Stopping job status polling.");
clearInterval(jobPollingInterval);
jobPollingInterval = null;
}
}
// --- Modal Logic ---
/** Closes the job details modal. */
const closeModal = () => {
if (modal) modal.style.display = 'none';
activeJobIdForModal = null; // Clear the active job ID
};
/** Fetches job details and DLQ sample, then displays the modal. */
const showJobDetails = async (jobId) => {
activeJobIdForModal = jobId; // Store the job ID for potential downloads
console.log(`Showing details for job: ${jobId}`);
const job = currentJobsData.find(j => j.job_id === jobId);
// Ensure all required modal elements are present
if (!job || !modal || !modalJobId || !modalFilename || !modalStatus || !modalErrorMessage || !dlqTableHead || !dlqTableBody || !dlqMessage || !downloadCsvButton || !downloadParquetButton) {
console.error("Cannot show job details: Modal elements or job data not found.");
alert("Error: Could not display job details."); // User feedback
return;
}
// --- Populate Basic Job Info ---
modalJobId.textContent = escapeHtml(jobId.substring(0, 8)) + '...';
modalFilename.textContent = escapeHtml(job.filename || 'N/A');
modalStatus.textContent = escapeHtml(job.status || 'UNKNOWN');
// Display error message if present
modalErrorMessage.textContent = escapeHtml(job.error_message || '');
modalErrorMessage.style.display = job.error_message ? 'block' : 'none';
// --- Reset and Prepare DLQ Section ---
dlqTableHead.innerHTML = ''; // Clear previous headers
dlqTableBody.innerHTML = '<tr><td colspan="1" style="text-align: center; padding: 15px; color: #777;">Loading DLQ sample...</td></tr>'; // Loading indicator
dlqMessage.textContent = ''; // Clear previous messages
dlqMessage.style.color = '#555'; // Reset color
downloadCsvButton.style.display = 'none'; // Hide download buttons initially
downloadParquetButton.style.display = 'none';
// --- Show Modal ---
modal.style.display = 'block';
// --- Fetch and Display DLQ Sample ---
try {
const response = await fetch(`/jobs/${jobId}/dlq/sample`);
if (!response.ok) {
// Handle specific 404 for empty/non-existent DLQ
if (response.status === 404) {
dlqTableBody.innerHTML = ''; // Clear loading message
dlqMessage.textContent = 'No rejected rows found (or DLQ is empty/not yet created).';
return; // Nothing more to display for DLQ
}
// Try to parse error detail from API response
const errorData = await response.json().catch(() => ({ detail: 'Failed to parse error response' }));
throw new Error(errorData.detail || `Failed to fetch DLQ sample: ${response.status}`);
}
const dlqSample = await response.json();
dlqTableBody.innerHTML = ''; // Clear loading message
if (!Array.isArray(dlqSample) || dlqSample.length === 0) {
dlqMessage.textContent = 'No rejected rows found in the sample.';
} else {
dlqMessage.textContent = `Showing first ${dlqSample.length} rejected rows.`;
// Show download buttons only if there are rejected rows
downloadCsvButton.style.display = 'inline-block';
downloadParquetButton.style.display = 'inline-block';
// --- Dynamically Create DLQ Table Headers ---
const headers = new Set();
// Find the first valid object in the sample to determine keys
const firstValidItem = dlqSample.find(item => typeof item === 'object' && item !== null);
if (firstValidItem) {
// Prioritize standard DLQ fields
if ('error' in firstValidItem) headers.add('error_reason');
if ('suggestion' in firstValidItem) headers.add('fix_suggestion');
if ('rejected_at' in firstValidItem) headers.add('rejected_timestamp_utc');
// Add keys from the original row data if it's an object
if (typeof firstValidItem.original_row === 'object' && firstValidItem.original_row !== null) {
Object.keys(firstValidItem.original_row).forEach(key => headers.add(key));
} else if ('original_row' in firstValidItem) {
headers.add('original_row_data'); // Handle non-object original_row
}
} else if (dlqSample.length > 0) {
// Fallback if sample contains only non-objects (e.g., strings)
headers.add('dlq_message_content');
}
// Define preferred order (optional)
const preferredOrder = ['error_reason', 'fix_suggestion', 'rejected_timestamp_utc'];
const headerOrder = [
...preferredOrder.filter(h => headers.has(h)), // Add preferred headers first if they exist
...[...headers].filter(h => !preferredOrder.includes(h)).sort() // Add remaining headers sorted alphabetically
];
// Create header row
const headerRow = dlqTableHead.insertRow();
headerOrder.forEach(key => {
const th = document.createElement('th');
// Make headers more readable
th.textContent = key.replace(/_/g, ' ').replace(/(?:^|\s)\S/g, a => a.toUpperCase());
headerRow.appendChild(th);
});
// --- Populate DLQ Table Rows ---
dlqSample.forEach(item => {
const row = dlqTableBody.insertRow();
headerOrder.forEach(key => {
const cell = row.insertCell();
let value = undefined; // Default to undefined
// Extract value based on the header key
if (typeof item === 'object' && item !== null) {
if (key === 'error_reason') value = item.error;
else if (key === 'fix_suggestion') value = item.suggestion;
else if (key === 'rejected_timestamp_utc') value = item.rejected_at;
else if (key === 'original_row_data' && typeof item.original_row !== 'object') value = item.original_row;
else if (item.original_row && typeof item.original_row === 'object') value = item.original_row[key];
// Handle case where key might be missing in a specific original_row
if (value === undefined && key in item) value = item[key]; // Check top level if not found in original_row
} else if (key === 'dlq_message_content') {
value = item; // Show the non-object item directly
}
// Format value for display
let displayValue = '';
if (typeof value === 'object' && value !== null) {
try { displayValue = JSON.stringify(value); } catch { displayValue = '[Object]'; }
} else if (value === null || typeof value === 'undefined') {
displayValue = ''; // Display null/undefined as empty string
} else {
displayValue = String(value); // Convert other types to string
}
// Set cell content and title (for hover tooltip)
cell.textContent = displayValue;
cell.title = displayValue;
});
});
}
} catch (error) {
console.error(`Error fetching/displaying DLQ sample for ${jobId}:`, error);
dlqTableBody.innerHTML = ''; // Clear loading/previous data
dlqMessage.textContent = `Error loading DLQ sample: ${escapeHtml(error.message)}`;
dlqMessage.style.color = 'red'; // Indicate error
}
}
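    // For reference, each DLQ sample item rendered above is assumed to follow the payload shape the
    // consumer's send_to_dlq() produces (field values here are illustrative only):
    // {
    //   "job_id": "3f2c9a10-...",
    //   "original_row": { "col_a": "x", "col_b": "not-an-int", "col_c": 1.5 },
    //   "error": "Column 'col_b': Failed to convert value 'not-an-int' ...",
    //   "suggestion": "Review the row data against the expected schema. ...",
    //   "rejected_at": "2025-04-15T19:49:00.000+00:00"
    // }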
// --- Event Listeners ---
if (uploadForm) {
uploadForm.addEventListener('submit', handleUpload);
}
if (refreshButton) {
refreshButton.addEventListener('click', fetchJobs);
}
if (sortStatusSelect) {
// Use 'change' event for select elements
sortStatusSelect.addEventListener('change', renderJobs); // Re-render when filter changes
}
// Modal close listeners
if (modalCloseButton) {
modalCloseButton.addEventListener('click', closeModal);
}
if (modal) {
// Close modal if user clicks outside the modal content area
modal.addEventListener('click', (event) => {
if (event.target === modal) {
closeModal();
}
});
}
// DLQ Download button listeners
if (downloadCsvButton) {
downloadCsvButton.addEventListener('click', () => {
if (activeJobIdForModal) {
// Trigger download by navigating to the download URL
window.location.href = `/jobs/${activeJobIdForModal}/dlq/download?format=csv`;
} else {
console.error("Cannot download CSV: No active job ID for modal.");
}
});
}
if (downloadParquetButton) {
downloadParquetButton.addEventListener('click', () => {
if (activeJobIdForModal) {
// Trigger download
window.location.href = `/jobs/${activeJobIdForModal}/dlq/download?format=parquet`;
} else {
console.error("Cannot download Parquet: No active job ID for modal.");
}
});
}
// Cleanup polling interval when the page is unloaded
window.addEventListener('beforeunload', stopJobPolling);
// --- Initial Load ---
startJobPolling(); // Start polling for job statuses when the page loads
});
EOF
# --- consumer/requirements.txt ---
echo "Creating consumer/requirements.txt..."
cat << EOF > "${REPO_NAME}/consumer/requirements.txt"
kafka-python>=2.0.0
pandas>=1.5.0
clickhouse-connect>=0.6.0
requests>=2.28.0
EOF
# --- consumer/Dockerfile ---
echo "Creating consumer/Dockerfile..."
cat << 'EOF' > "${REPO_NAME}/consumer/Dockerfile"
# Use official Python base image
FROM python:3.10-slim
# Set working directory
WORKDIR /app
# Create a non-root user and group
RUN groupadd --system appuser && useradd --system --gid appuser appuser
# Install build dependencies, install Python packages, then remove build dependencies
COPY requirements.txt .
RUN apt-get update && apt-get install -y --no-install-recommends build-essential && \
pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt && \
apt-get purge -y --auto-remove build-essential && \
rm -rf /var/lib/apt/lists/*
# Copy application code
COPY . .
# Change ownership to non-root user
RUN chown -R appuser:appuser /app
# Switch to non-root user
USER appuser
# Default command to run the consumer script
CMD ["python", "consumer.py"]
EOF
# --- consumer/consumer.py ---
echo "Creating consumer/consumer.py..."
cat << 'EOF' > "${REPO_NAME}/consumer/consumer.py"
import os
import json
import time
import signal
import sys
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Tuple, List, Optional, Union, Set
import uuid # For UUID validation/conversion
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from kafka.errors import NoBrokersAvailable, KafkaError, KafkaTimeoutError
import pandas as pd
import clickhouse_connect
from clickhouse_connect.driver.client import Client as ClickHouseClient # Type hint
from clickhouse_connect.driver import exceptions as ClickHouseExceptions
import requests # To update API
# --- Configuration ---
KAFKA_BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9094')
CLICKHOUSE_HOST = os.getenv('CLICKHOUSE_HOST', 'localhost')
CLICKHOUSE_NATIVE_PORT = int(os.getenv('CLICKHOUSE_NATIVE_PORT', 9000))  # NOTE: clickhouse-connect speaks ClickHouse's HTTP protocol; ensure this matches the HTTP port exposed in docker-compose.yml
CLICKHOUSE_DB = os.getenv('CLICKHOUSE_DB', 'default')
CLICKHOUSE_USER = os.getenv('CLICKHOUSE_USER', 'default')
CLICKHOUSE_PASSWORD = os.getenv('CLICKHOUSE_PASSWORD', '')
CONSUMER_GROUP_ID = os.getenv('CONSUMER_GROUP_ID', 'batch_processor_group')
API_URL = os.getenv('API_URL', 'http://localhost:8000') # API Server URL
EXPECTED_SCHEMA_STR = os.getenv('EXPECTED_SCHEMA', '{"col_a": "String", "col_b": "Int32", "col_c": "Float64"}')
TARGET_TABLE = 'models' # ClickHouse table name
JOB_TOPIC_PATTERN = r'^job_.*(?<!_dlq)$' # Regex to match job topics but not DLQ topics
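# Example (topic names illustrative): 'job_3f2c9a10' matches the pattern above, while
# 'job_3f2c9a10_dlq' is excluded by the negative lookbehind on '_dlq'.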
# Timing/Batching Config
STATUS_UPDATE_INTERVAL_SECONDS = 5 # How often to push status updates to API
INSERT_BATCH_SIZE = 1000 # How many rows to buffer before inserting to ClickHouse
INSERT_BATCH_TIMEOUT_SECONDS = 5 # Max time to wait before inserting partial batch
KAFKA_POLL_TIMEOUT_MS = 1000 # Consumer poll timeout
MAX_CONNECTION_RETRIES = 5
RETRY_BACKOFF_SECONDS = 5
# Job Status Constants (should match API)
STATUS_RUNNING = "RUNNING"
STATUS_FAILED = "FAILED"
STATUS_SUCCESS = "SUCCESS"
STATUS_PARTIAL_SUCCESS = "PARTIAL_SUCCESS"
# --- Schema Parsing ---
try:
EXPECTED_SCHEMA = json.loads(EXPECTED_SCHEMA_STR)
EXPECTED_COLUMNS_FROM_FILE = list(EXPECTED_SCHEMA.keys()) # Columns expected in the data row
except json.JSONDecodeError:
logging.error(f"!!! Invalid EXPECTED_SCHEMA environment variable: {EXPECTED_SCHEMA_STR}")
EXPECTED_SCHEMA = {}
EXPECTED_COLUMNS_FROM_FILE = []
# Define the *full* table schema including tracking columns
CLICKHOUSE_TABLE_SCHEMA = {
**EXPECTED_SCHEMA,
'job_uuid': 'UUID',
'ingested_at': 'DateTime64(3, \'UTC\')' # Use DateTime64 for milliseconds, specify UTC
}
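# With the default EXPECTED_SCHEMA this resolves to (illustrative):
#   {'col_a': 'String', 'col_b': 'Int32', 'col_c': 'Float64',
#    'job_uuid': 'UUID', 'ingested_at': "DateTime64(3, 'UTC')"}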
# Map pandas/python types to ClickHouse types for validation/conversion
# Add more mappings as needed (e.g., bool -> UInt8)
TYPE_MAPPING = {
'Int8': int, 'Int16': int, 'Int32': int, 'Int64': int,
'UInt8': int, 'UInt16': int, 'UInt32': int, 'UInt64': int,
'Float32': float, 'Float64': float,
'String': str,
'FixedString': str,
'Date': (datetime, pd.Timestamp), 'Date32': (datetime, pd.Timestamp),
'DateTime': (datetime, pd.Timestamp), 'DateTime64': (datetime, pd.Timestamp),
'UUID': uuid.UUID,
'Boolean': bool,
}
def get_python_type(ch_type: str) -> Optional[Union[type, Tuple[type, ...]]]:
"""Maps ClickHouse type string to Python type(s) for validation."""
# Handle Nullable types
is_nullable = ch_type.startswith('Nullable(')
if is_nullable:
ch_type = ch_type[len('Nullable('):-1]
# Handle base types and parameterized types like FixedString, DateTime64
base_ch_type = ch_type.split('(')[0]
py_type = TYPE_MAPPING.get(base_ch_type)
# Allow None if the ClickHouse type was Nullable
# Note: isinstance(None, type) is False, so we can't easily include None here.
# Validation logic needs to handle None separately based on is_nullable.
return py_type
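# Illustrative examples of the mapping above (given the TYPE_MAPPING defined earlier):
#   get_python_type('Int32')            -> int
#   get_python_type('Nullable(String)') -> str   (null handling is decided by the caller)
#   get_python_type('DateTime64(3)')    -> (datetime, pd.Timestamp)
#   get_python_type('Array(String)')    -> None  (no mapping; the value is passed through)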
# --- Logging ---
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(
level=LOG_LEVEL,
format='%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] - %(message)s'
)
logger = logging.getLogger(__name__)
# --- Global Variables ---
# Use Optional typing for clarity
consumer: Optional[KafkaConsumer] = None
producer: Optional[KafkaProducer] = None # Producer for DLQ
ch_client: Optional[ClickHouseClient] = None
running = True # Flag for graceful shutdown control
# In-memory stats per job for batching updates to API
# Structure: { job_id: {"processed_count": X, "success_count": Y, "failed_count": Z, "last_update_time": T} }
job_stats: Dict[str, Dict[str, Any]] = {}
# --- ClickHouse Connection and Setup ---
def get_clickhouse_client() -> Optional[ClickHouseClient]:
"""Establishes and returns a ClickHouse client connection with retries."""
global ch_client
if ch_client is not None and ch_client.ping(): # Check if existing client is alive
return ch_client
logger.info("Attempting to establish ClickHouse connection...")
retry_delay = RETRY_BACKOFF_SECONDS
for attempt in range(MAX_CONNECTION_RETRIES):
try:
ch_client = clickhouse_connect.get_client(
host=CLICKHOUSE_HOST,
port=CLICKHOUSE_NATIVE_PORT,
database=CLICKHOUSE_DB,
username=CLICKHOUSE_USER,
password=CLICKHOUSE_PASSWORD,
connect_timeout=10,
send_receive_timeout=60,
settings={
'insert_deduplicate': 0, # Deduplication handled by MergeTree usually
'insert_quorum': 'auto'
}
)
ch_client.ping() # Verify connection is live
logger.info(f"ClickHouse connection successful to {CLICKHOUSE_HOST}:{CLICKHOUSE_NATIVE_PORT}.")
# Ensure table exists after successful connection
create_table_if_not_exists(ch_client)
return ch_client # Return the live client
except ClickHouseExceptions.ClickHouseError as e:
logger.warning(f"ClickHouse connection attempt {attempt+1}/{MAX_CONNECTION_RETRIES} failed: {e}")
if attempt < MAX_CONNECTION_RETRIES - 1:
time.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 60) # Exponential backoff up to 60s
else:
logger.error("Max ClickHouse connection attempts reached. Giving up.")
ch_client = None # Ensure client is None if connection failed
except Exception as e:
logger.exception(f"Unexpected error connecting to ClickHouse: {e}")
ch_client = None
break # Don't retry on unexpected errors
return None # Return None if all attempts failed
def create_table_if_not_exists(client: ClickHouseClient):
"""Creates the target table in ClickHouse using the defined schema if it doesn't exist."""
# Use backticks for identifiers that might clash with keywords or contain special chars
cols_defs = [f"`{col_name}` {col_type}" for col_name, col_type in CLICKHOUSE_TABLE_SCHEMA.items()]
# Define ORDER BY clause - essential for MergeTree performance.
# Choose columns frequently used in WHERE clauses or for sorting.
# Example: Order by job ID and timestamp for time-series analysis per job.
order_by_cols = "(job_uuid, ingested_at)"
# Consider PARTITION BY if data volume is very large (e.g., by month, by job_uuid prefix)
# partition_by_clause = "PARTITION BY toYYYYMM(ingested_at)" # Example partition
create_table_query = f"""
CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DB}`.`{TARGET_TABLE}` (
{', '.join(cols_defs)}
) ENGINE = MergeTree()
ORDER BY {order_by_cols}
    -- PARTITION BY toYYYYMM(ingested_at)  -- uncomment and adjust if partitioning is needed
SETTINGS index_granularity = 8192;
"""
try:
logger.info(f"Ensuring table `{CLICKHOUSE_DB}`.`{TARGET_TABLE}` exists...")
client.command(create_table_query)
logger.info(f"Table `{CLICKHOUSE_DB}`.`{TARGET_TABLE}` is ready.")
except ClickHouseExceptions.ClickHouseError as e:
logger.error(f"Failed to create or verify table `{TARGET_TABLE}`: {e}")
raise # Propagate error; consumer cannot proceed without the table.
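# For the default schema above, the generated DDL is roughly (illustrative, formatting aside):
#   CREATE TABLE IF NOT EXISTS `default`.`models` (
#       `col_a` String, `col_b` Int32, `col_c` Float64,
#       `job_uuid` UUID, `ingested_at` DateTime64(3, 'UTC')
#   ) ENGINE = MergeTree()
#   ORDER BY (job_uuid, ingested_at)
#   SETTINGS index_granularity = 8192;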
# --- Data Validation and Preparation ---
def validate_and_prepare_row(row_data: Dict[str, Any], job_id: str) -> Tuple[Optional[Dict[str, Any]], Optional[Tuple[str, str]]]:
"""
Validates a row against the EXPECTED_SCHEMA, attempts type conversion, and adds metadata.
Returns:
Tuple[Optional[Dict[str, Any]], Optional[Tuple[str, str]]]:
- (prepared_row, None) if valid. prepared_row contains data ready for ClickHouse insert.
- (None, (error_reason, suggestion)) if invalid.
"""
prepared_row = {}
errors: List[str] = []
missing_cols: List[str] = []
extra_cols: List[str] = []
# 1. Check for missing required columns defined in EXPECTED_SCHEMA
for expected_col in EXPECTED_COLUMNS_FROM_FILE:
if expected_col not in row_data:
missing_cols.append(expected_col)
if missing_cols:
reason = f"Missing required columns: {', '.join(missing_cols)}"
errors.append(reason)
# Continue validation to find other errors, but mark as failed eventually
# 2. Iterate through received data keys, validate types, convert, and check for extras
processed_keys: Set[str] = set()
for key, value in row_data.items():
processed_keys.add(key)
if key in EXPECTED_SCHEMA:
target_ch_type = EXPECTED_SCHEMA[key]
is_nullable = target_ch_type.startswith('Nullable(')
target_py_type = get_python_type(target_ch_type)
# Handle Null values based on Nullable flag
if value is None or value == '': # Treat empty string as None for potential conversion
if is_nullable:
prepared_row[key] = None
continue
else:
# Allow empty string for String types even if not Nullable?
if target_py_type is str and value == '':
prepared_row[key] = ''
continue
else:
errors.append(f"Column '{key}' (type {target_ch_type}) cannot be null or empty.")
continue # Skip further checks for this key
# Check type and attempt conversion if necessary
current_type = type(value)
is_correct_type = isinstance(value, target_py_type) if target_py_type else False
if is_correct_type:
# Type matches, perform any necessary final formatting
try:
if target_py_type is uuid.UUID:
prepared_row[key] = str(value) # CH driver usually takes string UUIDs
elif target_py_type == (datetime, pd.Timestamp):
# Standardize to datetime object, ensure timezone awareness (UTC)
dt_val = pd.to_datetime(value)
prepared_row[key] = dt_val.tz_convert('UTC') if dt_val.tzinfo else dt_val.tz_localize('UTC')
elif target_py_type is bool:
prepared_row[key] = 1 if value else 0 # Convert bool to 1/0 for ClickHouse UInt8
else:
prepared_row[key] = value
except Exception as format_err:
errors.append(f"Column '{key}': Error formatting value '{value}' for ClickHouse type {target_ch_type}. Error: {format_err}")
elif target_py_type:
# Attempt conversion for mismatched types
try:
if target_py_type is int:
prepared_row[key] = int(value)
elif target_py_type is float:
prepared_row[key] = float(value)
elif target_py_type is str:
prepared_row[key] = str(value)
elif target_py_type == (datetime, pd.Timestamp):
# Use pandas for robust date parsing, assume UTC if no timezone
dt_val = pd.to_datetime(value, errors='raise')
prepared_row[key] = dt_val.tz_convert('UTC') if dt_val.tzinfo else dt_val.tz_localize('UTC')
elif target_py_type is uuid.UUID:
prepared_row[key] = str(uuid.UUID(str(value))) # Validate and convert to string
elif target_py_type is bool:
# Handle common string representations of bool
if isinstance(value, str):
val_lower = value.lower()
if val_lower in ['true', '1', 'yes', 't', 'y']: prepared_row[key] = 1
elif val_lower in ['false', '0', 'no', 'f', 'n']: prepared_row[key] = 0
else: raise ValueError("Invalid boolean string")
else: # Try direct bool conversion
prepared_row[key] = 1 if bool(value) else 0
else:
# Type exists but no specific conversion logic, pass through (might fail on insert)
prepared_row[key] = value
logger.warning(f"Job {job_id}: No specific conversion logic for {key} ({current_type}) to {target_ch_type}. Passing value.")
except (ValueError, TypeError, pd.errors.ParserError) as e:
errors.append(f"Column '{key}': Failed to convert value '{value}' (type {current_type.__name__}) to expected type {target_ch_type}. Error: {e}")
else:
# No target python type found - maybe a complex CH type like Array(String)
prepared_row[key] = value # Pass through, might fail later
logger.warning(f"Job {job_id}: Could not map ClickHouse type '{target_ch_type}' for column '{key}' to a standard Python type for validation.")
else:
# Handle unexpected columns present in the data but not in schema
extra_cols.append(key)
# 3. Check if any expected columns were completely missed in the input row
# This catches columns defined in schema but absent from the row_data dict
for expected_col in EXPECTED_COLUMNS_FROM_FILE:
if expected_col not in processed_keys:
# This check is redundant if the first check for missing_cols worked correctly,
# but serves as a safeguard.
if expected_col not in missing_cols: # Avoid duplicate error messages
missing_cols.append(expected_col)
errors.append(f"Missing required column: {expected_col}")
if extra_cols:
logger.debug(f"Job {job_id}: Row contained extra columns not in schema: {', '.join(extra_cols)}")
# 4. Finalize and Return Result
if errors:
# Combine all found errors into a single message
reason = "; ".join(errors)
suggestion = "Review the row data against the expected schema. Check for missing columns, incorrect data types, or values that cannot be converted. Ensure nulls are only provided for Nullable columns."
return None, (reason, suggestion)
else:
# Add tracking metadata if validation passed
try:
prepared_row['job_uuid'] = uuid.UUID(job_id) # Convert job_id string to UUID object
except ValueError:
logger.error(f"Job {job_id}: Invalid Job ID format. Cannot convert to UUID.")
return None, (f"Internal error: Invalid Job ID format '{job_id}'", "Contact support.")
# Use timezone-aware UTC datetime with millisecond precision
prepared_row['ingested_at'] = datetime.now(timezone.utc)
return prepared_row, None
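# Illustrative behaviour with the default EXPECTED_SCHEMA (values below are made up):
#   validate_and_prepare_row({'col_a': 'x', 'col_b': '7', 'col_c': 1.5}, '<job uuid>')
#     -> ({'col_a': 'x', 'col_b': 7, 'col_c': 1.5, 'job_uuid': UUID(...), 'ingested_at': datetime(...)}, None)
#   validate_and_prepare_row({'col_a': 'x', 'col_b': 'oops'}, '<job uuid>')
#     -> (None, ("Missing required columns: col_c; Column 'col_b': Failed to convert value 'oops' ...",
#                "Review the row data against the expected schema. ..."))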
# --- Kafka Connection Setup ---
def get_kafka_consumer() -> Optional[KafkaConsumer]:
"""Initializes and returns the KafkaConsumer instance with retries."""
global consumer
if consumer is not None: # Basic check if already initialized
# Add a check if connection is alive? kafka-python doesn't make this easy.
# Rely on poll() exceptions to detect issues later.
return consumer
logger.info("Attempting to initialize Kafka Consumer...")
retry_delay = RETRY_BACKOFF_SECONDS
for attempt in range(MAX_CONNECTION_RETRIES):
try:
consumer = KafkaConsumer(
# Subscribe later using pattern
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
group_id=CONSUMER_GROUP_ID,
value_deserializer=lambda v: json.loads(v.decode('utf-8', errors='replace')),
auto_offset_reset='earliest',
enable_auto_commit=False, # Manual commits
consumer_timeout_ms=-1, # Block indefinitely in poll if no timeout needed
# Use a timeout if periodic actions needed even without messages:
# consumer_timeout_ms=KAFKA_POLL_TIMEOUT_MS,
fetch_max_wait_ms=500,
fetch_min_bytes=1,
fetch_max_bytes=52428800, # 50MB
max_poll_records=INSERT_BATCH_SIZE,
# Add security settings here if needed (e.g., SASL_SSL)
)
# Verify connection by trying to list topics
consumer.topics()
logger.info(f"Kafka Consumer connected to {KAFKA_BOOTSTRAP_SERVERS}. Subscribing to pattern: {JOB_TOPIC_PATTERN}")
consumer.subscribe(pattern=JOB_TOPIC_PATTERN)
return consumer # Success
except NoBrokersAvailable:
logger.warning(f"Kafka Consumer connection attempt {attempt+1}/{MAX_CONNECTION_RETRIES} failed: No brokers available.")
except Exception as e:
logger.error(f"Error initializing Kafka consumer (Attempt {attempt+1}/{MAX_CONNECTION_RETRIES}): {e}")
# Don't retry on unexpected init errors? Or retry anyway? Retry for now.
if attempt < MAX_CONNECTION_RETRIES - 1:
logger.info(f"Retrying Kafka Consumer connection in {retry_delay} seconds...")
time.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 60)
else:
logger.error("Max Kafka Consumer connection attempts reached. Giving up.")
consumer = None # Ensure it's None if failed
return None
def get_kafka_producer() -> Optional[KafkaProducer]:
"""Initializes and returns the KafkaProducer instance (for DLQ) with retries."""
global producer
if producer is not None and producer.bootstrap_connected():
return producer
logger.info("Attempting to initialize Kafka Producer (for DLQ)...")
retry_delay = RETRY_BACKOFF_SECONDS
for attempt in range(MAX_CONNECTION_RETRIES):
try:
producer = KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all', # Ensure DLQ messages are durable
retries=3,
retry_backoff_ms=100,
# Add security settings if needed
)
# Verify connection by checking metadata for a dummy topic
producer.partitions_for('__dlq_producer_healthcheck')
logger.info(f"Kafka Producer (for DLQ) connected to {KAFKA_BOOTSTRAP_SERVERS}.")
return producer # Success
except NoBrokersAvailable:
logger.warning(f"Kafka DLQ Producer connection attempt {attempt+1}/{MAX_CONNECTION_RETRIES} failed: No brokers available.")
except KafkaTimeoutError:
logger.warning(f"Kafka DLQ Producer connection attempt {attempt+1}/{MAX_CONNECTION_RETRIES} timed out.")
except Exception as e:
logger.error(f"Error initializing Kafka DLQ producer (Attempt {attempt+1}/{MAX_CONNECTION_RETRIES}): {e}")
# Don't retry on unexpected init errors? Retry for now.
if attempt < MAX_CONNECTION_RETRIES - 1:
logger.info(f"Retrying Kafka DLQ Producer connection in {retry_delay} seconds...")
time.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 60)
else:
logger.error("Max Kafka DLQ Producer connection attempts reached. Giving up.")
producer = None # Ensure it's None if failed
return None
# --- Job Status Update ---
def update_job_status_api(job_id: str, processed_delta: int, success_delta: int, failed_delta: int, force_update: bool = False, final_status: Optional[str] = None, error_message: Optional[str] = None):
"""
Aggregates stats locally and sends status update to the API server
periodically (based on STATUS_UPDATE_INTERVAL_SECONDS) or when forced
(e.g., on final status). Includes basic retry logic for the HTTP request.
"""
global job_stats
if not job_id or job_id == 'unknown_job': # Avoid updating status for unknown jobs
logger.debug(f"Skipping status update for invalid job_id: {job_id}")
return
# Initialize stats for the job if it's the first time we see it
if job_id not in job_stats:
job_stats[job_id] = {"processed_count": 0, "success_count": 0, "failed_count": 0, "last_update_time": 0}
# Update local aggregates
stats = job_stats[job_id]
stats["processed_count"] += processed_delta
stats["success_count"] += success_delta
stats["failed_count"] += failed_delta
current_time = time.time()
time_since_last_update = current_time - stats.get("last_update_time", 0)
# Determine if an update should be sent to the API
should_update = force_update or final_status or (time_since_last_update >= STATUS_UPDATE_INTERVAL_SECONDS)
if should_update:
# Prepare payload with current cumulative counts
payload = {
"processed_count": stats["processed_count"],
"success_count": stats["success_count"],
"failed_count": stats["failed_count"],
}
if final_status:
payload["final_status"] = final_status
if error_message:
payload["error_message"] = error_message
update_url = f"{API_URL}/jobs/{job_id}/update_status"
logger.debug(f"Attempting to send status update for job {job_id} to {update_url}: {payload}")
# Attempt to send update with retry
max_api_retries = 3
api_retry_delay = 1
for attempt in range(max_api_retries):
try:
# Use a session for potential connection reuse and timeouts
with requests.Session() as session:
response = session.post(update_url, json=payload, timeout=(5, 15)) # (connect_timeout, read_timeout)
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
logger.info(f"Job {job_id}: Successfully sent status update to API. Current totals: Processed={payload['processed_count']}, Success={payload['success_count']}, Failed={payload['failed_count']}{', Final Status: ' + final_status if final_status else ''}")
stats["last_update_time"] = current_time # Update time only on successful send
return # Exit retry loop on success
except requests.exceptions.HTTPError as http_err:
# Handle specific HTTP errors, e.g., 404 means job is gone from API
if http_err.response.status_code == 404:
logger.warning(f"Job {job_id}: API returned 404 (Job Not Found). Stopping updates for this job.")
# Optionally remove job from local stats to prevent further attempts
if job_id in job_stats: del job_stats[job_id]
return # Stop retrying if job doesn't exist on API
else:
logger.error(f"Job {job_id}: HTTP error sending status update (Attempt {attempt+1}/{max_api_retries}): {http_err}")
except requests.exceptions.RequestException as req_err:
# Handle connection errors, timeouts, etc.
logger.error(f"Job {job_id}: Network error sending status update (Attempt {attempt+1}/{max_api_retries}): {req_err}")
except Exception as e:
# Catch any other unexpected errors during the request
logger.exception(f"Job {job_id}: Unexpected error during API status update (Attempt {attempt+1}/{max_api_retries}): {e}")
# If error occurred and more retries left, wait and try again
if attempt < max_api_retries - 1:
logger.info(f"Retrying API status update for job {job_id} in {api_retry_delay} seconds...")
time.sleep(api_retry_delay)
api_retry_delay *= 2 # Exponential backoff
else:
logger.error(f"Job {job_id}: Max retries reached for API status update. Update may be lost.")
# --- Kafka Message Processing ---
def process_kafka_message(message: Any, current_offsets: Dict[TopicPartition, int]) -> Tuple[int, int, int, Optional[str], Optional[Dict[str, Any]]]:
"""
Processes a single Kafka message: validates, prepares for ClickHouse or DLQ.
Returns:
Tuple containing: (processed_delta, success_delta, failed_delta, job_id, prepared_row_or_none)
"""
processed_delta = 1 # Count every message attempt
success_delta = 0
failed_delta = 0
job_id = None
prepared_row = None
tp = TopicPartition(message.topic, message.partition)
try:
# 1. Basic Message Structure Validation
if not isinstance(message.value, dict) or 'job_id' not in message.value or 'row_data' not in message.value:
logger.warning(f"Skipping malformed message structure (offset {message.offset}, topic {tp.topic}): {type(message.value)}")
job_id_malformed = message.value.get('job_id', 'unknown_job') if isinstance(message.value, dict) else 'unknown_job'
send_to_dlq(job_id_malformed, message.value, "Message is not a valid JSON object with 'job_id' and 'row_data'", "Ensure producer sends correct JSON format.", get_kafka_producer())
failed_delta = 1
job_id = job_id_malformed # Use extracted ID if possible for stats
else:
# 2. Extract Data and Validate Row Content
job_id = message.value.get('job_id')
original_row = message.value.get('row_data')
if not isinstance(original_row, dict):
logger.warning(f"Job {job_id}: row_data is not a dict (offset {message.offset}): {type(original_row)}")
send_to_dlq(job_id, original_row, "Row data is not a valid JSON object.", "Ensure 'row_data' field contains a JSON object.", get_kafka_producer())
failed_delta = 1
else:
# Validate row against schema and prepare for insert
prepared_row_or_none, error_info = validate_and_prepare_row(original_row, job_id)
if prepared_row_or_none and error_info is None:
# Row is valid
prepared_row = prepared_row_or_none
success_delta = 1
else:
# Row is invalid, send to DLQ
reason, suggestion = error_info if error_info else ("Unknown validation error", "Review data format.")
send_to_dlq(job_id, original_row, reason, suggestion, get_kafka_producer())
failed_delta = 1
# 3. Track Offset: Always track offset after attempting to process the message
# This ensures we don't re-process malformed or invalid messages on restart.
current_offsets[tp] = message.offset + 1
except Exception as e:
# Catch-all for unexpected errors during individual message processing
logger.exception(f"!!! Job {job_id or 'unknown'}: Unexpected error processing message (offset {message.offset}, topic {tp.topic}): {e}. Sending to DLQ.")
failed_delta = 1
success_delta = 0 # Ensure success is 0 if error occurred
prepared_row = None # Ensure no row is added to insert batch
try:
# Attempt to send the raw message value to DLQ on processing error
send_to_dlq(job_id or 'unknown', message.value, f"Internal consumer error: {e}", "Review consumer logs.", get_kafka_producer())
except Exception as dlq_e:
logger.error(f"!!! Job {job_id or 'unknown'}: Failed to send error message to DLQ after processing failure: {dlq_e}")
# Ensure offset is tracked even after failure
current_offsets[tp] = message.offset + 1
return processed_delta, success_delta, failed_delta, job_id, prepared_row
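# The message value this function expects from the upload API looks like (row values illustrative):
#   {"job_id": "<uuid>", "row_data": {"col_a": "x", "col_b": 7, "col_c": 1.5}}
# Anything that does not match this shape is routed to the job's DLQ topic instead of ClickHouse.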
# --- Main Processing Loop ---
def main_loop():
"""The main consumer loop: polls Kafka, processes messages, inserts batches, commits offsets."""
global running
_consumer = get_kafka_consumer()
_producer = get_kafka_producer() # Needed for DLQ sends within process_kafka_message
_ch_client = get_clickhouse_client()
if not _consumer or not _producer or not _ch_client:
logger.error("!!! Essential components failed to initialize. Consumer cannot start.")
running = False
return
# --- Loop State ---
# Stores {TopicPartition: offset_to_commit}
current_offsets: Dict[TopicPartition, int] = {}
# Buffer for rows ready for ClickHouse insertion
rows_to_insert: List[Dict[str, Any]] = []
last_insert_time = time.time()
last_status_update_check_time = time.time()
logger.info("Starting main message consumption loop...")
while running:
try:
# --- Poll Kafka for Messages ---
# Poll returns dict: {TopicPartition: [messages]} or empty dict on timeout
msg_pack = _consumer.poll(timeout_ms=KAFKA_POLL_TIMEOUT_MS)
if not msg_pack:
# --- Handle Idle Time (No Messages Received) ---
# 1. Check if time to force insert remaining batch
if rows_to_insert and (time.time() - last_insert_time >= INSERT_BATCH_TIMEOUT_SECONDS):
logger.info(f"Insert timeout reached ({INSERT_BATCH_TIMEOUT_SECONDS}s), inserting remaining {len(rows_to_insert)} rows.")
if insert_batch_to_clickhouse(rows_to_insert, _ch_client):
commit_offsets(_consumer, current_offsets) # Commit accumulated offsets
rows_to_insert.clear()
current_offsets.clear() # Clear offsets only after successful commit
else:
logger.error("!!! Batch insert failed after timeout. Offsets NOT committed. Data may be reprocessed.")
# Consider strategy: stop consumer, retry insert, or skip commit? Skipping commit leads to reprocessing.
last_insert_time = time.time() # Reset timer even on failure to avoid tight loop
# 2. Check if time to push periodic status updates for active jobs
if time.time() - last_status_update_check_time >= STATUS_UPDATE_INTERVAL_SECONDS:
for job_id in list(job_stats.keys()): # Iterate over copy of keys
if time.time() - job_stats[job_id].get("last_update_time", 0) >= STATUS_UPDATE_INTERVAL_SECONDS:
# Force update sends current aggregated totals
update_job_status_api(job_id, 0, 0, 0, force_update=True)
last_status_update_check_time = time.time()
continue # Go back to polling
            # --- Process Received Message Batch ---
            # Track deltas per job so a poll batch containing several jobs does not
            # credit every job with the whole batch's counts.
            batch_job_stats: Dict[str, Dict[str, int]] = {}
            for tp, messages in msg_pack.items():
                logger.debug(f"Processing {len(messages)} messages from {tp.topic} partition {tp.partition}")
                for message in messages:
                    if not running: break # Check shutdown signal frequently
                    # Process each message individually
                    processed, success, failed, job_id, prepared_row = process_kafka_message(message, current_offsets)
                    # Aggregate per-job stats for this poll batch
                    if job_id and job_id != 'unknown_job':
                        stats = batch_job_stats.setdefault(job_id, {"processed": 0, "success": 0, "failed": 0})
                        stats["processed"] += processed
                        stats["success"] += success
                        stats["failed"] += failed
                    # Add valid rows to the insert batch
                    if prepared_row:
                        rows_to_insert.append(prepared_row)
                if not running: break # Exit outer loop if shutdown signal received
            # --- Update API Status for Jobs in Batch ---
            # Push each job's own deltas *after* processing the whole poll() batch
            for job_id, stats in batch_job_stats.items():
                update_job_status_api(job_id, stats["processed"], stats["success"], stats["failed"])
# --- Check if Insert Batch is Full ---
if len(rows_to_insert) >= INSERT_BATCH_SIZE:
logger.info(f"Insert batch size ({INSERT_BATCH_SIZE}) reached, inserting {len(rows_to_insert)} rows.")
if insert_batch_to_clickhouse(rows_to_insert, _ch_client):
commit_offsets(_consumer, current_offsets) # Commit processed offsets
rows_to_insert.clear()
current_offsets.clear() # Clear offsets after successful commit
else:
logger.error(f"!!! Batch insert failed. Offsets NOT committed. Data may be reprocessed.")
# Strategy needed: stop, retry, skip commit?
last_insert_time = time.time() # Reset timer
except KafkaTimeoutError:
logger.warning("Kafka poll timed out (this is expected during idle periods).")
# This allows the idle-time logic (batch insert timeout, status updates) to run.
continue
except Exception as e:
logger.exception(f"!!! FATAL: Unhandled exception in main consumption loop: {e}")
running = False # Signal shutdown on unexpected fatal errors
# --- End of Main Loop (Shutdown Triggered) ---
logger.info("Exiting message consumption loop due to shutdown signal or fatal error.")
# --- Final Insert and Commit ---
if rows_to_insert:
logger.info(f"Performing final insert of {len(rows_to_insert)} rows before shutdown.")
if insert_batch_to_clickhouse(rows_to_insert, _ch_client):
# Commit remaining offsets only if final insert succeeded
commit_offsets(_consumer, current_offsets)
else:
logger.error("!!! Final batch insert failed during shutdown. Offsets NOT committed. Some data may be lost or reprocessed.")
# --- Final Status Updates ---
logger.info("Sending final status updates for any remaining jobs...")
for job_id in list(job_stats.keys()):
# Force update sends the last known aggregated counts for each job
update_job_status_api(job_id, 0, 0, 0, force_update=True)
# --- ClickHouse Interaction ---
def insert_batch_to_clickhouse(rows: List[Dict[str, Any]], client: ClickHouseClient) -> bool:
"""Inserts a batch of processed rows into ClickHouse. Returns True on success, False on failure."""
if not rows:
logger.debug("No rows to insert into ClickHouse.")
return True # Nothing to do is considered success
if not client or not client.ping():
logger.error("ClickHouse client is not available or connection lost. Cannot insert batch.")
# Attempt to reconnect?
client = get_clickhouse_client()
if not client:
logger.error("Reconnection to ClickHouse failed. Insert batch lost.")
return False
logger.info(f"Attempting to insert batch of {len(rows)} rows into {TARGET_TABLE}...")
try:
# Convert list of dicts to DataFrame
# Using from_records can be slightly faster than DataFrame(rows)
df = pd.DataFrame.from_records(rows)
# Ensure DataFrame columns match ClickHouse table schema order exactly
column_order = list(CLICKHOUSE_TABLE_SCHEMA.keys())
# Reindex, adding missing columns as None (shouldn't happen if validation is correct)
# and dropping extra columns (shouldn't happen).
df = df.reindex(columns=column_order)
# --- Type Conversion/Preparation just before insert ---
# Ensure datetimes are timezone-aware UTC as expected by DateTime64(3, 'UTC')
# clickhouse-connect generally handles this well if pandas Timestamps are tz-aware.
for col, ch_type in CLICKHOUSE_TABLE_SCHEMA.items():
if 'DateTime' in ch_type and col in df.columns:
try:
# Ensure Series is datetime type and timezone-aware (UTC)
df[col] = pd.to_datetime(df[col], errors='coerce').dt.tz_convert('UTC')
except Exception as dt_err:
logger.error(f"Error converting column '{col}' to timezone-aware datetime: {dt_err}. Insert might fail.")
# Handle NaT (Not a Time) values resulting from coerce errors if necessary
# df[col] = df[col].fillna(pd.NA) # Or some other placeholder if CH requires non-null
# --- Perform Insert ---
# `insert_df` is generally efficient. Column names in df must match table.
client.insert_df(f'`{CLICKHOUSE_DB}`.`{TARGET_TABLE}`', df, settings={'use_numpy': True})
logger.info(f"Successfully inserted batch of {len(rows)} rows.")
return True
except ClickHouseExceptions.ClickHouseError as e:
logger.error(f"!!! ClickHouse Insert Error: {e}")
# Log details about the failed batch if possible (be careful with large data)
# logger.error(f"Failed batch data sample (first row): {rows[0] if rows else 'N/A'}")
return False
except Exception as e:
# Catch other potential errors (e.g., pandas issues)
logger.exception(f"!!! Unexpected error during ClickHouse batch insert preparation or execution: {e}")
return False
# --- DLQ Interaction ---
def send_to_dlq(job_id: str, original_row: Any, reason: str, suggestion: str, producer_instance: Optional[KafkaProducer]):
"""Sends a rejected row message to the job-specific DLQ topic."""
if not producer_instance:
logger.error(f"Job {job_id}: DLQ Producer is not available. Cannot send error message: {reason}")
return # Cannot send to DLQ
# Sanitize job_id if it came from a malformed message
safe_job_id = job_id if job_id and job_id != 'unknown_job' else 'unknown_job_id'
dlq_topic = f"job_{safe_job_id}_dlq"
# Construct the DLQ message payload
dlq_message = {
"job_id": safe_job_id,
"original_row": original_row, # Include the original data as received
"error": reason,
"suggestion": suggestion,
"rejected_at": datetime.now(timezone.utc).isoformat() # Timestamp of rejection
}
try:
# Asynchronously send the message
future = producer_instance.send(dlq_topic, value=dlq_message)
# Optional: Add callbacks for detailed logging of send success/failure
# future.add_callback(lambda record_metadata: logger.debug(f"DLQ message sent: {record_metadata}"))
# future.add_errback(lambda exc: logger.error(f"Failed to send DLQ message for job {safe_job_id}: {exc}"))
logger.debug(f"Job {safe_job_id}: Sent message to DLQ topic '{dlq_topic}'. Reason: {reason}")
# Rely on linger_ms/batching, or call flush() periodically/on shutdown if needed
except KafkaError as e:
# Errors during the send call itself (e.g., serialization, buffer full)
logger.error(f"Job {safe_job_id}: KafkaError sending message to DLQ topic '{dlq_topic}': {e}")
except Exception as e:
# Catch other potential errors like serialization issues if value_serializer fails
logger.exception(f"Job {safe_job_id}: Unexpected error sending to DLQ topic '{dlq_topic}': {e}")
# Log the problematic message content if possible and safe
try:
# Be cautious logging potentially large/sensitive data
log_msg_sample = str(dlq_message)[:500] # Log a sample
logger.error(f"DLQ Message content sample: {log_msg_sample}")
except Exception:
logger.error("Could not serialize DLQ message content for logging.")
# --- Offset Committing ---
def commit_offsets(consumer_instance: KafkaConsumer, offsets_to_commit: Dict[TopicPartition, int]):
"""Manually commits the provided offsets to Kafka."""
if not offsets_to_commit:
logger.debug("No offsets to commit.")
return
try:
# The offsets dict should contain {TopicPartition: offset_of_next_message_to_read}
logger.info(f"Committing offsets: { {f'{tp.topic}-{tp.partition}': off for tp, off in offsets_to_commit.items()} }")
consumer_instance.commit(offsets=offsets_to_commit)
logger.debug("Offsets committed successfully.")
except Exception as e:
# This is critical. Failure means potential reprocessing on restart.
logger.exception(f"!!! CRITICAL: Failed to commit offsets: {e}. Messages may be reprocessed.")
# Consider alerting or specific handling for commit failures.
# --- Shutdown Handling ---
def shutdown_handler(signum, frame):
"""Sets the global 'running' flag to False on receiving SIGINT or SIGTERM."""
global running
if running: # Prevent multiple calls if signal received repeatedly
logger.info(f"Received signal {signal.Signals(signum).name}. Initiating graceful shutdown...")
running = False
else:
logger.info("Shutdown already in progress.")
# Register signal handlers for graceful termination
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
# --- Main Execution ---
if __name__ == "__main__":
logger.info("Starting Batch Consumer Worker...")
start_time = time.monotonic()
# Initialize essential components (Kafka, ClickHouse)
# Getters contain retry logic.
_consumer = get_kafka_consumer()
_producer = get_kafka_producer()
_ch_client = get_clickhouse_client()
# Proceed only if all components initialized successfully
if _consumer and _producer and _ch_client:
try:
main_loop() # Start the main processing loop
except Exception as main_loop_err:
logger.exception(f"Consumer main loop terminated unexpectedly: {main_loop_err}")
# Ensure running is False so cleanup happens
running = False
finally:
logger.info("Main loop finished or terminated.")
# --- Cleanup ---
logger.info("Closing Kafka and ClickHouse connections...")
if consumer:
try:
consumer.close()
logger.info("Kafka Consumer closed.")
except Exception as e:
logger.error(f"Error closing Kafka consumer: {e}")
if producer:
try:
producer.flush(timeout=10) # Wait up to 10s for DLQ messages
producer.close()
logger.info("Kafka Producer closed.")
except Exception as e:
logger.error(f"Error flushing/closing Kafka producer: {e}")
if ch_client:
try:
ch_client.close()
logger.info("ClickHouse client closed.")
except Exception as e:
logger.error(f"Error closing ClickHouse client: {e}")
else:
logger.error("!!! Failed to initialize essential components after retries. Consumer cannot start.")
sys.exit(1) # Exit with error code
logger.info(f"Consumer worker shut down gracefully. Total runtime: {time.monotonic() - start_time:.2f} seconds.")
sys.exit(0) # Exit cleanly
EOF
# --- build.sh ---
echo "Creating build.sh..."
cat << 'EOF' > "${REPO_NAME}/build.sh"
#!/bin/bash
set -e # Exit immediately if a command exits with a non-zero status.
echo "--- Building Docker Images ---"
# Ensure Docker is available
if ! command -v docker &> /dev/null; then
echo "Error: 'docker' command not found. Please install Docker."
exit 1
fi
if ! docker info > /dev/null 2>&1; then
echo "Error: Docker daemon is not running. Please start Docker."
exit 1
fi
# Build the images using Docker Compose
# Use DOCKER_BUILDKIT=1 for potentially faster builds with better caching
export DOCKER_BUILDKIT=1
# Build with --pull to get latest base images, --no-cache forces rebuild if needed
# Omitting --no-cache for faster subsequent builds unless specified
docker compose build --pull
echo "--- Build complete ---"
EOF
chmod +x "${REPO_NAME}/build.sh"
# --- up.sh ---
echo "Creating up.sh..."
cat << 'EOF' > "${REPO_NAME}/up.sh"
#!/bin/bash
set -e # Exit immediately if a command exits with a non-zero status.
echo "--- Starting Services ---"
echo "NOTE: Kafka KRaft might take 30-60 seconds to initialize fully on first run."
# Ensure Docker is available
if ! command -v docker &> /dev/null; then
echo "Error: 'docker' command not found. Please install Docker."
exit 1
fi
if ! docker info > /dev/null 2>&1; then
echo "Error: Docker daemon is not running. Please start Docker."
exit 1
fi
# Start services in detached mode
# --remove-orphans removes containers for services not defined in the compose file
# --force-recreate ensures containers are recreated if configuration changed (useful after edits)
# --wait makes the command wait until containers are healthy (based on healthchecks)
echo "Running docker compose up -d --wait..."
docker compose up -d --remove-orphans --force-recreate --wait
# Check status after waiting (optional but helpful)
echo "--- Service Status ---"
docker compose ps
echo "----------------------"
echo "Services started in the background."
echo "API should be available at http://localhost:8000"
echo "ClickHouse HTTP interface at http://localhost:8123"
echo "Monitor logs using: docker compose logs -f [service_name]"
echo "(e.g., docker compose logs -f api or docker compose logs -f consumer)"
EOF
chmod +x "${REPO_NAME}/up.sh"
# --- down.sh ---
echo "Creating down.sh..."
cat << 'EOF' > "${REPO_NAME}/down.sh"
#!/bin/bash
# Default behavior: stop and remove containers and networks
REMOVE_VOLUMES=false
PRUNE_IMAGES=false
# --- Argument Parsing ---
while [[ "$#" -gt 0 ]]; do
case $1 in
        -v|--volumes) REMOVE_VOLUMES=true; echo "Flag -v set: Will remove volumes." ;;
        -f|--force-images) PRUNE_IMAGES=true; REMOVE_VOLUMES=true; echo "Flag -f set: Will remove volumes and attempt image pruning." ;; # -f implies -v
-h|--help)
echo "Usage: $0 [-v] [-f]"
echo " -v, --volumes Remove associated Docker volumes (kafka_data, clickhouse_data, etc.)."
echo " -f, --force-images Remove associated Docker volumes AND attempt to remove built images (api, consumer)."
exit 0
;;
*) echo "Unknown parameter passed: $1. Use -h or --help for options."; exit 1 ;;
esac
shift
done
# --- Pre-checks ---
if ! command -v docker &> /dev/null; then
echo "Warning: 'docker' command not found. Cannot perform cleanup."
exit 0 # Exit gracefully if docker isn't installed
fi
if ! docker info > /dev/null 2>&1; then
echo "Warning: Docker daemon is not running. Cannot perform cleanup."
exit 0 # Exit gracefully
fi
if [ ! -f docker-compose.yml ]; then
echo "Warning: docker-compose.yml not found in current directory. Cannot determine services/volumes accurately."
# Attempt basic down anyway
fi
# --- Stop and Remove Containers/Networks (and optionally Volumes) ---
DOWN_ARGS=(--timeout 30)
if [ "$REMOVE_VOLUMES" = true ]; then
    echo "Volumes defined in docker-compose.yml (kafka_data, clickhouse_data, clickhouse_logs) will be removed as well."
    # --volumes removes *all* named volumes declared in the compose file.
    DOWN_ARGS+=(--volumes)
fi
echo "Stopping and removing containers defined in docker-compose.yml..."
# Use --timeout to gracefully stop containers before killing them
docker compose down "${DOWN_ARGS[@]}"
if [ "$REMOVE_VOLUMES" = true ]; then
    echo "Volumes removed."
fi
# --- Prune Images (if requested) ---
if [ "$PRUNE_IMAGES" = true ]; then
echo "Attempting to remove built images (api, consumer)..."
# Try to get image names from compose config (requires compose v2.5+)
# Fallback to simple grep if jq/compose config fails
API_IMAGE=$(docker compose config --format json | jq -r '.services.api.image // ""' 2>/dev/null || grep -A 2 'api:' docker-compose.yml | grep 'image:' | awk '{print $2}' || echo "")
CONSUMER_IMAGE=$(docker compose config --format json | jq -r '.services.consumer.image // ""' 2>/dev/null || grep -A 2 'consumer:' docker-compose.yml | grep 'image:' | awk '{print $2}' || echo "")
# Default names if detection failed (adjust if your build names differ)
API_IMAGE=${API_IMAGE:-kafka-batch-upload-api}
CONSUMER_IMAGE=${CONSUMER_IMAGE:-kafka-batch-upload-consumer}
# Attempt to remove specific built images if they exist
echo "Attempting to remove image: ${API_IMAGE}"
docker rmi "${API_IMAGE}" > /dev/null 2>&1 || echo " -> Image ${API_IMAGE} not found or removal failed."
echo "Attempting to remove image: ${CONSUMER_IMAGE}"
docker rmi "${CONSUMER_IMAGE}" > /dev/null 2>&1 || echo " -> Image ${CONSUMER_IMAGE} not found or removal failed."
echo "Pruning all dangling images..."
docker image prune -f
echo "WARNING: Dangling image prune removes *all* unused images on your system, not just for this project."
fi
echo "--- Cleanup complete ---"
EOF
chmod +x "${REPO_NAME}/down.sh"
# --- Final Instructions ---
echo ""
echo "Setup script finished creating files in ./${REPO_NAME}"
echo ""
echo "NEXT STEPS:"
echo "1. Navigate to the project directory:"
echo " cd ${REPO_NAME}"
echo "2. Build the Docker images:"
echo " ./build.sh"
echo "3. Start all services:"
echo " ./up.sh"
echo ""
echo " (Wait ~30-60 seconds for Kafka to initialize fully)"
echo ""
echo "4. Access the UI at: http://localhost:8000"
echo ""
echo "To stop services:"
echo " ./down.sh"
echo ""
echo "To stop services AND remove data volumes:"
echo " ./down.sh -v"