Skip to content

Instantly share code, notes, and snippets.

@likid0
Created February 7, 2025 09:34
Show Gist options
  • Save likid0/9b8c4db267a9b5065a711c302d1616bb to your computer and use it in GitHub Desktop.
Save likid0/9b8c4db267a9b5065a711c302d1616bb to your computer and use it in GitHub Desktop.
import boto3
import time
import datetime
from dateutil import parser
import botocore.exceptions
from colorama import init, Fore, Style
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
# Initialise colorama with autoreset enabled.
init(autoreset=True)
# Single Rich console shared by every function in this script for output.
console = Console()
# --- Configuration ---
# Endpoint the upload client (s3_upload) is created against.
UPLOAD_ENDPOINT = 'http://ceph-node-00:8088'
# Endpoint the polling client (s3_check) is created against.
CHECK_ENDPOINT = 'http://ceph-node-05:8088'
# NOTE(review): not referenced anywhere else in the visible code; it duplicates
# CHECK_ENDPOINT's value.
ENDPOINT_URL = 'http://ceph-node-05:8088'
# NOTE(review): placeholder credentials; replace before running and avoid
# committing real keys.
AWS_ACCESS_KEY_ID = 'XXX'
AWS_SECRET_ACCESS_KEY = 'YYY'
# Passed as region_name to both boto3 clients.
REGION = 'multizg'
# Bucket used for both the uploads and the HEAD polling requests.
BUCKET_NAME = 'bucket1'
# List of objects (for demo purposes)
OBJECT_LIST = ['file1.txt', 'file2.txt', 'file3.txt']
# Maps a zone UUID (the part of the 'x-rgw-replicated-from' header before the
# first ':') to a display name; parse_zone_info() falls back to the raw UUID
# when it is not listed here.
ZONE_MAPPING = {
    "b6c9ca95-6683-42a5-9dff-ba209039c61b": "Zone1",
    "8f8c3759-aaaf-4e6d-b346-71ba03798cf3": "Zone2",
}
def parse_zone_info(rep_from, request_id):
    """Derive display names for the source and destination zones.

    Args:
        rep_from: Value of the replicated-from header, or a falsy value.
            The text before the first ':' is looked up in ZONE_MAPPING;
            unknown values are returned unchanged.
        request_id: Value of the request-id header, or a falsy value.
            The text after the last '-' is capitalized and used as the
            destination zone name.

    Returns:
        A ``(source_zone, destination_zone)`` tuple; each element is None
        when the corresponding argument is falsy.
    """
    if rep_from:
        zone_uuid = rep_from.partition(":")[0]
        source_zone = ZONE_MAPPING.get(zone_uuid, zone_uuid)
    else:
        source_zone = None

    if request_id:
        destination_zone = request_id.rpartition("-")[2].capitalize()
    else:
        destination_zone = None

    return source_zone, destination_zone
def upload_object(key, content):
    """Upload *content* under *key* to BUCKET_NAME via the ``s3_upload`` client.

    Args:
        key: Object key to write.
        content: Request body passed to ``put_object``.

    Returns:
        The epoch timestamp (seconds, from ``time.time()``) captured
        immediately after ``put_object`` returned. The same timestamp is used
        for the printed upload duration, so console rendering time is not
        folded into the value callers use as their latency baseline.
    """
    console.print(f"\n[cyan]Uploading object:[/cyan] {key} to source endpoint ({UPLOAD_ENDPOINT})")
    started_at = time.time()
    s3_upload.put_object(
        Bucket=BUCKET_NAME,
        Key=key,
        Body=content,
        ContentType='binary/octet-stream',
    )
    # Take the completion timestamp once, before any console output.
    completed_at = time.time()
    upload_duration = completed_at - started_at
    console.print(f"[green]Uploaded {key} in {upload_duration:.2f} seconds[/green]")
    return completed_at
def get_replication_info(key):
    """Read the replication-related response headers for *key*.

    Sends a HEAD request for the object in BUCKET_NAME through the
    ``s3_check`` client and extracts four HTTP response headers.

    Args:
        key: Object key to query.

    Returns:
        A dict with the keys 'replication_status', 'replicated_at',
        'replicated_from' and 'request_id' (each None when its header is
        absent), or an empty dict when the request fails with error code
        '404' or 'NoSuchKey'.

    Raises:
        botocore.exceptions.ClientError: For any other error code.
    """
    try:
        head = s3_check.head_object(Bucket=BUCKET_NAME, Key=key)
    except botocore.exceptions.ClientError as err:
        code = err.response.get('Error', {}).get('Code')
        if code not in ('404', 'NoSuchKey'):
            raise
        # The object is not present on this endpoint.
        return {}

    http_headers = head['ResponseMetadata']['HTTPHeaders']
    header_for_field = (
        ('replication_status', 'x-amz-replication-status'),
        ('replicated_at', 'x-rgw-replicated-at'),
        ('replicated_from', 'x-rgw-replicated-from'),
        ('request_id', 'x-amz-request-id'),
    )
    return {field: http_headers.get(header) for field, header in header_for_field}
def _replication_latency(replicated_at, upload_time):
    """Return the absolute gap in seconds between upload and replication.

    Args:
        replicated_at: Timestamp string taken from the replicated-at header.
        upload_time: Upload completion time as an epoch timestamp (seconds).

    Raises:
        ValueError, OverflowError, TypeError: If the timestamp cannot be
            parsed or compared.
    """
    rep_dt = parser.parse(replicated_at)
    if rep_dt.utcoffset() is None:
        # A naive timestamp cannot be subtracted from the aware upload time.
        # NOTE(review): assumes a timestamp without an offset is UTC; confirm
        # against what the gateway actually emits.
        rep_dt = rep_dt.replace(tzinfo=datetime.timezone.utc)
    upload_dt = datetime.datetime.fromtimestamp(upload_time, tz=datetime.timezone.utc)
    # A negative difference (replicated_at earlier than upload_time) is
    # reported as its magnitude.
    # NOTE(review): a negative value might come from clock skew, timestamp
    # truncation, or a replica left over from an earlier run; verify.
    return abs((rep_dt - upload_dt).total_seconds())


def wait_for_replication(key, upload_time, poll_interval=5, timeout=300):
    """Poll until *key* is reported as a replica or the timeout elapses.

    Args:
        key: Object key passed to get_replication_info().
        upload_time: Upload completion time as an epoch timestamp (seconds).
            Used both as the latency baseline and as the start of the
            timeout window.
        poll_interval: Seconds to sleep between polls.
        timeout: Maximum seconds, measured from *upload_time*, to keep
            polling.

    Returns:
        A ``(latency, info)`` tuple, where *info* is the last dict returned
        by get_replication_info(). *latency* is the replication latency in
        seconds, or None when the timeout elapsed or the replicated-at
        timestamp could not be parsed.
    """
    while True:
        info = get_replication_info(key)
        status = info.get('replication_status') if info else None
        if not status:
            console.print(f"[yellow]Object '{key}' not yet available on destination; waiting...[/yellow]")
        else:
            console.print(f"[magenta]Replication status for {key}:[/magenta] {status}")
            if status.upper() == 'REPLICA' and info.get('replicated_at'):
                try:
                    replication_latency = _replication_latency(info['replicated_at'], upload_time)
                except (ValueError, OverflowError, TypeError) as e:
                    # Only parse/compare failures are reported here; anything
                    # else is a real bug and should propagate.
                    console.print(f"[red]Error parsing replicated timestamp: {e}[/red]")
                    replication_latency = None
                return replication_latency, info
        if time.time() - upload_time > timeout:
            console.print("[red]Timeout waiting for replication to complete.[/red]")
            return None, info
        time.sleep(poll_interval)
def main():
    """Upload every object in OBJECT_LIST, wait for replication, and report.

    For each object this uploads a small text payload, polls through
    wait_for_replication(), prints the outcome, and finally renders a summary
    table of all results. A None latency from wait_for_replication() is
    reported as a timeout row.
    """
    results = []
    console.print(Panel.fit("[cyan]Global Content Distribution Demo[/cyan]", style="bold cyan"))
    for obj in OBJECT_LIST:
        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # from an aware UTC "now" keeps the payload text in the same format.
        uploaded_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        content = f"Demo content for {obj} uploaded at {uploaded_at}".encode('utf-8')
        upload_timestamp = upload_object(obj, content)

        # Share the module console with the live display so the status lines
        # printed while polling do not collide with the spinner. The context
        # manager stops the display on exit.
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            progress.add_task(f"Waiting for replication of {obj}...", total=None)
            replication_latency, rep_info = wait_for_replication(obj, upload_timestamp)

        if replication_latency is not None:
            source_zone, destination_zone = parse_zone_info(
                rep_info.get('replicated_from'), rep_info.get('request_id')
            )
            console.print(f"\n[green]Object '{obj}' has been replicated![/green]")
            console.print(f"[green]Replication latency: {replication_latency:.2f} seconds[/green]")
            console.print("[blue]Replication details:[/blue]")
            console.print(f" - Replication Status: [yellow]{rep_info.get('replication_status')}[/yellow]")
            console.print(f" - Replicated At: [yellow]{rep_info.get('replicated_at')}[/yellow]")
            console.print(f" - Replicated From: [yellow]{source_zone}[/yellow]")
            console.print(f" - Destination: [yellow]{destination_zone}[/yellow]")
            summary = (
                f"[cyan]{obj}[/cyan]: [yellow]{source_zone}[/yellow] "
                f"[white]----->[/white] [light_green]{replication_latency:.2f}s replication[/light_green] "
                f"[white]----->[/white] [yellow]{destination_zone}[/yellow]"
            )
            console.print("\n" + summary + "\n")
            results.append((
                obj,
                source_zone,
                destination_zone,
                f"{replication_latency:.2f}",
                rep_info.get('replication_status'),
            ))
        else:
            console.print(f"\n[red]Replication for '{obj}' did not complete within the timeout period.[/red]")
            results.append((obj, "N/A", "N/A", "Timeout", "N/A"))
        console.print(f"[cyan]{'-'*80}[/cyan]")

    # Build a final summary table using Rich
    table = Table(title="Replication Results", style="bold cyan")
    table.add_column("File", style="cyan", no_wrap=True)
    table.add_column("Source", style="magenta")
    table.add_column("Destination", style="yellow")
    table.add_column("Latency (s)", justify="right", style="green")
    table.add_column("Status", style="bold blue")
    for row in results:
        table.add_row(*row)
    console.print("\n")
    console.print(table)
    console.print(Panel.fit("[cyan]Demo complete. All objects have been processed.[/cyan]", style="bold cyan"))
if __name__ == '__main__':
    # The two clients differ only in the endpoint they talk to, so the
    # credentials and region are collected once and shared.
    shared_client_args = {
        'aws_access_key_id': AWS_ACCESS_KEY_ID,
        'aws_secret_access_key': AWS_SECRET_ACCESS_KEY,
        'region_name': REGION,
    }
    # Client used by upload_object() to write objects.
    s3_upload = boto3.client('s3', endpoint_url=UPLOAD_ENDPOINT, **shared_client_args)
    # Client used by get_replication_info() to poll for the objects.
    s3_check = boto3.client('s3', endpoint_url=CHECK_ENDPOINT, **shared_client_args)
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment