Created
February 20, 2016 17:32
-
-
Save parashardhapola/538589fd8b6a01eb2385 to your computer and use it in GitHub Desktop.
a small piece of code to easily hack into GTF files. Some features not optimized for large genomes so may hog on memory
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
from itertools import groupby | |
def read_fasta(fasta_name): | |
fh = open(fasta_name) | |
faiter = (f[1] for f in groupby(fh, lambda line: line[0] == ">")) | |
for head in faiter: | |
head = head.next()[1:].strip() | |
s = "".join(s.strip() for s in faiter.next()) | |
yield head, s | |
class GenericFeature(object): | |
def __init__(self, name, uid, chrom, start, end, strand, parent, feat_type, misc): | |
self.chrom = chrom | |
self.start = start | |
self.end = end | |
self.strand = strand | |
self.parent = parent | |
self.children = [] | |
self.type = feat_type | |
self.name = name | |
self.id = uid | |
self.info = misc | |
def set_child(self, child_id): | |
self.children.append(child_id) | |
class GTF(object): | |
def __init__(self, gtf_file): | |
self.file = gtf_file | |
self.genes = {} | |
self.transcripts = {} | |
self.exons = {} | |
self.summary = {} | |
self.file_parser() | |
self.make_chilren() | |
def lazy_reader(self): | |
with open(self.file) as h: | |
for l in h: | |
if l[0] != "#" and len(l) > 20: | |
c = l.rstrip('\n').split('\t') | |
yield c | |
def file_parser(self): | |
gtf_stream = self.lazy_reader() | |
for row in gtf_stream: | |
anno = {x.split(' ')[0]: x.split(' ')[1].rstrip(';').strip('"') for x in row[-1].split('; ')} | |
if row[2] == "gene": | |
if 'gene_name' not in anno: | |
anno['gene_name'] = None | |
feature = GenericFeature(anno['gene_name'], anno['gene_id'], row[0], row[3], row[4], | |
row[6], None, 'gene', {'gene_biotype': anno['gene_biotype']}) | |
self.genes[anno['gene_id']] = feature | |
elif row[2] == 'transcript': | |
if 'transcript_name' not in anno: | |
anno['transcript_name'] = None | |
feature = GenericFeature(anno['transcript_name'], anno['transcript_id'], row[0], | |
row[3], row[4], row[6], anno['gene_id'], 'transcript', | |
{'transcript_biotype': anno['transcript_biotype']}) | |
self.transcripts[anno['transcript_id']] = feature | |
elif row[2] == 'exon': | |
feature = GenericFeature(None, anno['exon_id'], row[0], | |
row[3], row[4], row[6], anno['transcript_id'], 'exon', | |
{'exon_number': anno['exon_number']}) | |
self.exons[anno['exon_id']] = feature | |
def summarize(self): | |
if self.summary == {}: | |
self.summary = { | |
'genes': len(self.genes), | |
'transcripts': len(self.transcripts), | |
'exons': len(self.exons) | |
} | |
print self.summary | |
def make_chilren(self): | |
for exon in self.exons.values(): | |
self.transcripts[exon.parent].children.append(exon.id) | |
for transcript in self.transcripts.values(): | |
self.genes[transcript.parent].children.append(transcript.id) | |
def make_transcriptome_json(self): | |
chrom_wise = {} | |
for gene in self.genes.values(): | |
for transcript_id in gene.children: | |
transcript = self.transcripts[transcript_id] | |
coords = [] | |
for exon_id in transcript.children: | |
exon = self.exons[exon_id] | |
coords.append([int(exon.start), int(exon.end)]) | |
if transcript.chrom not in chrom_wise: | |
chrom_wise[transcript.chrom] = [] | |
chrom_wise[transcript.chrom].append({ | |
'transcript_id': transcript.id, | |
'transcript.name': transcript.name, | |
'gene.id': gene.id, | |
'gene.name': gene.name, | |
'seq_coords': coords, | |
'strand': transcript.strand | |
}) | |
return chrom_wise | |
def make_genome_dict(self, fasta_file): | |
genome_dict = {} | |
for h,s in read_fasta(fasta_file): | |
genome_dict[h] = s | |
return genome_dict | |
def make_transcriptome_fasta(self, genome_file, out_fasta): | |
OUT = open(out_fasta, 'w') | |
genome_dict = self.make_genome_dict(genome_file) | |
chrom_wise_info = self.make_transcriptome_json() | |
for chrom in chrom_wise_info: | |
seq = genome_dict[chrom] | |
for t in chrom_wise_info[chrom]: | |
t_seq = [] | |
for coord in t['seq_coords']: | |
t_seq.append(seq[coord[0]:coord[1]]) | |
OUT.write(">%s\n%s\n" % (t['transcript_id'], "".join(t_seq))) | |
OUT.close() | |
return True | |
if __name__ == "__main__": | |
gtf_file = sys.argv[1] | |
gtf = GTF(gtf_file) | |
gtf.summarize() | |
gtf.make_transcriptome_fasta('./resource/genome.fa', './resource/transcripts.fasta') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment