|
#!/usr/bin/env python3 |
|
""" |
|
Migrate recordings from Foxglove Cloud to Azure Blob Storage. |
|
|
|
Work is split into two steps: |
|
1. download - Fetch recordings from Foxglove into a local downloads/ folder. |
|
2. upload - Upload files from downloads/ to Azure; on success move to uploaded/. |
|
|
|
Downloads are streamed to disk (never full file in memory), so large recordings |
|
up to 100GB+ are supported. Progress shows bytes downloaded, total, percentage, and speed. |
|
|
|
Requirements: |
|
pip install foxglove-client |
|
pip install azure-storage-blob azure-identity # for upload progress bar (optional; falls back to az CLI) |
|
az login # Azure CLI must be installed and authenticated (or use --account-key) |
|
|
|
Environment variables: |
|
FOXGLOVE_API_TOKEN - Foxglove API token (fox_sk_...) |
|
AZURE_STORAGE_ACCOUNT - Azure storage account name |
|
AZURE_STORAGE_CONTAINER - Azure blob container name (default: "inbox") |
|
AZURE_STORAGE_KEY - Optional; use with --auth-mode key for uploads |
|
FOXGLOVE_DEVICE_ID - Optional; upload sets blob metadata foxglove_device_id if not in manifest |
|
|
|
Usage: |
|
# Step 1: Download recordings (all time, or use --start/--end) |
|
python migrate-recordings-to-azure.py download --device-id dev_XXXXX |
|
python migrate-recordings-to-azure.py download --start 2024-01-01T00:00:00Z --end 2024-12-31T23:59:59Z |
|
|
|
# Step 2: Upload everything in downloads/ to Azure (moves to uploaded/ on success) |
|
python migrate-recordings-to-azure.py upload --storage-account migrationtest
|
""" |
|
|
|
import argparse |
|
import json |
|
import os |
|
import shutil |
|
import subprocess |
|
import sys |
|
import time |
|
from datetime import datetime, timezone |
|
from pathlib import Path |
|
|
|
import requests |
|
from foxglove.client import Client |
|
|
|
try: |
|
from azure.identity import DefaultAzureCredential |
|
from azure.storage.blob import BlobClient |
|
_AZURE_SDK_AVAILABLE = True |
|
except ImportError: |
|
_AZURE_SDK_AVAILABLE = False |
|
|
|
|
|
# Name of the JSON manifest kept inside both downloads/ and uploaded/,
# mapping recording id -> metadata (blob name, file, sizes, device id).
MANIFEST_FILENAME = "manifest.json"
|
|
|
|
|
def parse_args():
    """Parse the CLI: a global --data-dir plus the download/upload subcommands."""
    root = argparse.ArgumentParser(
        description="Migrate recordings from Foxglove Cloud to Azure Blob Storage (download and upload steps)"
    )
    root.add_argument(
        "--data-dir",
        default="migration-data",
        help="Directory containing downloads/ and uploaded/ (default: migration-data)",
    )
    commands = root.add_subparsers(dest="command", required=True, help="Command to run")

    # download: pull recordings from Foxglove into downloads/
    download_cmd = commands.add_parser("download", help="Download recordings from Foxglove into downloads/")
    download_cmd.add_argument("--device-id", help="Foxglove device ID to filter recordings (e.g. dev_XXXXX)")
    download_cmd.add_argument("--device-name", help="Foxglove device name to filter recordings")
    download_cmd.add_argument("--start", help="Start time (ISO 8601). Omit for all-time.")
    download_cmd.add_argument("--end", help="End time (ISO 8601). Omit for all-time.")
    download_cmd.add_argument(
        "--dry-run",
        action="store_true",
        help="List what would be downloaded without writing files",
    )

    # upload: push downloads/ into the Azure container
    upload_cmd = commands.add_parser("upload", help="Upload files from downloads/ to Azure (move to uploaded/ on success)")
    upload_cmd.add_argument(
        "--storage-account",
        default=os.environ.get("AZURE_STORAGE_ACCOUNT"),
        help="Azure storage account name (or set AZURE_STORAGE_ACCOUNT)",
    )
    upload_cmd.add_argument(
        "--container",
        default=os.environ.get("AZURE_STORAGE_CONTAINER", "inbox"),
        help="Azure blob container name (default: inbox)",
    )
    upload_cmd.add_argument(
        "--blob-prefix",
        default="foxglove-recordings/",
        help="Prefix for blob names in Azure (default: foxglove-recordings/)",
    )
    upload_cmd.add_argument(
        "--auth-mode",
        choices=("login", "key"),
        default="login",
        help="Azure auth: login (default) or key",
    )
    upload_cmd.add_argument(
        "--account-key",
        default=os.environ.get("AZURE_STORAGE_KEY"),
        help="Storage account key (required if --auth-mode key)",
    )
    upload_cmd.add_argument(
        "--dry-run",
        action="store_true",
        help="List what would be uploaded without uploading",
    )
    upload_cmd.add_argument(
        "--device-id",
        default=os.environ.get("FOXGLOVE_DEVICE_ID"),
        help="Foxglove device id for blob metadata foxglove_device_id when missing from manifest (or set FOXGLOVE_DEVICE_ID)",
    )

    return root.parse_args()
|
|
|
|
|
def get_foxglove_client():
    """Build an authenticated Foxglove API client from FOXGLOVE_API_TOKEN, or exit(1) if unset."""
    token = os.environ.get("FOXGLOVE_API_TOKEN")
    if token:
        return Client(token=token)
    print("Error: FOXGLOVE_API_TOKEN environment variable is not set.", file=sys.stderr)
    print("Create an API token at https://app.foxglove.dev/~/settings", file=sys.stderr)
    sys.exit(1)
|
|
|
|
|
def fetch_recordings(client, device_id, device_name, start, end):
    """Return every completed recording matching the filters, walking pages of 100."""
    query = {
        "import_status": "complete",
        "sort_by": "start",
        "sort_order": "asc",
    }
    # Only forward filters the caller actually supplied.
    for key, value in (("start", start), ("end", end)):
        if value is not None:
            query[key] = value
    if device_id:
        query["device_id"] = device_id
    if device_name:
        query["device_name"] = device_name

    page_size = 100
    results = []
    offset = 0
    while True:
        query["limit"] = page_size
        query["offset"] = offset
        page = client.get_recordings(**query)
        if not page:
            break
        results.extend(page)
        if len(page) < page_size:
            # Short page means we've reached the end.
            break
        offset += page_size

    return results
|
|
|
|
|
def format_size(size_bytes):
    """Render a byte count as a human-readable string with one decimal (B..TB, else PB)."""
    value = size_bytes
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if value < 1024:
            break
        value /= 1024
    else:
        # Larger than TB range: value has been divided down to petabytes.
        return f"{value:.1f} PB"
    return f"{value:.1f} {unit}"
|
|
|
|
|
def get_recording_stream_url(token: str, recording_id: str, host: str = "api.foxglove.dev") -> str:
    """Request a short-lived MCAP stream URL for a recording (used to stream it to disk)."""
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    payload = {
        "recordingId": recording_id,
        "includeAttachments": False,
        "outputFormat": "mcap",
    }
    response = requests.post(
        f"https://{host}/v1/data/stream",
        headers=headers,
        json=payload,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["link"]
|
|
|
|
|
# Streaming read size; 2 MB keeps memory flat even for 100GB+ files.
DOWNLOAD_CHUNK_SIZE = 2 * 1024 * 1024  # 2 MB for 100GB+ files
PROGRESS_INTERVAL = 2.0  # seconds between progress updates
|
|
|
|
|
def stream_download_to_file(url: str, filepath: Path, total_estimate=None, session=None) -> int:
    """Stream download from URL to file. Never loads the full file into memory.

    Writes in DOWNLOAD_CHUNK_SIZE chunks and prints periodic progress
    (bytes, total, percentage, speed) every PROGRESS_INTERVAL seconds.

    Args:
        url: The (pre-signed) URL to GET.
        filepath: Destination file; overwritten if it exists.
        total_estimate: Fallback total size when the response has no Content-Length.
        session: Optional requests.Session to reuse connections; a fresh one is
            created when omitted.

    Returns:
        Number of bytes written to filepath.
    """
    sess = session or requests.Session()
    resp = sess.get(url, stream=True, timeout=60)
    try:
        resp.raise_for_status()
        total_from_header = resp.headers.get("Content-Length")
        total = int(total_from_header) if total_from_header else total_estimate
        bytes_written = 0
        start_time = time.monotonic()
        last_print_time = start_time
        last_print_bytes = 0

        with open(filepath, "wb") as f:
            for chunk in resp.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                if chunk:
                    f.write(chunk)
                    bytes_written += len(chunk)
                    now = time.monotonic()
                    if now - last_print_time >= PROGRESS_INTERVAL:
                        # Instantaneous speed over the interval since the last progress line.
                        interval = now - last_print_time
                        speed = (bytes_written - last_print_bytes) / interval if interval > 0 else 0
                        pct = f" ({100.0 * bytes_written / total:.1f}%)" if total and total > 0 else ""
                        total_str = format_size(total) if total else "?"
                        line = f" Downloaded: {format_size(bytes_written)} / {total_str}{pct} {format_size(int(speed))}/s "
                        sys.stdout.write("\r" + line)
                        sys.stdout.flush()
                        last_print_time = now
                        last_print_bytes = bytes_written

        # Final line uses the average speed over the whole download.
        elapsed = time.monotonic() - start_time
        speed = bytes_written / elapsed if elapsed > 0 else 0
        if total and total > 0:
            sys.stdout.write("\r Downloaded: " + format_size(bytes_written) + " / " + format_size(total) + " (100.0%) " + format_size(int(speed)) + "/s \n")
        else:
            sys.stdout.write("\r Downloaded: " + format_size(bytes_written) + " " + format_size(int(speed)) + "/s \n")
        sys.stdout.flush()
        return bytes_written
    finally:
        # Always release the streaming connection back to the pool.
        resp.close()
|
|
|
|
|
def resolve_foxglove_device_id(recording, filter_device_id=None):
    """Pick the device id for Azure blob metadata (foxglove_device_id).

    Preference order: the recording's own device record (dict `id`/`deviceId`
    or a non-blank raw string), then the caller's download filter id, else None.
    """
    device = recording.get("device")
    if isinstance(device, dict):
        candidate = device.get("id") or device.get("deviceId")
        if candidate:
            return candidate
    elif isinstance(device, str):
        stripped = device.strip()
        if stripped:
            return stripped
    return filter_device_id or None
|
|
|
|
|
def make_blob_name(recording, prefix):
    """Compose a blob name: prefix + recording path (or "<id>.mcap" when pathless)."""
    rec_path = recording.get("path", "")
    # Strip any leading slashes so the prefix joins cleanly.
    suffix = rec_path.lstrip("/") if rec_path else f"{recording['id']}.mcap"
    return f"{prefix}{suffix}"
|
|
|
|
|
def recording_to_local_path(recording) -> str:
    """Map a recording to a relative on-disk path, falling back to "<id>.mcap".

    The fallback is used whenever the API path is empty, or when the
    normalized path would escape the downloads directory (".." or absolute).
    """
    fallback = f"{recording['id']}.mcap"
    trimmed = (recording.get("path") or "").strip("/")
    if not trimmed:
        return fallback
    candidate = os.path.normpath(trimmed)
    if candidate.startswith("..") or os.path.isabs(candidate):
        return fallback
    return candidate
|
|
|
|
|
# --- Manifests --- |
|
|
|
|
|
def manifest_path(base_dir: Path) -> Path:
    """Location of the manifest file inside *base_dir*."""
    return base_dir.joinpath(MANIFEST_FILENAME)
|
|
|
|
|
def load_manifest(base_dir: Path) -> dict:
    """Read the manifest JSON in *base_dir*; an empty dict when none exists."""
    path = manifest_path(base_dir)
    if not path.exists():
        return {}
    with path.open("r") as handle:
        return json.load(handle)
|
|
|
|
|
def save_manifest(base_dir: Path, data: dict):
    """Write *data* as the pretty-printed manifest JSON, creating *base_dir* if needed."""
    base_dir.mkdir(parents=True, exist_ok=True)
    with manifest_path(base_dir).open("w") as handle:
        json.dump(data, handle, indent=2)
|
|
|
|
|
# --- Download command --- |
|
|
|
|
|
def cmd_download(args):
    """Download step: fetch matching recordings from Foxglove into downloads/.

    Skips recordings already present in the uploaded/ manifest or completed
    in the downloads/ manifest; adopts orphan files whose on-disk size
    matches the API-reported size. The downloads/ manifest is saved after
    every file so an interrupted run can resume.
    """
    data_dir = Path(args.data_dir)
    downloads_dir = data_dir / "downloads"
    uploaded_dir = data_dir / "uploaded"
    uploaded_dir.mkdir(parents=True, exist_ok=True)
    downloads_dir.mkdir(parents=True, exist_ok=True)

    # Parse optional time window; "Z" suffix is normalized for fromisoformat,
    # and naive timestamps are treated as UTC.
    start = None
    end = None
    if args.start:
        start = datetime.fromisoformat(args.start.replace("Z", "+00:00"))
        if start.tzinfo is None:
            start = start.replace(tzinfo=timezone.utc)
    if args.end:
        end = datetime.fromisoformat(args.end.replace("Z", "+00:00"))
        if end.tzinfo is None:
            end = end.replace(tzinfo=timezone.utc)
    if start is not None and end is not None and start > end:
        print("Error: --start must be before --end.", file=sys.stderr)
        sys.exit(1)

    client = get_foxglove_client()
    # Raw token is also needed directly for the stream-URL endpoint.
    token = os.environ.get("FOXGLOVE_API_TOKEN")
    uploaded_manifest = load_manifest(uploaded_dir)
    downloads_manifest = load_manifest(downloads_dir)

    if start is not None and end is not None:
        print(f"Fetching recording list from {start.isoformat()} to {end.isoformat()}...")
    else:
        print("Fetching recording list (all time)...")
    if args.device_id:
        print(f" Device ID: {args.device_id}")
    if args.device_name:
        print(f" Device name: {args.device_name}")

    recordings = fetch_recordings(client, args.device_id, args.device_name, start, end)
    if not recordings:
        print("No recordings found matching the given filters.")
        return

    # Triage every recording into: skip (already uploaded), skip (already
    # downloaded), or download.
    to_download = []
    skipped_uploaded = []
    skipped_downloaded = []
    for rec in recordings:
        rec_id = rec["id"]
        rec_path = rec.get("path", rec_id)
        if rec_id in uploaded_manifest:
            skipped_uploaded.append(rec_path)
            continue
        rel_path = recording_to_local_path(rec)
        local_file = downloads_dir / rel_path
        if local_file.exists():
            local_size = local_file.stat().st_size
            meta = downloads_manifest.get(rec_id)
            if meta is not None:
                # We have a manifest entry: we completed this in a previous run. Trust that.
                stored_size = meta.get("downloaded_size")
                if stored_size is not None and local_size != stored_size:
                    # File size doesn't match what we wrote -> re-download
                    pass
                else:
                    skipped_downloaded.append(rec_path)
                    continue  # Skip: already complete
            else:
                # Orphan file (e.g. script died after write, before save). Use API size only as hint.
                remote_size = rec.get("size")
                if remote_size is not None and local_size == remote_size:
                    # Adopt the orphan: record it in the manifest as complete.
                    fid = resolve_foxglove_device_id(rec, args.device_id)
                    downloads_manifest[rec_id] = {
                        "blob_name": make_blob_name(rec, ""),
                        "path": rec.get("path", ""),
                        "size": remote_size,
                        "file": rel_path,
                        "downloaded_size": local_size,
                        **({"foxglove_device_id": fid} if fid else {}),
                    }
                    save_manifest(downloads_dir, downloads_manifest)
                    skipped_downloaded.append(rec_path)
                    continue
                # Size mismatch or unknown -> re-download
        to_download.append(rec)

    if skipped_uploaded or skipped_downloaded:
        print("\nSkipping (already complete):")
        for path in skipped_uploaded:
            print(f" [uploaded] {path}")
        for path in skipped_downloaded:
            print(f" [downloaded] {path}")
        print()

    if not to_download:
        print(f"\nNothing to download (all {len(recordings)} recording(s) already in downloads/ or uploaded/).")
        return

    total_size = sum(r.get("size", 0) for r in to_download)
    print(f"\nFound {len(recordings)} recording(s); {len(to_download)} to download ({format_size(total_size)})\n")

    if args.dry_run:
        for i, rec in enumerate(to_download, 1):
            print(f" [{i}/{len(to_download)}] {rec.get('path', 'N/A')} ({format_size(rec.get('size', 0))})")
        print("\nDry run - no files written.")
        return

    for i, rec in enumerate(to_download, 1):
        rec_id = rec["id"]
        rec_path = rec.get("path", "N/A")
        blob_name = make_blob_name(rec, "")  # path suffix only; prefix applied at upload
        rel_path = recording_to_local_path(rec)
        local_file = downloads_dir / rel_path
        local_file.parent.mkdir(parents=True, exist_ok=True)

        print(f"[{i}/{len(to_download)}] {rec_path} ({format_size(rec.get('size', 0))})")
        print(" Getting download link...", end="", flush=True)
        # Each recording gets a fresh short-lived stream URL.
        url = get_recording_stream_url(token, rec_id)
        print(" streaming to disk")
        stream_download_to_file(
            url,
            local_file,
            total_estimate=rec.get("size"),
        )
        # Record the actual size on disk so a future run can verify it.
        bytes_written = local_file.stat().st_size
        fid = resolve_foxglove_device_id(rec, args.device_id)
        downloads_manifest[rec_id] = {
            "blob_name": blob_name,
            "path": rec.get("path", ""),
            "size": rec.get("size"),
            "file": rel_path,
            "downloaded_size": bytes_written,
            **({"foxglove_device_id": fid} if fid else {}),
        }
        # Save after every file so a crash loses at most the in-flight download.
        save_manifest(downloads_dir, downloads_manifest)

    print(f"\nDownload complete. {len(to_download)} recording(s) in {downloads_dir}")
|
|
|
|
|
# --- Upload command --- |
|
|
|
|
|
def _get_blob_client(storage_account, container, blob_name, auth_mode="login", account_key=None):
    """Build an Azure SDK BlobClient, or return None when the SDK isn't installed.

    auth_mode "key" authenticates with *account_key* (required); anything
    else uses DefaultAzureCredential (az login / managed identity chain).
    """
    if not _AZURE_SDK_AVAILABLE:
        return None
    account_url = f"https://{storage_account}.blob.core.windows.net"
    if auth_mode == "key":
        if not account_key:
            raise ValueError("Account key required when using --auth-mode key")
        credential = account_key
    else:
        credential = DefaultAzureCredential()
    return BlobClient(account_url, container_name=container, blob_name=blob_name, credential=credential)
|
|
|
|
|
def blob_exists_in_azure(blob_name, storage_account, container, auth_mode="login", account_key=None) -> bool:
    """Check if a blob exists in Azure. Returns True if it exists.

    Prefers the SDK client when available; otherwise shells out to
    `az storage blob exists`. Any failure (no permission, bad JSON) is
    treated as "doesn't exist" so the upload proceeds.
    """
    client = _get_blob_client(storage_account, container, blob_name, auth_mode, account_key)
    if client is not None:
        try:
            return client.exists()
        except Exception:
            # Best-effort check only; fall back to assuming absent.
            return False
    command = ["az", "storage", "blob", "exists"]
    command += ["--account-name", storage_account]
    command += ["--container-name", container]
    command += ["--name", blob_name]
    command += ["--auth-mode", auth_mode]
    if auth_mode == "key":
        if not account_key:
            raise ValueError("Account key required when using --auth-mode key")
        command += ["--account-key", account_key]
    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.returncode != 0:
        return False  # e.g. no permission to check; proceed with upload
    try:
        payload = json.loads(completed.stdout.strip())
    except (json.JSONDecodeError, TypeError):
        return False
    return payload.get("exists", False)
|
|
|
|
|
def upload_to_azure(
    local_path,
    blob_name,
    storage_account,
    container,
    auth_mode="login",
    account_key=None,
    foxglove_device_id=None,
):
    """Upload a file to Azure Blob Storage. Uses SDK with progress bar when available, else Azure CLI.

    When foxglove_device_id is given it is attached as blob metadata
    (key "foxglove_device_id") in both the SDK and CLI paths. Existing
    blobs are overwritten. Raises RuntimeError if the CLI upload fails.
    """
    local_path = Path(local_path)
    total_size = local_path.stat().st_size
    blob_metadata = {}
    if foxglove_device_id:
        blob_metadata["foxglove_device_id"] = foxglove_device_id
    client = _get_blob_client(storage_account, container, blob_name, auth_mode, account_key)
    if client is not None:
        start_time = time.monotonic()
        # One-element list so the closure below can rebind the timestamp.
        last_print_time = [start_time]

        def progress_hook(response):
            # Azure SDK raw_response_hook: pipeline context carries the
            # running upload byte counts — NOTE(review): context keys assumed
            # from azure-storage-blob internals; confirm against SDK version.
            ctx = getattr(response, "context", {})
            current = ctx.get("upload_stream_current")
            total = ctx.get("data_stream_total") or total_size
            if current is None:
                return
            now = time.monotonic()
            # Throttle progress lines to one per PROGRESS_INTERVAL seconds.
            if now - last_print_time[0] >= PROGRESS_INTERVAL:
                speed = current / (now - start_time) if (now - start_time) > 0 else 0
                pct = f" ({100.0 * current / total:.1f}%)" if total and total > 0 else ""
                total_str = format_size(total) if total else "?"
                sys.stdout.write("\r Uploaded: " + format_size(current) + " / " + total_str + pct + " " + format_size(int(speed)) + "/s ")
                sys.stdout.flush()
                last_print_time[0] = now

        upload_kwargs = {"overwrite": True, "raw_response_hook": progress_hook}
        if blob_metadata:
            upload_kwargs["metadata"] = blob_metadata
        with open(local_path, "rb") as f:
            client.upload_blob(f, **upload_kwargs)
        # Final line: average speed over the whole upload.
        elapsed = time.monotonic() - start_time
        speed = total_size / elapsed if elapsed > 0 else 0
        sys.stdout.write("\r Uploaded: " + format_size(total_size) + " / " + format_size(total_size) + " (100.0%) " + format_size(int(speed)) + "/s \n")
        sys.stdout.flush()
        return
    # Fallback: Azure CLI (no progress reporting).
    cmd = [
        "az", "storage", "blob", "upload",
        "--account-name", storage_account,
        "--container-name", container,
        "--name", blob_name,
        "--file", str(local_path),
        "--auth-mode", auth_mode,
        "--overwrite", "true",
    ]
    if auth_mode == "key":
        if not account_key:
            raise ValueError("Account key required when using --auth-mode key")
        cmd.extend(["--account-key", account_key])
    if blob_metadata:
        cmd.extend(["--metadata", f"foxglove_device_id={foxglove_device_id}"])
    sys.stdout.write(" Uploading (install azure-storage-blob and azure-identity for progress bar)...\n")
    sys.stdout.flush()
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"Azure upload failed: {result.stderr.strip()}")
|
|
|
|
|
def cmd_upload(args):
    """Upload step: push files listed in the downloads/ manifest to Azure.

    On each success the file is moved to uploaded/ and both manifests are
    rewritten immediately, so a crash mid-run loses at most the in-flight
    file. Exits with status 1 if any upload fails; failed files stay in
    downloads/ for a retry.
    """
    # Validate Azure arguments up front (skipped for --dry-run, which
    # never contacts Azure).
    if not args.storage_account and not args.dry_run:
        print(
            "Error: Azure storage account is required. "
            "Use --storage-account or set AZURE_STORAGE_ACCOUNT.",
            file=sys.stderr,
        )
        sys.exit(1)
    if args.auth_mode == "key" and not args.account_key and not args.dry_run:
        print(
            "Error: --account-key or AZURE_STORAGE_KEY required when using --auth-mode key.",
            file=sys.stderr,
        )
        sys.exit(1)

    data_dir = Path(args.data_dir)
    downloads_dir = data_dir / "downloads"
    uploaded_dir = data_dir / "uploaded"
    uploaded_dir.mkdir(parents=True, exist_ok=True)

    downloads_manifest = load_manifest(downloads_dir)
    if not downloads_manifest:
        print(f"No manifest in {downloads_dir}. Run the download command first.")
        return

    # Only upload entries whose file exists
    pending = [
        (rec_id, meta)
        for rec_id, meta in downloads_manifest.items()
        if (downloads_dir / meta.get("file", f"{rec_id}.mcap")).exists()
    ]
    if not pending:
        print(f"No downloadable files found in {downloads_dir}.")
        return

    print(f"Uploading {len(pending)} file(s) from {downloads_dir} to Azure...")
    print(f" Storage account: {args.storage_account}")
    print(f" Container: {args.container}")
    print(f" Blob prefix: {args.blob_prefix}\n")

    # Warn once about entries that will end up without device-id metadata.
    missing_dev = [
        (rec_id, meta)
        for rec_id, meta in pending
        if not (meta.get("foxglove_device_id") or args.device_id)
    ]
    if missing_dev:
        print(
            " Warning: Some file(s) have no foxglove_device_id in manifest and --device-id / "
            "FOXGLOVE_DEVICE_ID is unset; blob metadata foxglove_device_id will be omitted for those.",
            file=sys.stderr,
        )

    if args.dry_run:
        for rec_id, meta in pending:
            blob_name = (args.blob_prefix + meta.get("blob_name", f"{rec_id}.mcap")).lstrip("/")
            fname = meta.get("file", f"{rec_id}.mcap")
            fid = meta.get("foxglove_device_id") or args.device_id
            meta_hint = f" metadata foxglove_device_id={fid}" if fid else " (no foxglove_device_id metadata)"
            print(f" {fname} -> {blob_name}\n {meta_hint}")
        print("\nDry run - no uploads.")
        return

    uploaded_manifest = load_manifest(uploaded_dir)
    failed = []

    for i, (rec_id, meta) in enumerate(pending, 1):
        fname = meta.get("file", f"{rec_id}.mcap")
        local_file = downloads_dir / fname
        blob_name = (args.blob_prefix + meta.get("blob_name", f"{rec_id}.mcap")).lstrip("/")

        print(f"[{i}/{len(pending)}] {fname} -> {blob_name}")
        try:
            if blob_exists_in_azure(
                blob_name,
                args.storage_account,
                args.container,
                auth_mode=args.auth_mode,
                account_key=args.account_key,
            ):
                print(" Warning: Blob already exists in Azure; will be overwritten.", file=sys.stderr)
            # Manifest value wins over the CLI/env fallback.
            foxglove_device_id = meta.get("foxglove_device_id") or args.device_id
            upload_to_azure(
                local_file,
                blob_name,
                args.storage_account,
                args.container,
                auth_mode=args.auth_mode,
                account_key=args.account_key,
                foxglove_device_id=foxglove_device_id,
            )
            # Success: move the file and persist both manifests before
            # touching the next entry.
            dest = uploaded_dir / fname
            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(local_file), str(dest))
            uploaded_manifest[rec_id] = meta
            del downloads_manifest[rec_id]
            save_manifest(downloads_dir, downloads_manifest)
            save_manifest(uploaded_dir, uploaded_manifest)
            print(" done")
        except Exception as e:
            # Keep going; failed files remain in downloads/ for a retry.
            print(f" failed: {e}")
            failed.append((rec_id, str(e)))

    if failed:
        print(f"\n{len(failed)} upload(s) failed. Rest remain in downloads/.")
        sys.exit(1)
    print(f"\nUpload complete. {len(pending)} file(s) moved to {uploaded_dir}")
|
|
|
|
|
def main():
    """Entry point: parse the CLI and dispatch to the chosen subcommand."""
    args = parse_args()
    handler = cmd_download if args.command == "download" else cmd_upload
    handler(args)
|
|
|
|
|
# Run the CLI only when executed as a script, not on import.
if __name__ == "__main__":
    main()