Created
January 4, 2018 06:36
-
-
Save hliang/f0029acb31a8e8632c0f04545faaf75b to your computer and use it in GitHub Desktop.
modified ucsc_snapshots https://github.com/jayhesselberth/ucsc_snapshots
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
''' | |
================== | |
ucsc_snapshots | |
================== | |
retrieve pictures from the UCSC Genome Browser based on coordinates | |
specified from BED3+ file and a session ID (hgsid). | |
UCSC Genome Browser should be set up with tracks prior to using the | |
utility and saved as a session. This includes all track settings, sizes | |
etc. Once these are setup, load the page source and identify the hgsid by | |
searching for "hgsid=" | |
Supply the hgsid and a BED file. Images will be retrieved based on the | |
coordinates in the BED file and saved to the directory as:: | |
ucsc-snapshots-<hgsid>/chrom-start-end.pdf | |
ucsc-snapshots-<hgsid>/chrom-start-end.png | |
.. note:: | |
beware of multiple procs accessessing the same hgsid at | |
the same time, they will affect each other's strand settings | |
.. warning:: | |
if you have many regions, you will need to run this overnight (e.g. | |
via a job submitted to the LSF night queue), otherwise UCSC will | |
throttle your connection (limit 1 request / 15 sec, 5000 requests per | |
day) | |
''' | |
import sys | |
import re | |
import time | |
import ucscsession | |
from pybedtools import BedTool | |
from cyvcf2 import VCF | |
import os | |
from path import Path as path | |
__author__ = 'Jay Hesselberth' | |
__contact__ = '[email protected]' | |
__version__ = '0.1.8' | |
def ucsc_snapshots(bedfilename, session_id, rev_display, dir_annots, | |
upstream, downstream, img_types, no_delay, verbose): | |
ucscsession.settings.hgsid = session_id | |
session = Session(rev_display, no_delay, verbose) | |
# for each BED field, set params on the UCSC browser and fetch the | |
# images | |
for region in BedTool(bedfilename): | |
if region.file_type == "vcf": | |
position = '%s:%s-%s' % (region.chrom, region.start - upstream - 1, region.start + downstream) | |
highlightregions = [[session.cart['db'], region.chrom, region.start, region.start, "#aaedff"]] | |
# TODO highlight indel region instead of just the start site | |
elif region.file_type == "bed": | |
position = '%s:%s-%s' % (region.chrom, region.start, region.end) | |
else: | |
raise OSError, ">> target region type %s not supported. BED3+ or VCF are supported" % region.file_type | |
if verbose: | |
print >>sys.stderr, ">> set position %s" % position | |
session.set_position(position) | |
#print "session.cart=", session.cart | |
session.set_highlight(highlightregions) | |
for imgtype in img_types: | |
filename = getfilename(session_id, | |
position=position if region.file_type == "bed" else '%s:%s' % (region.chrom, region.start), | |
name=region.name if False else ".", | |
score=region.score if region.file_type == "bed" else ".", | |
annots=dir_annots, ext=imgtype) | |
session.write_image(imgtype=imgtype, filename=filename, | |
strand=region.strand) | |
def ucsc_snapshots_vcf(vcffilename, session_id, rev_display, dir_annots, | |
upstream, downstream, img_types, no_delay, verbose): | |
ucscsession.settings.hgsid = session_id | |
session = Session(rev_display, no_delay, verbose) | |
# for each entry in vcf, set params on the UCSC browser and fetch the | |
# images | |
if not os.path.isfile(vcffilename): | |
print >>sys.stderr, ">> WARNING vcf file %s not found" % vcffilename | |
return | |
for variant in VCF(vcffilename): | |
position = '%s:%s-%s' % (variant.CHROM, variant.start - upstream, variant.start + downstream) | |
highlightregions = [[session.cart['db'], variant.CHROM, variant.start + 1, variant.end, "#aaedff"]] # highlight indel region instead of just the start site | |
if verbose: | |
print >>sys.stderr, ">> set position %s" % position | |
session.set_position(position) | |
#print "session.cart=", session.cart | |
session.set_highlight(highlightregions) | |
print "INFO updated session.tracks_url=", session.tracks_url | |
print "INFO session.cart position=", session.cart["position"] | |
print "INFO session.cart highligh=", session.cart["highlight"] | |
for imgtype in img_types: | |
filename = getfilename(session_id, | |
position='%s:%s' % (variant.CHROM, variant.start + 1), | |
name=".", | |
score=".", | |
annots=dir_annots, ext=imgtype) | |
session.write_image(imgtype=imgtype, filename=filename, | |
strand="+") | |
class Session(ucscsession.session._UCSCSession): | |
def __init__(self, rev_display, no_delay, verbose): | |
ucscsession.session._UCSCSession.__init__(self) | |
#ucscsession.session.change_mirror("http://genome-asia.ucsc.edu") | |
self.verbose = verbose | |
# keep track of the flipped state in the session. if flipped is | |
# True, will show neg strand genes 5'->3' | |
self.rev_display = rev_display | |
flipped = self._find_flipped_state() | |
self.flipped = flipped | |
# XXX no_delay for debugging only | |
if no_delay: | |
self._SLEEP = 1 | |
else: | |
# required minimum time between requests per UCSC | |
#self._SLEEP = 15 | |
self._SLEEP = 3 | |
def write_image(self, imgtype, filename, strand): | |
''' | |
wrapper for image generation. writes requested content to | |
filename. | |
only PDF and PNG are currently supported. | |
Args: | |
imgtype (str): pdf or png | |
filename (str): name of file to write | |
strand (str): optional strand argument | |
Returns: | |
Nothing | |
''' | |
# XXX could add more image capability by converting retrieved png | |
# files to jpeg, tif etc. | |
if imgtype.lower() == 'pdf': | |
content = self.pdf(strand) | |
elif imgtype.lower() == 'png': | |
content = self.png(strand) | |
else: | |
raise OSError, ">> image type %s not supported" % imgtype | |
with open(filename, 'w') as fileout: | |
fileout.write(content) | |
if self.verbose: | |
print >>sys.stderr, ">> wrote %s" % filename | |
def pdf(self, strand): | |
""" | |
retrieve PDF of the current browser view for currently set | |
position. | |
Args: | |
strand (str) | |
Returns: | |
pdf content (str) | |
""" | |
payload = {'hgt.psOutput':'on'} | |
# deal with display flipping | |
payload = self._flip_display(strand, payload) | |
response = self.session.post(self.tracks_url, data=payload) | |
time.sleep(self._SLEEP) | |
link = ucscsession.helpers.pdf_link(response) | |
content = self.session.get(link).content | |
return content | |
def png(self, strand): | |
""" | |
retrieve merged PNG of the current browser view for currently set | |
position. | |
Args: | |
strand (str) | |
Returns: | |
png content (str) | |
""" | |
# these CGI settings ensure the browser returns a single, merged | |
# PNG for the viewport | |
payload = {'hgt.imageV1':1, | |
'hgt.trackImgOnly':1} | |
# deal with display flipping | |
payload = self._flip_display(strand, payload) | |
# example: <IMG SRC='../trash/hgt/hgt_genome_6243_68cdb0.png' | |
#img_regex = re.compile('<IMG SRC = \"\.\.\/(trash\/hgt\/hgt_.*\.png)') | |
img_regex = re.compile('<IMG SRC\s*=\s*.*\.\.\/(trash.*png)') | |
response = self.session.post(self.tracks_url, data=payload) | |
#print type(self.session) | |
#print img_regex.findall(response.text)[0] | |
imgfilename = img_regex.findall(response.text)[0] | |
#sys.exit(0) | |
time.sleep(self._SLEEP) | |
link = self._mirror + "/" + imgfilename | |
content = self.session.get(link).content | |
return content | |
def _flip_display(self, strand, payload): | |
''' | |
flip display for such that all genes (pos and neg strands) are | |
shown 5'->3' | |
adds hgt.toggleRevCmplDisp to CGI payload if required | |
beware of multiple procs accessessing the same hgsid at the | |
same time, they will affect each other's strand settings | |
Args: | |
strand (str) | |
payload (dict): updated payload with toggle key | |
Returns: | |
payload (dict) | |
''' | |
# the CGI var is a true toggle so must be flipped back to e.g. | |
# pos strand genes after a neg strand gene is retrieved and vice | |
# versa | |
if strand == '-' and self.rev_display and not self.flipped: | |
payload['hgt.toggleRevCmplDisp'] = '1' | |
self.flipped = True | |
elif strand == '+' and self.rev_display and self.flipped: | |
payload['hgt.toggleRevCmplDisp'] = '1' | |
self.flipped = False | |
return payload | |
def _find_flipped_state(self): | |
''' | |
Determine whether the display is currently flipped. | |
Args: | |
None | |
Returns: | |
bool: Whether state is flipped | |
''' | |
cart_info = self.cart_info() | |
db = cart_info['db'] | |
comp_key = 'hgt.revCmplDisp_%s' % db | |
try: | |
state = cart_info[comp_key] | |
except KeyError: | |
msg = ">> forcing display to + strand ... " | |
self._flip_display(strand = '+', payload = {}) | |
state = 0 | |
# possible vals are 0 and 1 | |
if int(state) == 0: | |
return False | |
else: | |
return True | |
def getfilename(session_id, position, name, score, annots, ext='pdf'): | |
''' | |
Generate a filename for the image output. creates directory if it does | |
not exist. | |
Args: | |
position (str): e.g. chr1:1-100) | |
name (str): gene name | |
score (str) score | |
annots (str): annots | |
Returns: | |
filename (str): <name>-<score>-<pos>.pdf | |
''' | |
#imgdir = 'ucsc-snapshots-hgsid-%s' % session_id | |
imgdir = 'ucsc-snapshots' | |
if annots: | |
for key, val in annots: | |
imgdir += '-%s-%s' % (key, val) | |
imgdir = path(imgdir) | |
if not imgdir.exists(): | |
imgdir.mkdir() | |
# chr:1-2 -> chr-1-2 | |
pos = position.replace(':','-') | |
fname = pos | |
if score and score != '.': | |
fname = '%s-%s' % (score, fname) | |
if name and name != '.': | |
fname = '%s-%s' % (name, fname) | |
imgfilename = path('%s.%s' % (fname, ext.lower())) | |
imgfilepath = imgdir.joinpath(imgfilename) | |
return imgfilepath | |
def parse_options(args): | |
from optparse import OptionParser | |
#usage = "%prog [OPTION]... BED3+ UCSC-hgsid" | |
usage = "%prog <--sessionid XXXX> <--bedfile YYYY.bed | --vcffile ZZZZ.vcf> [OPTION]..." | |
version = "%%prog %s" % __version__ | |
description = ("retrieve images from UCSC based on BED regions and " | |
"UCSC session ID ('hgsid')") | |
parser = OptionParser(usage=usage, version=version, | |
description=description) | |
parser.add_option("--sessionid", action="store", dest="sessionid", | |
default=None, help="session id, UCSC-hgsid") | |
parser.add_option("--bedfile", action="store", dest="bedfile", | |
default=None, help="BED3+ file.") | |
parser.add_option("--vcffile", action="store", dest="vcffile", | |
default=None, help="vcf file.") | |
parser.add_option("--upstream", action="store_const", dest="upstream", | |
default=25, help="upstream bp to show. " | |
"(default: %default)") | |
parser.add_option("--downstream", action="store_const", dest="downstream", | |
default=25, help="downstream bp to show. " | |
"(default: %default)") | |
parser.add_option("--img-types", action="append", | |
default=[], help="image types to write. " | |
"(default: PNG, PDF)") | |
parser.add_option("--dir-annotations", action="append", | |
default=[], help="additional key=value pairs for annotating " | |
"output directory. (default: None)") | |
parser.add_option("--no-delay", action="store_true", | |
default=False, help="no delay between requests. for debugging " | |
"only. " | |
"(default: %default)") | |
parser.add_option("--reverse-display", action="store_true", | |
default=False, help="reverse display for neg strand genes " | |
"(default: %default)") | |
parser.add_option("-v", "--verbose", action="store_true", | |
default=False, help="verbose output (default: %default)") | |
options, args = parser.parse_args(args) | |
# if len(args) < 2: | |
# parser.error("specify BED filename and UCSC session ID") | |
if options.sessionid is None: | |
parser.error("specify one UCSC session ID") | |
if options.bedfile is None and options.vcffile is None: | |
parser.error("specify one BED filename or one VCF filename") | |
if len(options.img_types) == 0: | |
options.img_types.extend(['PDF','PNG']) | |
return options, args | |
def main(args=sys.argv[1:]): | |
options, args = parse_options(args) | |
# bedfilename = args[0] | |
# sessionid = args[1] | |
bedfilename = options.bedfile | |
vcffilename = options.vcffile | |
sessionid = options.sessionid | |
if options.dir_annotations: | |
annots = [(i.split('=')) for i in options.dir_annotations] | |
else: | |
annots = [] | |
kwargs = {'verbose':options.verbose, | |
'upstream':options.upstream, | |
'downstream':options.downstream, | |
'img_types':options.img_types, | |
'rev_display':options.reverse_display, | |
'dir_annots':annots, | |
'no_delay':options.no_delay} | |
if bedfilename is not None: | |
return ucsc_snapshots(bedfilename, sessionid, **kwargs) | |
else: | |
return ucsc_snapshots_vcf(vcffilename, sessionid, **kwargs) | |
if __name__ == '__main__': | |
sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment