Skip to content

Instantly share code, notes, and snippets.

@alexcritschristoph
Last active August 8, 2024 02:46
Show Gist options
  • Save alexcritschristoph/7087c7d972192cd7946c27753f05f744 to your computer and use it in GitHub Desktop.
Save alexcritschristoph/7087c7d972192cd7946c27753f05f744 to your computer and use it in GitHub Desktop.
get the consensus sequence from a BAM
## returns the consensus sequence of a bam
## minimum 3x depth of coverage at a site required
import pysam
import sys
import pandas as pd
import argparse
from Bio import SeqIO
import numpy as np
from collections import defaultdict
import pandas as pd
P2C = {'A':0, 'C':1, 'T':2, 'G':3}
C2P = {0:'A', 1:'C', 2:'T', 3:'G'}
def read_bam(bam, fasta, min_ani, min_mapq):
count = 0
sequence = ''
for record in SeqIO.parse(fasta, 'fasta'):
sequence = record.seq
print(sequence[0])
print(len(sequence))
new_sequence = defaultdict(lambda:'N')
samfile = pysam.AlignmentFile(bam)
for pileupcolumn in samfile.pileup(record.id, truncate = True, max_depth=100000,
stepper = 'nofilter', compute_baq= True,
ignore_orphans = True, ignore_overlaps = True,
min_base_quality = 30):
table = np.zeros(4, dtype=int)
for pileupread in pileupcolumn.pileups:
if not pileupread.is_del and not pileupread.is_refskip:
table[P2C[pileupread.alignment.query_sequence[pileupread.query_position]]] += 1
if np.max(table) >= 1 and C2P[np.argmax(table)] == sequence[pileupcolumn.pos]:
new_sequence[pileupcolumn.pos] = C2P[np.argmax(table)]
elif np.max(table) >= 3 and C2P[np.argmax(table)] != sequence[pileupcolumn.pos]:
print(str(pileupcolumn.pos) + "\t" + str(table))
if pd.Series(table).astype(bool).sum() == 1:
new_sequence[pileupcolumn.pos] = C2P[np.argmax(table)]
final_sequence = ''
i = 0
for pos in sequence:
final_sequence += new_sequence[i]
i += 1
f = open(bam.split("/")[-1].split(".bam")[0] + ".fasta", 'w+')
f.write(">" + bam.split("/")[-1].split(".bam")[0] + "\n")
f.write(final_sequence + "\n")
f.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Calculate filtered coverages for scaffolds",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
# Required positional arguments
parser.add_argument("bam", help="Sorted, indexed .bam file.")
parser.add_argument("fasta", help="FASTA file.")
parser.add_argument("-c", "--min_ani", action="store", default=0.95, type=float, \
help='Minimum required percent identity of read pairs to consensus. (>=)')
parser.add_argument("--min_mapq", action="store", default=2, type=int,\
help='Minimum mapq score of EITHER read in a pair to use that pair. (>=) ')
args = parser.parse_args()
run = read_bam(args.bam, args.fasta, args.min_ani, args.min_mapq)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment