Created
October 13, 2017 13:25
-
-
Save ilevantis/0cabf5afe771450305811e8bb26f3363 to your computer and use it in GitHub Desktop.
parse the txt output of hmmsearch (hmmer3)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import re | |
from ast import literal_eval | |
import numpy as np | |
def parse_val(s): | |
try: | |
return literal_eval(s) | |
except: | |
if s in ['nan', '-nan']: | |
return np.nan | |
elif s in ['inf']: | |
return np.inf | |
else: | |
return s | |
_SCORE_COLS = ['hit_eval', 'hit_score', 'hit_bias', 'hit_name', 'hit_desc' ] | |
def parse_score_line(line): | |
split = line.split() | |
col_vals = split[0:3] + split[8:9] + [' '.join(split[9:])] | |
return { i[0]:parse_val(i[1]) for i in zip(_SCORE_COLS, col_vals) } | |
_DOMAIN_COLS = ['hsp_num', 'hsp_score', 'hsp_bias', 'hsp_c_eval', 'hsp_i_eval', 'hsp_q_start', 'hsp_q_end', 'hsp_h_start', 'hsp_h_end', 'hsp_e_start', 'hsp_e_end', 'hsp_acc'] | |
def parse_domain_line(line): | |
split = line.split() | |
col_vals = split[0:1] + split[2:8] + split[9:11] + split[12:14] + split[15:1] | |
return { i[0]:parse_val(i[1]) for i in zip(_DOMAIN_COLS, col_vals) } | |
_RECORD_SPLIT_STRING = 'Internal pipeline statistics summary:\n-------------------------------------\n' | |
_QUERY_NAME_PATTERN = re.compile(r'Query:(.*)\[M=\d+\]\n') | |
_SCORES_TABLE_PATTERN = re.compile(r'Scores for complete sequences \(score includes all domains\):.*-----------\n(.*)\nDomain annotation for each sequence \(and alignments\):', | |
re.DOTALL ) | |
_HSP_PARENT_PATTERN = re.compile(r'^([^\s]*)\s+') | |
_HSP_NUM_PATTERN = re.compile(r'domain (\d+)\s') | |
_ALIGN_PATTERN = re.compile(r' .*\s+\d+\s+([a-zA-Z\.]+)') | |
def parse_record(record): | |
query = _QUERY_NAME_PATTERN.search(record[0]).group(1).strip() | |
if ' [No hits detected that satisfy reporting thresholds]\n' in record: | |
vals = [] | |
else: | |
hits = {} | |
record = ''.join(record) | |
results, stats = record.split(_RECORD_SPLIT_STRING) | |
scores_table = _SCORES_TABLE_PATTERN.search(results).group(1).splitlines() | |
for line in scores_table: | |
if line != ' ------ inclusion threshold ------': | |
hit = parse_score_line(line) | |
hits[hit['hit_name']] = hit | |
hits_hsps = results.split('>> ')[1:] | |
for hsp in hits_hsps: | |
hsp_dict = {} | |
hsp_parent = _HSP_PARENT_PATTERN.search(hsp).group(1) | |
hsp_data_parts = hsp.split('Alignments for each domain:') | |
domain_table = ( s for s in hsp_data_parts[0].splitlines()[3:] if s.strip() ) | |
for line in domain_table: | |
hsp_info = parse_domain_line(line) | |
hsp_dict[hsp_info['hsp_num']] = hsp_info | |
alignments = ( s.splitlines() for s in hsp_data_parts[1].split('==')[1:] ) | |
for alignment in alignments: | |
hsp_num = literal_eval(_HSP_NUM_PATTERN.search(alignment[0]).group(1)) | |
align_span = _ALIGN_PATTERN.search(alignment[1]).span(1) | |
hsp_dict[hsp_num]['alignstr_q'] = alignment[1][slice(*align_span)] | |
hsp_dict[hsp_num]['alignstr_m'] = alignment[2][slice(*align_span)] | |
hsp_dict[hsp_num]['alignstr_h'] = alignment[3][slice(*align_span)] | |
hsp_dict[hsp_num]['alignstr_a'] = alignment[4][slice(*align_span)] | |
hits[hsp_parent]['hsps'] = [v for v in hsp_dict.values()] | |
vals = [v for v in hits.values()] | |
return (query, vals) | |
def parse_hmmout(handle): | |
record = [] | |
for line in handle: | |
if line[0] in ['#', '\n']: | |
pass | |
elif line != '//\n': | |
record.append(line) | |
else: | |
yield parse_record(record) | |
record = [] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment