client_python.py
#!/usr/bin/env python3
# /// script
# dependencies = [
# "boto3>=1.28.0", # For AWS S3 interaction
# # awsglue is not listed here as its import is optional and checked at runtime
# ]
# ///
"""
/// Example Usage
# If awsglue is importable (e.g., in an AWS Glue environment), scans S3:
# uv run aws_s3_scanner.py --bucket your-s3-bucket --prefix path/to/your/data/
# Force S3 scan even if awsglue is not detected (requires boto3 to be installed):
# uv run aws_s3_scanner.py --bucket your-s3-bucket --prefix path/to/your/data/ --force-s3
# If awsglue is NOT importable AND --force-s3 is NOT used, scans local filesystem:
# (Here, --bucket is treated as the base local directory)
# uv run aws_s3_scanner.py --bucket ./your_local_data_folder --prefix path/to/your/data/
# Example with no prefix (scans bucket root or base local directory root):
# uv run aws_s3_scanner.py --bucket your-s3-bucket
# Example forcing S3 scan for a bucket (no prefix):
# uv run aws_s3_scanner.py --bucket your-s3-bucket --force-s3
///
"""
import os
import sys
import argparse
import datetime
import json
import logging
import gzip
from pathlib import Path
from typing import List, Tuple, Optional
# --- Logger Setup ---
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # Set base level for the logger to capture all messages
# Handler for stdout (INFO level)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.INFO)
stdout_handler.addFilter(lambda record: record.levelno == logging.INFO)
stdout_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
stdout_handler.setFormatter(stdout_formatter)
# Handler for stderr (WARNING, ERROR, CRITICAL levels)
stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setLevel(logging.WARNING)
stderr_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s')
stderr_handler.setFormatter(stderr_formatter)
logger.addHandler(stdout_handler)
logger.addHandler(stderr_handler)
logger.propagate = False # Prevent duplicate logging if root logger is configured
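# Note: with the two handlers above, INFO goes to stdout and WARNING+ goes to stderr, so the
# logger.debug(...) calls in this script are accepted by the logger but never emitted.
# A minimal sketch of one way to surface them, assuming DEBUG output to stderr is acceptable:
#   debug_handler = logging.StreamHandler(sys.stderr)
#   debug_handler.setLevel(logging.DEBUG)
#   debug_handler.addFilter(lambda record: record.levelno == logging.DEBUG)
#   debug_handler.setFormatter(stderr_formatter)
#   logger.addHandler(debug_handler)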
# --- AWS Glue Check ---
has_awsglue = False
try:
    import awsglue
    # If a specific awsglue context were needed, it would be initialized here.
    # For this script, we only check for its presence.
    has_awsglue = True
    logger.debug("awsglue module successfully imported.")
except ImportError:
    logger.debug("awsglue module not found.")
    has_awsglue = False
# --- Boto3 Import ---
boto3 = None
ClientError = None
NoCredentialsError = None
PartialCredentialsError = None
try:
    import boto3
    from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError
    logger.debug("boto3 module successfully imported.")
except ImportError:
    logger.warning("boto3 library is not installed. S3 scan will not be possible if S3 operations are attempted.")
    # boto3, ClientError etc. will remain None
def get_datetime_range_utc(for_today: bool = False) -> Tuple[datetime.datetime, datetime.datetime]:
    """Calculates the start and end datetime for a given day (today or yesterday) in UTC."""
    today_utc = datetime.datetime.now(datetime.timezone.utc).date()
    if for_today:
        target_date_utc = today_utc
        day_description = "today's"
    else:
        target_date_utc = today_utc - datetime.timedelta(days=1)
        day_description = "yesterday's"
    start_time = datetime.datetime.combine(target_date_utc, datetime.time.min, tzinfo=datetime.timezone.utc)
    end_time = datetime.datetime.combine(target_date_utc, datetime.time.max, tzinfo=datetime.timezone.utc)
    logger.debug(f"Calculated {day_description} UTC range: {start_time} to {end_time}")
    return start_time, end_time
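# Example (hypothetical date): if the current UTC date is 2024-05-02, the default call returns
# 2024-05-01 00:00:00+00:00 .. 2024-05-01 23:59:59.999999+00:00, while
# get_datetime_range_utc(for_today=True) returns the same bounds for 2024-05-02.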
def s3_scan(bucket: str, prefix: str, start_time: datetime.datetime, end_time: datetime.datetime) -> List[str]:
    """
    Scans an S3 bucket and prefix for .json.gz files modified within the given UTC time range.
    Reads ndjson content from these files.
    """
    if not boto3 or not ClientError or not NoCredentialsError or not PartialCredentialsError:  # Ensure all boto3 components are loaded
        logger.error("boto3 library or its components are not available. S3 scan cannot proceed.")
        # sys.exit(1) # Exiting here might be too abrupt if fallback is desired.
        return []  # Return empty list, main function will handle this
    results: List[str] = []
    s3_client = boto3.client('s3')
    paginator = s3_client.get_paginator('list_objects_v2')
    logger.info(f"Scanning S3 bucket '{bucket}', prefix '{prefix}' for files modified between {start_time.strftime('%Y-%m-%d %H:%M:%S %Z')} and {end_time.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    try:
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            if 'Contents' not in page:
                continue
            for obj in page['Contents']:
                key = obj['Key']
                # S3 LastModified is already a timezone-aware datetime object (UTC)
                last_modified = obj['LastModified']
                if key.endswith('.json.gz') and start_time <= last_modified <= end_time:
                    logger.info(f"Processing S3 object: s3://{bucket}/{key}")
                    try:
                        response = s3_client.get_object(Bucket=bucket, Key=key)
                        with gzip.GzipFile(fileobj=response['Body']) as gz_file:
                            for line_bytes in gz_file:
                                results.append(line_bytes.decode('utf-8').strip())
                    except ClientError as e:
                        logger.error(f"Error reading S3 object s3://{bucket}/{key}: {e}")
                    except Exception as e:
                        logger.error(f"Unexpected error processing S3 object s3://{bucket}/{key}: {e}")
    except (NoCredentialsError, PartialCredentialsError):
        logger.error("AWS credentials not found. Please configure your AWS credentials for S3 access.")
        return []  # Let main decide whether to exit or report no data
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchBucket':
            logger.error(f"S3 bucket '{bucket}' not found.")
        else:
            logger.error(f"An S3 client error occurred: {e}")
        return []
    except Exception as e:
        logger.error(f"An unexpected error occurred during S3 scan: {e}")
        return []
    return results
def folder_scan(base_path_str: str, prefix_str: str, start_time: datetime.datetime, end_time: datetime.datetime) -> List[str]:
    """
    Scans a local folder structure for .json.gz files modified within the given UTC time range.
    Reads ndjson content from these files.
    """
    results: List[str] = []
    scan_path = Path(base_path_str)
    if prefix_str:  # Ensure prefix is handled correctly as a sub-path
        scan_path = Path(base_path_str) / prefix_str
    if not scan_path.exists() or not scan_path.is_dir():
        logger.error(f"Local path '{scan_path}' does not exist or is not a directory.")
        return []  # Return empty list, main function will handle this
    logger.info(f"Scanning local folder '{scan_path}' for files modified between {start_time.strftime('%Y-%m-%d %H:%M:%S %Z')} and {end_time.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    for file_path in scan_path.rglob('*.json.gz'):
        if file_path.is_file():
            try:
                stat_result = file_path.stat()
                # st_mtime is seconds since the epoch; convert to a timezone-aware UTC datetime for comparison
                last_modified_utc = datetime.datetime.fromtimestamp(stat_result.st_mtime, tz=datetime.timezone.utc)
                if start_time <= last_modified_utc <= end_time:
                    logger.info(f"Processing local file: {file_path}")
                    with gzip.open(file_path, 'rt', encoding='utf-8') as gz_file:
                        for line in gz_file:
                            results.append(line.strip())
            except Exception as e:
                logger.error(f"Error processing local file {file_path}: {e}")
    return results
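# Both scanners return raw ndjson lines (one JSON document per line). If parsed records are
# needed rather than strings, a minimal sketch (assuming every non-empty line is valid JSON),
# applied to the list collected in main(), would be:
#   records = [json.loads(line) for line in all_lines if line]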
def main():
    parser = argparse.ArgumentParser(description="Scan S3 or local folders for ndjson.gz files from yesterday (UTC), or from today with --today.")
    parser.add_argument("--bucket", required=True, help="S3 bucket name OR base local directory path (if awsglue is not available).")
    parser.add_argument("--prefix", default="", help="S3 prefix OR sub-directory path within the local base directory. Optional.")
    parser.add_argument("--force-s3", action="store_true", help="Force the use of S3 scan (boto3) even if awsglue is not detected.")
    parser.add_argument("--today", action="store_true", help="Scan for files from today (UTC) instead of yesterday.")
    args = parser.parse_args()
    start_dt_utc, end_dt_utc = get_datetime_range_utc(for_today=args.today)
    logger.info(f"awsglue module available: {has_awsglue}")
    logger.info(f"--force-s3 flag set: {args.force_s3}")
    scan_day_message = "today" if args.today else "yesterday"
    logger.info(f"Scanning for files modified on: {scan_day_message} (UTC)")
    all_lines: List[str] = []
    perform_s3_scan_intent = has_awsglue or args.force_s3
    if perform_s3_scan_intent:
        if not boto3:
            logger.error("S3 scan is intended (due to awsglue presence or --force-s3 flag) but the boto3 library is not installed or failed to import.")
            sys.exit(1)
        logger.info(f"Proceeding with S3 scan. Reason: awsglue detected ({has_awsglue}), --force-s3 flag ({args.force_s3}).")
        all_lines = s3_scan(args.bucket, args.prefix, start_dt_utc, end_dt_utc)
    else:
        logger.info(f"awsglue module not available ({has_awsglue}) and --force-s3 flag not used ({args.force_s3}). Proceeding with local folder scan.")
        all_lines = folder_scan(args.bucket, args.prefix, start_dt_utc, end_dt_utc)
    if all_lines:
        logger.info(f"Successfully retrieved {len(all_lines)} lines of ndjson data.")
        # Logging every collected line can be very verbose for large datasets,
        # so only the first 10 lines are logged here.
        logger.info("Final collected data (first 10 lines if many):")
        for line_content in all_lines[:10]:
            logger.info(line_content)
        if len(all_lines) > 10:
            logger.info(f"... and {len(all_lines) - 10} more lines.")
        # If you need the full list logged as a single JSON string:
        # try:
        #     logger.info(f"Full data as JSON: {json.dumps(all_lines)}")
        # except TypeError:
        #     logger.error("Could not serialize full data list to JSON for logging.")
    else:
        logger.info(f"No ndjson data found matching the criteria for {scan_day_message}'s date range.")


if __name__ == "__main__":
    main()