Created
July 9, 2015 23:17
-
-
Save radaniba/6f7ce2acc45c2b042d2e to your computer and use it in GitHub Desktop.
This small script goes over a set of Bismark results and creates a summary table based on the features you want to extract.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This is a small Python script that generates a report from multiple bismark analyses.
It creates a single document (report) as a table:
rows are the parsed features,
columns are the analyzed samples.
The script takes as arguments:
- a path where we have several individual bismark results
- an output filename (-o)
- a file listing the features to extract (-f)
""" | |
from __future__ import division | |
__author__ = "Rad <[email protected]>" | |
__license__ = "GNU General Public License version 3" | |
__date__ = "06/30/2015" | |
__version__ = "0.1" | |
try: | |
import os | |
import pandas as pd | |
import datetime | |
import logging as log | |
import subprocess | |
from prettytable import PrettyTable | |
from argparse import ArgumentParser | |
except ImportError: | |
# Checks the installation of the necessary python modules | |
import os | |
import sys | |
print((os.linesep * 2).join( | |
["An error found importing one module:", str(sys.exc_info()[1]), "You need to install it Stopping..."])) | |
sys.exit(-2) | |
def init_log():
    """Return the module logger, set up to write INFO records to a file.

    Records are appended to ``report_generator.log`` (relative to the current
    working directory) in the form ``time - logger name - level - message``.

    :return: the ``logging.Logger`` named after this module
    """
    logger = log.getLogger(__name__)
    logger.setLevel(log.INFO)
    # getLogger() returns the same object on every call, so attach the file
    # handler only once; otherwise each additional call to init_log() would
    # make every record appear one more time in the log file.
    if not any(isinstance(h, log.FileHandler) for h in logger.handlers):
        handler = log.FileHandler('report_generator.log')
        handler.setLevel(log.INFO)
        # create a logging format
        formatter = log.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
def arguments():
    """Build the command line parser for the report generator.

    The parser accepts an optional positional ``path`` (defaulting to the
    user's home directory), ``-o/--output``, ``-f/--features`` and
    ``-v/--version``.

    :return: the configured ``ArgumentParser``; nothing is parsed here
    """
    home_dir = os.path.expanduser('~')
    version_banner = "%(prog)s {0}".format(__version__)

    cli = ArgumentParser(
        description="""Generates a report from different bismark outputs """
    )
    cli.add_argument(
        "path",
        nargs='?',
        default=home_dir,
        help="The path containing different results from bismark",
    )
    cli.add_argument(
        "-o", "--output",
        default="report.summary",
        help="report summarizing results across data",
    )
    cli.add_argument(
        "-f", "--features",
        help="List of features to be extracted",
    )
    cli.add_argument(
        "-v", "--version",
        action="version",
        version=version_banner,
        help="show program's version number and exit",
    )
    return cli
class Report(object):
    """Summary table of selected features across several bismark reports.

    Attributes (all start as ``None``; they can be given as keyword
    arguments or assigned after construction):

    - ``path``: directory scanned for ``*.txt`` bismark reports
    - ``list_of_features``: text file with one feature label per line
    - ``output``: file the tab-separated summary is written to
    """

    def __init__(self, **kwargs):
        """
        :param kwargs: initial values for attributes that already exist on
                       the instance; any other keyword is ignored
        """
        self.path = None
        self.list_of_features = None
        self.output = None
        # items() rather than iteritems(): the latter only exists on Python 2.
        for key, value in kwargs.items():
            if hasattr(self, key):
                setattr(self, key, value)

    def load_features(self):
        """
        Read the feature labels from ``self.list_of_features``.

        Lines starting with whitespace (which includes blank lines) are
        skipped; the trailing newline of every kept line is removed.

        :return: A Dataframe with a single column called features
        """
        # The context manager closes the file even if reading fails.
        with open(self.list_of_features) as handle:
            features = [line.rstrip('\n') for line in handle if not line[0].isspace()]
        df = pd.DataFrame()
        df['features'] = features
        return df

    def extract_feature(self, draft_report, sample_bismark_output):
        """
        Add one column holding the requested features of a single sample.

        :param draft_report: is basically the dataframe being built
                             each time we read a sample result
                             we add a column with the features specified
                             (it is modified in place)
        :param sample_bismark_output: the bismark output for a specific sample
        :return:dataframe populated with samples
        """
        sample_name = os.path.basename(sample_bismark_output).split(".txt")[0]

        # Read the sample once and index it by label instead of re-opening the
        # file for every feature. The value is the text between the first and
        # the second colon, without the trailing newline.
        values = {}
        with open(sample_bismark_output) as input_file:
            for line in input_file:
                fields = line.split(":")
                if len(fields) < 2:
                    # No "label:value" structure on this line.
                    continue
                # Keep the first occurrence of a label only.
                values.setdefault(fields[0], fields[1].rstrip("\n"))

        # Exactly one entry per feature keeps the column aligned with the
        # 'features' column: a label absent from the sample yields None
        # (written as an empty cell) rather than shifting later values up.
        draft_report[sample_name] = [values.get(feature) for feature in draft_report['features']]
        return draft_report

    def list_files(self):
        """
        List the bismark reports found directly inside ``self.path``.

        :return: paths of the regular files ending in ``.txt``, sorted by
                 file name so that the column order of the report is stable
        """
        return [
            os.path.join(self.path, name)
            for name in sorted(os.listdir(self.path))
            if name.endswith(".txt") and os.path.isfile(os.path.join(self.path, name))
        ]

    def generate(self):
        """
        Build the summary table and write it to ``self.output``.

        The table is tab-separated, has no index column and contains the
        'features' column followed by one column per sample. If no sample
        file is found, only the 'features' column is written.
        """
        # Start from the features-only table so the write below also works
        # when the directory holds no sample file.
        report = self.load_features()
        for sample in self.list_files():
            report = self.extract_feature(report, sample)
        report.to_csv(self.output, index=False, sep='\t', encoding='utf-8')
def main():
    """Command line entry point: parse the arguments and write the report.

    Exits through ``parser.error`` when no features file is given. Progress
    is written to the logger returned by ``init_log()``; a failure while
    generating the report is logged with its traceback and then re-raised.
    """
    parser = arguments()
    args = parser.parse_args()
    # Nothing can be extracted without a features file, so stop with a usage
    # message here rather than failing later when the file is opened.
    if not args.features:
        parser.error("a features file is required (-f/--features)")

    report = Report()
    # Named 'logger' so the module-level logging alias 'log' is not shadowed.
    logger = init_log()
    logger.info("Starting the report generator...")

    # Set parameters
    logger.info("Loading features from %s", args.features)
    report.list_of_features = args.features
    logger.info("Loading bismark results from %s", args.path)
    report.path = args.path
    report.output = args.output

    logger.info("Started Generating report .. ")
    try:
        report.generate()
    except Exception:
        # Record the traceback in the log file, then let the error propagate
        # so the process still terminates with a failure.
        logger.exception("Report generation failed")
        raise
    logger.info("Finished Generating report .. ")
    logger.info("Report saved to %s", args.output)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment