Created
February 4, 2025 11:02
-
-
Save MartGro/1ae09da1cfdab96dae21f4d1db0cceaa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import time | |
import logging | |
import json | |
from typing import Dict, List, Optional | |
from datetime import datetime | |
class EnsemblCompara: | |
def __init__(self, debug: bool = True): | |
self.base_url = "https://rest.ensembl.org" | |
self.headers = { | |
"Content-Type": "application/json", | |
"User-Agent": "Python-MSA-Client/1.0" | |
} | |
# Set up logging | |
self.logger = logging.getLogger('EnsemblCompara') | |
self.logger.setLevel(logging.DEBUG if debug else logging.INFO) | |
# Create console handler with formatting | |
ch = logging.StreamHandler() | |
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') | |
ch.setFormatter(formatter) | |
self.logger.addHandler(ch) | |
def _make_request(self, endpoint: str, params: Optional[Dict] = None) -> List[Dict]: | |
"""Make request to Ensembl REST API with rate limiting and debug info""" | |
url = f"{self.base_url}{endpoint}" | |
self.logger.debug(f"Making request to: {url}") | |
self.logger.debug(f"With parameters: {json.dumps(params, indent=2) if params else 'None'}") | |
start_time = datetime.now() | |
response = requests.get( | |
url, | |
headers=self.headers, | |
params=params | |
) | |
request_time = (datetime.now() - start_time).total_seconds() | |
self.logger.debug(f"Request completed in {request_time:.2f} seconds") | |
self.logger.debug(f"Response status code: {response.status_code}") | |
# Handle rate limiting | |
if response.status_code == 429: | |
retry_after = int(response.headers.get('Retry-After', 60)) | |
self.logger.warning(f"Rate limit hit. Waiting {retry_after} seconds before retry") | |
time.sleep(retry_after) | |
return self._make_request(endpoint, params) | |
try: | |
response.raise_for_status() | |
except requests.exceptions.RequestException as e: | |
self.logger.error(f"Request failed: {str(e)}") | |
self.logger.error(f"Response content: {response.content.decode()}") | |
raise | |
json_response = response.json() | |
self.logger.debug(f"Response size: {len(response.content)} bytes") | |
return json_response | |
def get_epo_extended_alignment(self, region: str, species: str = "human") -> List[Dict]: | |
""" | |
Get EPO_EXTENDED multiple sequence alignment for mammals for a specific region. | |
Args: | |
region (str): Region in format 'chromosome:start-end' | |
e.g., '1:1000000-1001000' | |
species (str): Reference species (default: "human") | |
Returns: | |
List[Dict]: List containing alignment block with sequences and tree | |
""" | |
self.logger.info(f"Fetching EPO_EXTENDED alignment for {species} region {region}") | |
endpoint = f"/alignment/region/{species}/{region}" | |
params = { | |
"method": "EPO_EXTENDED", | |
"species_set_group": "mammals", | |
"content-type": "application/json" | |
} | |
return self._make_request(endpoint, params) | |
def summarize_sequences(alignments: List[Dict]) -> None: | |
"""Print summary of sequence alignments""" | |
species_data = {} | |
for seq in alignments: | |
species = seq['species'] | |
length = len(seq['seq']) | |
gaps = seq['seq'].count('-') | |
gap_percent = (gaps / length) * 100 if length > 0 else 0 | |
species_data[species] = { | |
'length': length, | |
'gaps': gaps, | |
'gap_percent': gap_percent, | |
'coordinates': f"{seq['seq_region']}:{seq['start']}-{seq['end']} ({'+' if seq['strand'] == 1 else '-'})" | |
} | |
print("\nAlignment Summary:") | |
print(f"Total species: {len(species_data)}") | |
print("\nDetails by species:") | |
for species, data in species_data.items(): | |
print(f"\n{species}:") | |
print(f" Location: {data['coordinates']}") | |
print(f" Sequence length: {data['length']} bp") | |
print(f" Gaps: {data['gaps']} ({data['gap_percent']:.1f}%)") | |
def print_alignment(alignment_block: Dict, logger: Optional[logging.Logger] = None) -> None: | |
"""Print alignment data in a readable format""" | |
if logger: | |
logger.debug("Processing alignment block") | |
# Print tree if available | |
if 'tree' in alignment_block: | |
print("\nPhylogenetic Tree:") | |
print(alignment_block['tree']) | |
# Print alignment information | |
if 'alignments' in alignment_block: | |
alignments = alignment_block['alignments'] | |
if logger: | |
logger.debug(f"Found {len(alignments)} sequences in alignment") | |
summarize_sequences(alignments) | |
print("\nSequence Preview:") | |
for seq in alignments: | |
species = seq['species'] | |
sequence = seq['seq'] | |
print(f"\n{species}:") | |
print(f"{sequence[:60]}...") # Show first 60 bases | |
def main(): | |
compara = EnsemblCompara(debug=True) | |
try: | |
region = "1:1000000-1001000" | |
compara.logger.info(f"Starting EPO_EXTENDED alignment fetch for region {region}") | |
# Get alignment data (returns list with single block) | |
alignment_blocks = compara.get_epo_extended_alignment(region) | |
if alignment_blocks and len(alignment_blocks) > 0: | |
# Process the first (and usually only) alignment block | |
alignment_block = alignment_blocks[0] | |
print_alignment(alignment_block, compara.logger) | |
else: | |
compara.logger.warning("No alignment blocks returned") | |
compara.logger.info("Alignment processing completed successfully") | |
except requests.exceptions.RequestException as e: | |
compara.logger.error(f"Error accessing Ensembl REST API: {e}") | |
except Exception as e: | |
compara.logger.error(f"Unexpected error: {e}", exc_info=True) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment