Created
November 7, 2017 17:38
-
-
Save jgrant41475/98f3e5cf81b256233f6a6c00c80e406a to your computer and use it in GitHub Desktop.
A generic solution for filtering, grouping, and sorting a csv file by two fields
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import reduce | |
from operator import itemgetter as get | |
from sys import argv, maxsize as max_int | |
from itertools import groupby as by | |
class CSVEditor:
    """CSV Editor Class

    A generic solution for filtering, grouping, and sorting a csv file by two fields.
    Creates a new csv file in the same directory as file_path using the naming
    template: '[filename] - edited.csv'

    Arguments
    ---------
    @file_path : str
        Complete file path of the csv to parse.
        Only accepts files with the extension '.csv'

    Attributes
    ----------
    @columns : list of str
        Columns to transfer over to updated file
    @group_by : str
        Column to group data by
    @group_by_reversed : bool
        Sort order of the groups (True = descending)
    @sort_by : str
        Column to sort the rows of each group by; must be listed in @columns
    @sort_by_reversed : bool
        Sort order inside each group (True = descending)
    @max : int or None
        If the data type of the column @sort_by is a number this should be
        max_int, otherwise it must be None. It doubles as the placeholder
        stored for empty @sort_by cells.
    @path : str
        File path of the original csv
    @delimiter : str
        Single character delimiter of the csv file
    @updated_csv : list of dicts of {key(str) : value(str or int)}
        Container for the grouped and sorted rows
    """

    def __init__(self, file_path: str):
        """
        Initialize instance variables
        """
        # NOTE(review): "Rank" is listed twice, so it is written as two identical
        # output columns - confirm this is intended.
        self.columns = ["Keyword", "Search Engine", "Rank", "Rank"]
        self.group_by = "Search Engine"
        self.group_by_reversed = True
        self.sort_by = "Rank"
        self.sort_by_reversed = False
        self.max = max_int
        self.path = file_path
        self.delimiter = ','
        self.updated_csv = []

    def parse(self) -> "CSVEditor":
        """
        Reads @path into memory and parses the data set into a list of dictionaries.
        Sorts and groups the rows by @group_by, extracts @columns from each group and
        sorts every group by @sort_by. The result replaces @updated_csv, so calling
        parse() repeatedly does not duplicate rows.

        Blank lines are skipped and cells missing from short rows are read as "".

        :return:
            CSVEditor :
                returns the instance of itself
            FileNotFoundError :
                Unable to locate file, exits with an error code
            ValueError :
                @max is set and a non-empty @sort_by cell is not an integer
        """
        # Local import keeps this class self-contained; the csv module copes with
        # quoted fields that contain the delimiter, which a plain split() does not.
        import csv

        try:
            with open(self.path, "r", newline="") as file:
                rows = list(csv.DictReader(file, delimiter=self.delimiter, restval=""))
        except FileNotFoundError:
            raise SystemExit("Error: File not found!")

        # Numeric sort column: convert to int, empty cells become the @max placeholder
        if self.max is not None:
            for row in rows:
                raw = row[self.sort_by]
                row[self.sort_by] = int(raw) if raw.strip() else self.max

        group_key = get(self.group_by)
        sort_key = get(self.sort_by)

        # groupby only merges adjacent rows, so order by the group key first
        ordered = sorted(rows, key=group_key, reverse=self.group_by_reversed)
        result = []
        for _, members in by(ordered, key=group_key):
            subset = [{column: row[column] for column in self.columns} for row in members]
            result.extend(sorted(subset, key=sort_key, reverse=self.sort_by_reversed))

        self.updated_csv = result
        return self

    def _output_value(self, column, value):
        """Return the cell text to write; the @max placeholder in @sort_by becomes empty."""
        if self.max is not None and column == self.sort_by and value == self.max:
            return ""
        return value

    def make_new(self) -> bool:
        """
        Write sorted data to disk. Every field is quoted and lines end with a newline.

        :return:
            True :
                Created and wrote to file with no errors
            PermissionError :
                Unable to get write lock, throws a fatal error and exits program
        """
        import csv
        from os.path import splitext

        # splitext removes only the trailing extension, so a ".csv" occurring
        # earlier in the path (or a missing extension) cannot corrupt the name.
        target = splitext(self.path)[0] + " - edited.csv"
        try:
            with open(target, "w", newline="") as file:
                writer = csv.writer(file, delimiter=self.delimiter,
                                    quoting=csv.QUOTE_ALL, lineterminator="\n")
                # Column headers first, then the data rows
                writer.writerow(self.columns)
                for row in self.updated_csv:
                    writer.writerow([self._output_value(column, row[column])
                                     for column in self.columns])
        except PermissionError:
            raise SystemExit("Unable create new file.")
        return True
def main(args=None):
    """Run the editor on the csv file named by the first command line argument.

    :param args: argument vector including the program name; defaults to sys.argv
    :return: None; prints "Done." once the edited file has been written
    :raises SystemExit: when the argument is missing or is not a '.csv' path
    """
    args = argv if args is None else args
    # Validate up front instead of catching IndexError around the whole run,
    # which would mislabel unrelated IndexErrors as a missing argument.
    if len(args) < 2:
        raise SystemExit("Missing argument.")
    path = args[1]
    if not path.endswith(".csv"):
        raise SystemExit("Error: only '.csv' files are accepted.")
    # Either returns True for success or the program exits
    if CSVEditor(path).parse().make_new():
        print("Done.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment