Skip to content

Instantly share code, notes, and snippets.

@mypy-play
Created June 10, 2026 06:50
Show Gist options
  • Select an option

  • Save mypy-play/6c166960aabdab35daca05f3f4c82bba to your computer and use it in GitHub Desktop.

Select an option

Save mypy-play/6c166960aabdab35daca05f3f4c82bba to your computer and use it in GitHub Desktop.
Shared via mypy Playground
#!/usr/bin/env python3
import sys
# Version check - must be first
if sys.version_info < (3, 14):
logger.info(f"Error: This script requires Python 3.14 or higher. Current version: {sys.version_info.major}.{sys.version_info.minor}")
sys.exit(1)
import asyncio
import compression.zstd as zstd
import gzip
import io
import json
import logging
import os
import re
import shutil
import ssl
import subprocess
import tarfile
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import URLError, HTTPError
from urllib.request import urlopen, Request, install_opener, build_opener, HTTPSHandler
# Package configuration
PKG_CHANNEL = "conda-forge"
PKG_NAME = "pixi"
PKG_VERSION = "latest"
API_URL = "https://api.anaconda.org"
#MIRROR_URL = "https://conda.anaconda.org"
MIRROR_URL = "http://oa-mirror.mediatek.inc/repository/conda"
conda_target_map = {
"linux-64": "unknown-linux-x86_64",
"linux-aarch64": "unknown-linux-arm64",
"osx-64": "apple-darwin-x86_64",
"osx-arm64": "apple-darwin-arm64",
"win-64": "pc-windows-x86_64",
"win-arm64": "pc-windows-arm64",
}
# Configure logging with Android logcat style
class AndroidLogFormatter(logging.Formatter):
"""Custom formatter for Android logcat style logging"""
LEVEL_MAP = {
logging.DEBUG: 'D',
logging.INFO: 'I',
logging.WARNING: 'W',
logging.ERROR: 'E',
logging.CRITICAL: 'F', # Fatal
}
def format(self, record: logging.LogRecord) -> str:
# Format: MM-DD HH:MM:SS.mmm PID TID L TAG: Message
timestamp = datetime.fromtimestamp(record.created).strftime('%m-%d %H:%M:%S.%f')[:-3]
level = self.LEVEL_MAP.get(record.levelno, 'I')
tag = record.name
pid = os.getpid()
return f"{timestamp} {pid:5d} {pid:5d} {level} {tag}: {record.getMessage()}"
# Setup logger
logger = logging.getLogger(Path(__file__).name)
logger.setLevel(logging.DEBUG)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(AndroidLogFormatter())
logger.addHandler(console_handler)
# Global SSL bypass - disable certificate verification
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# Install global HTTPS handler with unverified SSL context
https_handler = HTTPSHandler(context=ssl_context)
opener = build_opener(https_handler)
install_opener(opener)
def run_subprocess(cmd: List[str], **kwargs: Any) -> subprocess.CompletedProcess[str]:
"""
Run subprocess with output redirected to logging.
Args:
cmd: Command list to execute
**kwargs: Additional arguments for subprocess.run
Returns:
subprocess.CompletedProcess result
"""
# Set default capture options
kwargs.setdefault('capture_output', True)
kwargs.setdefault('text', True)
logger.info(f" [PROC] Running: {' '.join(cmd)}")
try:
result = subprocess.run(cmd, **kwargs)
# Log stdout if present
if result.stdout:
for line in result.stdout.strip().split('\n'):
if line:
logger.info(f" [PROC][OUT] {line}")
# Log stderr if present
if result.stderr:
for line in result.stderr.strip().split('\n'):
if line:
logger.warning(f" [PROC][ERR] {line}")
# Check return code if check=True was requested
if kwargs.get('check', False) and result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, cmd, result.stdout, result.stderr)
return result
except subprocess.CalledProcessError as e:
logger.error(f" [PROC] Command failed with return code {e.returncode}")
raise
async def download_file_async(url: str) -> Optional[bytes]:
"""
Async function to download a file from URL.
Args:
url: Download URL
Returns:
Downloaded data (bytes) or None on error
"""
try:
# Run blocking I/O in a thread pool
response = await asyncio.to_thread(urlopen, url)
content = await asyncio.to_thread(response.read)
return content
except (URLError, HTTPError) as e:
logger.error(f" Failed to download from {url}: {e}")
return None
except Exception as e:
logger.error(f" Unexpected error downloading from {url}: {e}")
return None
def get_conda_manifest() -> Optional[Dict[str, Any]]:
"""
Get pixi package manifest from Anaconda Cloud API.
Returns:
List of file info dictionaries (parsed JSON), or None if error
"""
endpoint = f"{API_URL}/package/{PKG_CHANNEL}/{PKG_NAME}"
logger.info("Fetching package info from Anaconda Cloud API...")
try:
response = urlopen(endpoint)
data = response.read().decode('utf-8')
files = json.loads(data)
return files
except (URLError, HTTPError) as e:
logger.info(f"Error fetching from Anaconda Cloud API: {e}")
return None
except Exception as e:
logger.info(f"Unexpected error: {e}")
return None
def fetch_conda_package(manifest: Optional[Dict[str, Any]] = None, version: Optional[str] = None, build_number: Optional[int] = None) -> Dict[str, bytes]:
"""
Fetch pixi package from conda mirror.
Args:
manifest: Package manifest from API
version: Version string (e.g., '1.0.1'). If None, returns empty dict.
build_number: Build number (integer). If None, gets the largest build number.
Returns:
Dict with target keys (from conda_target_map) and download URLs as values.
Returns empty dict on error.
"""
# Return empty dict if manifest or version is None
if manifest is None or version is None or build_number is None:
manifest_status = "None" if manifest is None else "OK"
logger.info(f"Error: Invalid arguments - manifest={manifest_status}, version={version}, build_number={build_number}")
return {}
# Step 1: Filter by version and generate download links
files = manifest.get('files', [])
if not files:
logger.info("No files found in manifest")
return {}
# Filter by version
matching_files = [f for f in files if f.get('version') == version]
if not matching_files:
logger.info(f"No matching files found for version: {version}")
return {}
# Filter by build_number
if build_number is None:
# Get the largest build number
max_build = max([f.get('attrs', {}).get('build_number', 0) for f in matching_files])
matching_files = [f for f in matching_files if f.get('attrs', {}).get('build_number', 0) == max_build]
else:
# Filter by specific build_number
matching_files = [f for f in matching_files if f.get('attrs', {}).get('build_number') == build_number]
if not matching_files:
logger.info(f"No matching files found for version: {version}, build_number: {build_number}")
return {}
# Process each file and download
async def process_file(file_info: Dict[str, Any]) -> Tuple[Optional[str], Optional[bytes]]:
basename = file_info.get('basename', 'unknown')
subdir = file_info.get('attrs', {}).get('subdir', 'noarch')
# Generate download URL
download_url = f"{MIRROR_URL}/{PKG_CHANNEL}/{basename}"
# Get target key from conda_target_map
target_key = conda_target_map.get(subdir)
if not target_key:
logger.info(f" [SKIP] {basename} (subdir '{subdir}' not in conda_target_map)")
return None, None
logger.info(f" [DOWNLOAD] {download_url}")
# Download the file
content = await download_file_async(download_url)
if content is None:
return target_key, None
logger.info(f" [DOWNLOAD][OK] {basename} ({len(content)} bytes)")
return target_key, content
# Download files in parallel using asyncio
async def download_all() -> List[Tuple[Optional[str], Optional[bytes]]]:
tasks = [process_file(f) for f in matching_files]
return await asyncio.gather(*tasks)
logger.info(f"Downloading {len(matching_files)} files in parallel...")
responses = asyncio.run(download_all())
# Build package_data from responses (store file data, not paths)
package_data = {}
for target_key, file_data in responses:
if target_key and file_data:
package_data[target_key] = file_data
return package_data
async def repack_conda_package_to_tgz(file_data: bytes) -> Optional[bytes]:
"""
Async repack conda package to tar.gz format.
Args:
file_data: Raw bytes of the conda package file
Returns:
Repacked tar.gz data or None on error
"""
try:
# Step 1: Extract the conda package (which is a tar.bz2 or .conda format)
file_obj = io.BytesIO(file_data)
# Try to open as tar.bz2 first (v1 conda format)
try:
outer_tar = tarfile.open(fileobj=file_obj, mode='r:bz2')
except:
# Try as .conda format (which is a zip file)
file_obj.seek(0)
try:
outer_tar = zipfile.ZipFile(file_obj)
except:
logger.error(" [REPACK] Unable to extract conda package - unknown format")
return None
# Step 2: Find and extract 'pkg-.+.tar.zst' file
pkg_tar = None
pkg_tar_name = "(None)"
if isinstance(outer_tar, tarfile.TarFile):
# Handle v1 (tar.bz2) format
pkg_tar = outer_tar
logger.info(f" [REPACK] Found v1 conda package")
else:
# Handle .conda (zip) format
for name in outer_tar.namelist():
if re.match(r'pkg-.+\.tar\.zst', name):
pkg_tar_name = name
logger.info(f" [REPACK] Found {pkg_tar_name}")
pkg_tar = await asyncio.to_thread(zstd.decompress, outer_tar.read(name))
break
if pkg_tar is None:
logger.error(" [REPACK] Could not find pkg-*.tar.zst file in conda package")
outer_tar.close()
return None
# Step 3: Compress raw tar data to tar.gz
def repack_tar() -> bytes:
# Compress raw tar data with gzip
gzip_output = io.BytesIO()
with gzip.GzipFile(fileobj=gzip_output, mode='wb', compresslevel=9, mtime=0) as gz:
gz.write(pkg_tar)
return gzip_output.getvalue()
tgz_data = await asyncio.to_thread(repack_tar)
logger.info(f" [REPACK][OK] Repacked {pkg_tar_name} to tar.gz format")
outer_tar.close()
return tgz_data
except Exception as e:
logger.error(f" [REPACK] Failed to repack package: {e}")
return None
def repo_is_branch_exists(branch_name: str) -> bool:
"""
Check if a git branch exists.
Args:
branch_name: Name of the branch to check
Returns:
True if branch exists, False otherwise
"""
try:
result = run_subprocess(['git', 'rev-parse', '--verify', '--quiet', branch_name], check=False)
return result.returncode != 0
except Exception as e:
logger.error(f"Git: failed to check branch existence: {e}")
return False
def repo_commit_to_branch(file_map: Dict[str, bytes], branch_name: str, commit_message: str) -> bool:
"""
Commit files to a git repository branch.
Creates a new branch, writes files based on file_map, stages them, and commits.
Args:
file_map: Dict with target keys and file data (bytes)
branch_name: Target branch name in format 'rel-{version}' where version contains
lowercase letters, numbers, dots, and hyphens (e.g., 'rel-1.0.1-2')
commit_message: Commit message
"""
logger.info(f"Git: creating and switching to branch: {branch_name}")
# Extract version from branch name
match = re.match(r'^rel-([-\.0-9a-z]+)$', branch_name)
if not match:
logger.error(f" Invalid branch name format: {branch_name}")
return False
version = match.group(1)
try:
# Create and checkout new branch
run_subprocess(['git', 'checkout', '-b', branch_name], check=True)
logger.info(f"Git: switched to branch: {branch_name}")
# Ensure 'prebuilts' directory exists
prebuilts_dir = Path('prebuilts')
prebuilts_dir.mkdir(parents=True, exist_ok=True)
# Ensure prebuilts directory exists and is empty
prebuilts_dir = Path('prebuilts')
prebuilts_dir.mkdir(parents=True, exist_ok=True)
# Delete any existing files in the directory
for child in prebuilts_dir.iterdir():
if child.is_file() or child.is_symlink():
child.unlink()
elif child.is_dir():
shutil.rmtree(child)
# Async file writing helper
async def write_binary_file_async(target_key: str, file_data: Optional[bytes]) -> Optional[Path]:
if file_data is None:
logger.info(f" [SKIP] {target_key} - no data")
return None
# Create filename: {target_key}-{version}.tar.gz
filename = f"{PKG_NAME}-{target_key}-{version}.tar.gz"
filepath = prebuilts_dir / filename
# Write file in thread pool
await asyncio.to_thread(filepath.write_bytes, file_data)
logger.info(f" [WRITE][OK] {filename} ({len(file_data)} bytes)")
return filepath
# Write all files in parallel
async def write_all_files() -> List[Optional[Path]]:
write_tasks = [write_binary_file_async(target_key, file_data) for target_key, file_data in file_map.items()]
return await asyncio.gather(*write_tasks)
logger.info(f"Writing {len(file_map)} files in parallel...")
written_files = asyncio.run(write_all_files())
# Stage all written files
logger.info(f"Git: staging changes in branch {branch_name}...")
for filepath in written_files:
if filepath is not None:
run_subprocess(['git', 'add', str(filepath)], check=False)
# Write version.txt with the version number
version_file = prebuilts_dir / "version.txt"
version_file.write_text(f"{version}\n", encoding="utf-8", newline="\n")
run_subprocess(['git', 'add', str(version_file)], check=False)
# Commit changes
logger.info(f"Git: committing changes in branch {branch_name}...")
run_subprocess(['git', 'commit', '-m', commit_message], check=True)
logger.info(f"Git: committed: {commit_message}")
except subprocess.CalledProcessError as e:
logger.error(f"Git: operation failed: {e}")
return False
except Exception as e:
logger.error(f"Unexpected error: {e}")
return False
return True
def main() -> int:
"""Main entry point."""
logger.info(f"::: {PKG_NAME.upper()} Prebuilts Fetcher :::")
logger.info("=" * 39)
# Ensure we're in the git repository root directory
script_dir = Path(__file__).parent.resolve()
git_root = script_dir.parent
# Verify .git directory exists
git_dir = git_root / '.git'
if not git_dir.exists() or not git_dir.is_dir():
logger.error(f"Please run this script from within a root directory of git repository.")
return 2
logger.info(f"Git: repository root: {git_root}")
version_input = PKG_VERSION
if len(sys.argv) > 1:
version_input = sys.argv[1]
# Get manifest from API
manifest = get_conda_manifest()
if not manifest:
logger.error("Failed to fetch manifest")
return 3
# Replace version with latest_version if version is PKG_VERSION
if version_input == PKG_VERSION:
latest_version = manifest.get('latest_version')
if latest_version:
version_input = latest_version
else:
logger.error("No latest_version found in manifest")
return 4
# Parse version and build_number from version string using regex
# Format: '1.0.1' or '1.0.1-1' or '1.0.a' or 'a.b.c-0' (version can be digits/letters separated by dots)
match = re.match(r'^([0-9a-z]+(?:\.[0-9a-z]+)*)(?:-(\d+))?$', version_input)
if not match:
logger.error(f"Invalid version format: {version_input}")
return 5
version = match.group(1)
build_number = int(match.group(2)) if match.group(2) else None
# If build_number is None, get the largest build number from manifest
if build_number is None:
files = manifest.get('files', [])
version_files = [f for f in files if f.get('version') == version]
if version_files:
build_number = max([f.get('attrs', {}).get('build_number', 0) for f in version_files])
else:
logger.error(f"No files found for version: {version}")
return 6
logger.info(f"Target conda package: {PKG_NAME} version '{version}' (build: {build_number})")
branch_name = f"rel-{version}-{build_number}"
if not repo_is_branch_exists(branch_name):
logger.error(f"Branch {branch_name} exists, aborting")
return 7
package_data = fetch_conda_package(manifest, version, build_number)
# Repack each conda package to tar.gz in parallel
if not package_data:
logger.error("No package data generated")
return 8
logger.info(f"Repacking {len(package_data)} packages in parallel...")
async def repack_all() -> Dict[str, Optional[bytes]]:
tasks = []
target_list = []
for target, data in package_data.items():
if data is not None:
logger.info(f" [REPACK] {PKG_NAME}-{target}")
tasks.append(repack_conda_package_to_tgz(data))
target_list.append(target)
results = await asyncio.gather(*tasks)
return dict(zip(target_list, results))
repacked_data = asyncio.run(repack_all())
# Push to git repository
ret = repo_commit_to_branch(repacked_data, branch_name, f"Prebuilt auto checkin version {version}-{build_number}")
if ret is not True:
logger.error("Git: operation couldn't be completed")
return 9
return 0
if __name__ == "__main__":
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment