Skip to content

Instantly share code, notes, and snippets.

@WinterLi1993
Forked from annasa/removeOptDups.py
Created March 15, 2016 06:47
Show Gist options
  • Save WinterLi1993/17f6b1d8f18a44bbc6a7 to your computer and use it in GitHub Desktop.
Save WinterLi1993/17f6b1d8f18a44bbc6a7 to your computer and use it in GitHub Desktop.
Input a sorted (chr and start pos) sam file and output all records except for optical duplicates
#!/usr/local/bin/python
"RemoveOpticalDuplicates.py"
"Author: Anna Salzberg"
import sys
import string
import os
import time
import math
def convertStr(s):
"""Convert string to either int or float."""
try:
ret = int(s)
except ValueError:
try :
ret = float(s);
except ValueError :
ret = -1;
return ret
class Parameter :
def __init__(self, argv) :
self.argv = argv;
self.inFN = "";
self.opticalDuplicatePixelDistance = 100;
def getOpticalDuplicatePixelDistance(self) :
return self.opticalDuplicatePixelDistance
def getInFN(self) :
return self.inFN;
def getOutFN(self) :
return self.outFN;
# Prints usage
def usage(self):
print "Usage: python RemoveOpticalDuplicates.py [-d optical_duplciate_pixel_distance] <input file> "
print "where the input is a sam file sorted by chr position. Default optical_duplciate_pixel_distance: 100"
print "Example: python RemoveOpticalDuplicates.py sampleA.bwa.sorted.sam > sampleA.bwa.rmoptdup.sam"
sys.exit(1)
def checkArgs(self) :
i = 1;
argc = len(sys.argv);
errorMsg = "";
while i < argc and errorMsg == "":
if self.argv[i] == '-d' :
i += 1;
if i < argc and not self.argv[i].startswith('-') :
self.opticalDuplicatePixelDistance = convertStr(self.argv[i]);
if self.opticalDuplicatePixelDistance < 1 :
errorMsg = 'Error: optical duplicate pixel distance must be an integer >= 1'
else :
errorMsg = 'Error: optical duplicate pixel distance expected after -d flag'
elif i == argc-1 :
self.inFN = self.argv[i];
else :
errorMsg = 'Error: unknown flag: ' + self.argv[i];
i += 1;
if errorMsg == "" :
if self.inFN == "" :
errorMsg = 'Error: insufficient arguments given'
if errorMsg <> "" :
print errorMsg;
self.usage();
def removeOpticalDuplicates(param) :
inFN = param.getInFN();
outFile = sys.stdout;
optDist = param.getOpticalDuplicatePixelDistance()
try:
inFile = open (inFN, 'r')
except:
print "Unable to open file " + inFN
sys.exit(-1)
tile_cigarToDupDict = {}
curChr = ""
curStartPos = ""
first = True;
line = inFile.readline();
while (1) :
if not line: break
nextLine = inFile.readline();
if not line.startswith("@") :
tokens = line.split();
chr = tokens[2];
startPos = tokens[3];
mapq = tokens[4]
cigar = tokens[5];
subtokens = tokens[0].split(":");
tile = subtokens[4];
x = subtokens[5]
y = subtokens[6]
tile_cigar = tile + "_" + cigar;
if first :
curChr = chr;
curStartPos = startPos
first = False;
if chr <> "*" and (chr == curChr and startPos == curStartPos) :
dups = tile_cigarToDupDict.get(tile_cigar, []);
dups.append([x, y, mapq, line]);
tile_cigarToDupDict[tile_cigar] = dups;
if chr <> "*" and (chr <> curChr or startPos <> curStartPos or not nextLine) :
for key in tile_cigarToDupDict.keys() :
dups = tile_cigarToDupDict[key];
dups.sort();
if len(dups) == 1 :
nondupline = dups[0][3]
outFile.write(nondupline);
elif len(dups) > 1 :
processed = []
for k in range(0, len(dups)) :
processed.append(False)
for i in range(0, len(dups)) :
if not processed[i] :
xi = convertStr(dups[i][0])
yi = convertStr(dups[i][1])
mapqi = convertStr(dups[i][2])
processed[i] = True;
best = i;
bestMapq = mapqi;
for j in range(i+1, len(dups)) :
if not processed[j] :
xj = convertStr(dups[j][0])
yj = convertStr(dups[j][1])
mapqj = convertStr(dups[j][2])
if abs(xj-xi) > optDist : break;
if abs(yj-yi) <= optDist :
processed[j] = True
if mapqj > bestMapq :
best = j;
bestMapq = mapqj;
bestline = dups[best][3]
outFile.write(bestline);
tile_cigarToDupDict = {}
tile_cigarToDupDict[tile_cigar] = [[x, y, mapq, line]]
if chr <> "*" and not nextLine and (chr <> curChr or startPos <> curStartPos) :
outFile.write(line);
if chr == "*" :
outFile.write(line);
curChr = chr
curStartPos = startPos;
line = nextLine;
inFile.close();
outFile.close();
# Execute as application
if __name__ == '__main__' :
param = Parameter(sys.argv)
param.checkArgs();
removeOpticalDuplicates(param)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment