Last active
April 1, 2025 09:14
-
-
Save danilotat/71fabb88592e35a32512b23a012e5489 to your computer and use it in GitHub Desktop.
This script takes a VCF as input and returns a minimal MAF compatible for downstream package like MAFtools. It requires a VCF file already annotate with VEP and a python environment with cyvcf2. **USAGE**: `python3 vcf2maf.py -i input.vcf -o output.maf` .
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python3 | |
import pandas as pd | |
import cyvcf2 | |
import re | |
import argparse | |
def parse_args(): | |
parser = argparse.ArgumentParser(description="Convert VCF to MAF format") | |
parser.add_argument("-i", "--input", type=str, required=True, help="Input VCF file") | |
parser.add_argument("-o", "--output", type=str, required=True, help="Output MAF file") | |
return parser.parse_args() | |
class CSQ_parser: | |
def __init__(self, vcf: cyvcf2.VCF): | |
self.csq_header = vcf.get_header_type("CSQ") | |
self.fields = self._parse_csq_header() | |
def _parse_csq_header(self): | |
fields = self.csq_header["Description"].split("Format: ")[1].replace('"', '').strip().split("|") | |
return fields | |
def parse_csq_field(self, csq: str): | |
parsed_dict = dict(zip(self.fields, csq.split("|"))) | |
return parsed_dict | |
def __repr__(self): | |
return f"CSQ_field:\n{self.csq_header}" | |
class VcfEntry: | |
def __init__(self, sample: str, csq_parser: CSQ_parser, variant: cyvcf2.Variant): | |
self.variant = variant | |
self.columns=[ | |
"Hugo_Symbol","Chromosome","Start_Position","End_Position","Reference_Allele", | |
"Tumor_Seq_Allele2","Variant_Classification","Variant_Type","Tumor_Sample_Barcode" | |
] | |
self.csq_field = csq_parser.parse_csq_field(variant.INFO.get("CSQ")) | |
self.var_type = self._get_variant_type() | |
self.Hugo_Symbol = self.csq_field.get("SYMBOL") | |
self.Chromosome = variant.CHROM | |
self.Start_Position = variant.POS - 1 | |
self.End_Position = variant.POS | |
self.Reference_Allele = variant.REF | |
self.Tumor_Seq_Allele2 = variant.ALT[0] | |
self.Variant_Type = self.csq_field.get("Consequence").split("&")[0].strip() | |
self.inframe = False if "frameshift" in self.csq_field.get("Consequence") else True | |
self.Variant_Classification = self._get_variant_classification( | |
self.Variant_Type, | |
self.var_type, | |
self.inframe | |
) | |
self.Tumor_Sample_Barcode = sample | |
def _get_variant_type(self): | |
if self.variant.is_snp: | |
return "SNP" | |
elif self.variant.is_indel: | |
return "INS" if len(self.variant.REF) < len(self.variant.ALT[0]) else "DEL" | |
@staticmethod | |
def _get_variant_classification(effect, var_type, inframe): | |
""" | |
Convert input variant details into a classification string using match-case (Python 3.10+). | |
:param effect: string representing the variant effect. | |
:param var_type: string representing the variant type (e.g., 'DEL', 'INS'). | |
:param inframe: boolean indicating in-frame status. | |
:return: classification string. | |
""" | |
if not effect: | |
return "Targeted_Region" # In case VEP was skipped | |
match (effect, var_type, inframe): | |
case (ef, _, _) if re.fullmatch(r"(splice_acceptor_variant|splice_donor_variant|transcript_ablation|exon_loss_variant)", ef): | |
return "Splice_Site" | |
case ("stop_gained", _, _): | |
return "Nonsense_Mutation" | |
case (ef, "DEL", inf) if (ef == "frameshift_variant" or (ef == "protein_altering_variant" and not inf)): | |
return "Frame_Shift_Del" | |
case (ef, "INS", inf) if (ef == "frameshift_variant" or (ef == "protein_altering_variant" and not inf)): | |
return "Frame_Shift_Ins" | |
case ("stop_lost", _, _): | |
return "Nonstop_Mutation" | |
case (ef, _, _) if re.fullmatch(r"(initiator_codon_variant|start_lost)", ef): | |
return "Translation_Start_Site" | |
case (ef, vt, inf) if ef.endswith("inframe_insertion") or (ef == "protein_altering_variant" and inf and vt == "INS"): | |
return "In_Frame_Ins" | |
case (ef, vt, inf) if ef.endswith("inframe_deletion") or (ef == "protein_altering_variant" and inf and vt == "DEL"): | |
return "In_Frame_Del" | |
case (ef, _, _) if re.fullmatch(r"(missense_variant|coding_sequence_variant|conservative_missense_variant|rare_amino_acid_variant)", ef): | |
return "Missense_Mutation" | |
case (ef, _, _) if re.fullmatch(r"(transcript_amplification|intron_variant|INTRAGENIC|intragenic_variant)", ef): | |
return "Intron" | |
case ("splice_region_variant", _, _): | |
return "Splice_Region" | |
case (ef, _, _) if re.fullmatch(r"(incomplete_terminal_codon_variant|synonymous_variant|stop_retained_variant|NMD_transcript_variant)", ef): | |
return "Silent" | |
case (ef, _, _) if re.fullmatch(r"(mature_miRNA_variant|exon_variant|non_coding_exon_variant|non_coding_transcript_exon_variant|non_coding_transcript_variant|nc_transcript_variant)", ef): | |
return "RNA" | |
case (ef, _, _) if re.fullmatch(r"(5_prime_UTR_variant|5_prime_UTR_premature_start_codon_gain_variant)", ef): | |
return "5'UTR" | |
case ("3_prime_UTR_variant", _, _): | |
return "3'UTR" | |
case (ef, _, _) if re.fullmatch(r"(TF_binding_site_variant|regulatory_region_variant|regulatory_region|intergenic_variant|intergenic_region)", ef): | |
return "IGR" | |
case ("upstream_gene_variant", _, _): | |
return "5'Flank" | |
case ("downstream_gene_variant", _, _): | |
return "3'Flank" | |
case _: | |
return "Unknown" | |
def return_as_series(self): | |
# return as series using defined columns | |
return pd.Series([getattr(self, field) for field in self.columns], index | |
=self.columns) | |
def __repr__(self): | |
fields = "\n".join([f"{field}: {getattr(self, field)}" for field in self.columns]) | |
return f"VcfEntry:\n{fields}" | |
if __name__ == '__main__': | |
args = parse_args() | |
vcf = cyvcf2.VCF(args.input) | |
sample_id = vcf.samples[0] | |
csq = CSQ_parser(vcf) | |
frame = pd.DataFrame(columns=[ | |
"Hugo_Symbol","Chromosome","Start_Position","End_Position","Reference_Allele", | |
"Tumor_Seq_Allele2","Variant_Classification","Variant_Type","Tumor_Sample_Barcode" | |
]) | |
for variant in vcf: | |
if variant.INFO.get("CSQ") is None: | |
continue | |
else: | |
entry = VcfEntry(sample_id, csq, variant) | |
frame = pd.concat([frame,pd.DataFrame(entry.return_as_series()).T], ignore_index=True) | |
frame.to_csv(args.output, sep="\t", index=False) | |
vcf.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment