#! /usr/bin/env python

###############################################################################
##
## Copyright 2014 Jeet Sukumaran.
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License along
## with this program. If not, see <http://www.gnu.org/licenses/>.
##
###############################################################################

"""
Manipulation of data in table format.
"""
import argparse
import sys
import os
import re
import math
import collections
import csv


def open_source_file(filepath):
    # Python 2's 'U' mode enables universal newlines; Python 3 text mode
    # does this by default.
    if sys.version_info >= (3, 0, 0):
        f = open(filepath, 'r')
    else:
        f = open(filepath, 'rU')
    return f


def open_output(filepath):
    if filepath is None:
        return sys.stdout
    f = open(filepath, "w")
    return f


def open_output_file_for_csv_writer(filepath, append=False):
    # The csv module expects text mode with newline='' under Python 3,
    # but binary mode under Python 2.
    if filepath is None:
        out = sys.stdout
    elif sys.version_info >= (3, 0, 0):
        out = open(filepath,
                "a" if append else "w",
                newline='')
    else:
        out = open(filepath, "ab" if append else "wb")
    return out


def get_fieldname_value_map(labels):
    fieldname_value_map = collections.OrderedDict()
    if not labels:
        return fieldname_value_map
    for label in labels:
        # Split on the first ':', stripping surrounding whitespace from
        # both the fieldname and the value.
        match = re.match(r"\s*(.+?)\s*:\s*(.*?)\s*$", label)
        if not match:
            raise ValueError("Cannot parse fieldname and value (format required: fieldname:value): {}".format(label))
        fieldname, value = match.groups()
        fieldname_value_map[fieldname] = value
    return fieldname_value_map
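
# For example (illustrative):
#
#   get_fieldname_value_map(["locus: COI", "run:3"])
#   # => OrderedDict([('locus', 'COI'), ('run', '3')])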


def process_delimiter_entry(delimiter):
    if delimiter == r'\t':
        delimiter = "\t"
    elif delimiter == r'\|':
        delimiter = "|"
    return delimiter
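
# Note: a literal tab is awkward to pass as a command-line argument, so
# an invocation such as "-F '\t'" delivers the two characters
# backslash + 't'; the function above translates these escaped entries
# into the actual delimiter character.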


def flatten_table(args):
    additional_fields = get_fieldname_value_map(args.add_field)
    out = open_output(args.output_filepath)
    for filepath in args.filepaths:
        if not args.quiet:
            sys.stderr.write("-- [{}]\n".format(filepath))
        src = open_source_file(filepath)
        with src:
            reader = csv.DictReader(src, delimiter=args.field_separator)
            fieldnames = list(reader.fieldnames or [])
            if not fieldnames:
                # Empty file: nothing to flatten.
                continue
            for f in additional_fields:
                if f not in fieldnames:
                    fieldnames.append(f)
            max_field_name_width = max(len(f) for f in fieldnames)
            for row_idx, row_data in enumerate(reader):
                if args.num_rows and row_idx >= args.num_rows:
                    break
                if not args.quiet:
                    sys.stderr.write("-- [{}:{}]\n".format(filepath, row_idx+1))
                row_data.update(additional_fields)
                for fieldname in fieldnames:
                    fieldvalue = row_data[fieldname]
                    if args.no_align:
                        out.write("{}: {}\n".format(fieldname, fieldvalue))
                    else:
                        out.write("{fieldname:{fieldnamewidth}}: {fieldvalue}\n".format(
                                fieldname=fieldname,
                                fieldnamewidth=max_field_name_width,
                                fieldvalue=fieldvalue))
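
# Sample 'flatten' output for a row with columns 'name' and 'abundance',
# in the default aligned mode (illustrative):
#
#   name     : Aus bus
#   abundance: 17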


def merge_table(args):
    additional_fields = get_fieldname_value_map(args.add_field)
    table_data = []
    fieldnames = []
    fieldnames.extend(additional_fields)
    for filepath in args.filepaths:
        if not args.quiet:
            sys.stderr.write("-- [{}]".format(filepath))
        src = open_source_file(filepath)
        with src:
            reader = csv.DictReader(src, delimiter=args.field_separator)
            subfieldnames = reader.fieldnames
            if subfieldnames is None:
                if args.ignore_invalid_files:
                    if not args.quiet:
                        sys.stderr.write(": EMPTY (ignoring)\n")
                    continue
                else:
                    sys.exit("\nFile is invalid: {}".format(filepath))
            if not args.quiet:
                sys.stderr.write("\n")
            # Accumulate the union of all column names, preserving the
            # order in which they are first seen.
            for f in subfieldnames:
                if f not in fieldnames:
                    fieldnames.append(f)
            for row_idx, row_data in enumerate(reader):
                if args.num_rows and row_idx >= args.num_rows:
                    break
                row_data.update(additional_fields)
                table_data.append(row_data)
    out = open_output_file_for_csv_writer(args.output_filepath, append=args.append)
    try:
        if args.output_delimiter is None:
            delimiter = args.field_separator
        else:
            delimiter = args.output_delimiter
        writer = csv.DictWriter(
                out,
                fieldnames=fieldnames,
                restval=args.empty_field,
                delimiter=delimiter,
                lineterminator=os.linesep,
                )
        if not args.suppress_header_row:
            writer.writeheader()
        writer.writerows(table_data)
    finally:
        # Close only actual files, never sys.stdout.
        if out is not sys.stdout:
            out.close()
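
# For example (illustrative): merging a table with columns (a, b) and a
# table with columns (b, c) produces columns (a, b, c), with rows from
# the first table taking the '--empty-field' value (default: empty) in
# column 'c'.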


def inspect_table(args):
    out = sys.stdout
    if args.zero_based_column_indexing:
        index_offset = 0
    else:
        index_offset = 1
    for filepath in args.filepaths:
        if not args.quiet:
            sys.stderr.write("-- [{}]\n".format(filepath))
        src = open_source_file(filepath)
        with src:
            reader = csv.DictReader(src, delimiter=args.field_separator)
            fieldnames = reader.fieldnames
            if fieldnames is None:
                sys.stderr.write("-- [{}]: EMPTY (skipping)\n".format(filepath))
                continue
            max_field_name_width = max(len(f) for f in fieldnames)
            if not args.no_list_columns:
                nfields = len(fieldnames)
                ndigits = int(math.floor(math.log(nfields, 10)) + 1)
                max_index_field_width = ndigits + 2
                for field_idx, fieldname in enumerate(fieldnames):
                    index_field = ("[{}]".format(field_idx + index_offset)).rjust(max_index_field_width)
                    out.write("{} {}\n".format(index_field, fieldname))
            if args.as_python_list:
                out.write("fields = [\n")
                for field_idx, fieldname in enumerate(fieldnames):
                    fieldname_entry = "'{}',".format(fieldname)
                    out.write("    {fieldname:{width}} # {field_idx}\n".format(
                            fieldname=fieldname_entry,
                            field_idx=field_idx + index_offset,
                            width=max_field_name_width + 3))
                out.write("]\n")
            if args.count_rows:
                # Wire up the '--count-rows' option (accepted by the
                # parser below): count data rows, excluding the header.
                num_rows = sum(1 for _row in reader)
                out.write("{}: {} row(s)\n".format(filepath, num_rows))


def main():
    """
    Main CLI handler.
    """
    master_parser = argparse.ArgumentParser()
    subparsers = master_parser.add_subparsers(title="Operation Commands",
            help="type '%(prog)s <command-name> --help' for more information.")
    source_options_parser = argparse.ArgumentParser(add_help=False)
    source_options_parser.add_argument("filepaths", metavar="FILE", type=str, nargs="*", help="Path to input data file(s).")
    source_options = source_options_parser.add_argument_group("Source Options")
    source_options.add_argument("--from-file",
            default=None,
            metavar="FILEPATH",
            help="Read list of files from FILEPATH.")
    source_options.add_argument("-F", "--field-separator",
            default="\t",
            metavar="CHAR",
            help="Delimiter separating fields in input (default: '<TAB>').")
    source_options.add_argument("-H", "--no-header-row",
            action="store_true",
            default=False,
            help="Input table(s) have no header rows.")
    table_output_options_parser = argparse.ArgumentParser(add_help=False)
    table_output_options = table_output_options_parser.add_argument_group("Output Options")
    table_output_options.add_argument("-o", "--output-filepath",
            default=None,
            help="Path to output file (default: standard output).")
    table_output_options.add_argument("-D", "--output-delimiter",
            default=None,
            metavar="CHAR",
            help="Delimiter to use to separate fields in output (default: same as input delimiter).")
    table_output_options.add_argument("--suppress-header-row",
            action="store_true",
            default=False,
            help="Do not write header row.")
    table_output_options.add_argument("--append",
            action="store_true",
            default=False,
            help="Append to output file if it already exists instead of overwriting.")
    inspect_parser = subparsers.add_parser('inspect',
            parents=[source_options_parser],
            help="Report information about the tables.",
            )
    inspect_processing_options = inspect_parser.add_argument_group("Processing Options")
    inspect_processing_options.add_argument("-r", "--count-rows",
            action="store_true",
            default=False,
            help="Count the number of rows (not including header row).")
    inspect_processing_options.add_argument("-C", "--no-list-columns",
            action="store_true",
            default=False,
            help="Do not list the columns by name.")
    inspect_processing_options.add_argument("--0-based",
            dest="zero_based_column_indexing",
            action="store_true",
            default=False,
            help="Use 0-based column indexing.")
    inspect_processing_options.add_argument("--as-python-list",
            action="store_true",
            default=False,
            help="Show results as a Python-syntax list.")
    inspect_parser.set_defaults(func=inspect_table)
    merge_parser = subparsers.add_parser('merge',
            parents=[source_options_parser, table_output_options_parser],
            help="Merge rows of all tables.",
            )
    merge_processing_options = merge_parser.add_argument_group("Processing Options")
    merge_processing_options.add_argument("-n", "--num-rows",
            default=None,
            metavar="NUM-ROWS",
            type=int,
            help="Maximum number of data rows to process in each file.")
    # merge_processing_options.add_argument("--file-index-fieldname",
    #         default=False,
    #         metavar="FIELDNAME",
    #         help="Add a new field, '<FIELDNAME>', that will be given a value equal to the (1-based) index of the source file of the row.")
    # merge_processing_options.add_argument("--file-path-fieldname",
    #         default=False,
    #         metavar="FIELDNAME",
    #         help="Add a new field, '<FIELDNAME>', that will be given a value equal to the full path index of the source file.")
merge_processing_options.add_argument("-a", "--add-field", | |
action="append", | |
metavar="NAME:VALUE", | |
help="Fields to insert into output (in format 'NAME:VALUE')") | |
merge_processing_options.add_argument("-e", "--empty-field", | |
metavar="VALUE", | |
default=None, | |
help="Value to use for missing fields (if not specified, a missing field will result in an error).") | |
merge_processing_options.add_argument("-i", "--ignore-invalid-files", | |
action="store_true", | |
default=None, | |
help="Ignore broken or empty files.") | |
    merge_parser.set_defaults(func=merge_table)
    flatten_parser = subparsers.add_parser('flatten',
            parents=[source_options_parser, table_output_options_parser],
            help="Print each field name-value pair as a separate row.",
            )
    flatten_processing_options = flatten_parser.add_argument_group("Processing Options")
    flatten_processing_options.add_argument("-n", "--num-rows",
            default=None,
            metavar="NUM-ROWS",
            type=int,
            help="Maximum number of data rows to process in each file.")
    flatten_processing_options.add_argument("-a", "--add-field",
            action="append",
            metavar="NAME:VALUE",
            help="Fields to append to output (in format 'NAME:VALUE').")
    flatten_processing_options.add_argument("--no-align",
            action="store_true",
            default=False,
            help="Do not align output.")
    flatten_parser.set_defaults(func=flatten_table)
    master_parser.add_argument("-q", "--quiet",
            action="store_true",
            default=False,
            help="Suppress meta-information and progress messages.")
    args = master_parser.parse_args()
    if not hasattr(args, "func"):
        # No subcommand given: under Python 3, subparsers are optional by
        # default, so 'func' may be unset.
        master_parser.error("please specify an operation command")
    if len(args.filepaths) == 0 and args.from_file is None:
        sys.exit("Need to specify path(s) to input filepaths.")
    elif args.from_file is not None:
        with open(args.from_file, "r") as f:
            for path in f.read().split("\n"):
                path = path.strip()
                if not path:
                    continue
                path = os.path.expandvars(os.path.expanduser(path))
                args.filepaths.append(path)
    args.field_separator = process_delimiter_entry(args.field_separator)
    # The 'inspect' subcommand does not define '--output-delimiter'.
    if getattr(args, "output_delimiter", None) is not None:
        args.output_delimiter = process_delimiter_entry(args.output_delimiter)
    args.func(args)


if __name__ == '__main__':
    main()