@liquidcarbon
Created July 1, 2020 15:31
Map-Reduce implementation of COUNT GROUP BY on each column of a large dataframe
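For each column, the result is the pandas equivalent of ``SELECT col, COUNT(*) ... GROUP BY col``; on a frame that fits in memory that is simply a value count, e.g. (illustration only, not part of the gist):

    df['col'].fillna('NULL').value_counts(dropna=False)

The class below computes the same counts chunk by chunk (map) and adds the per-chunk Series together (reduce), so the full file never has to be held in memory.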
from functools import reduce
import numpy as np
import pandas as pd

class Counts:
    """COUNT ... GROUP BY on every column of a large dataset."""

    def __init__(self, file, ddl_file, n_cols=None, n_top=10):
        self.file = file
        self.columns = get_columns_from_ddl(ddl_file)
        self.conv = {
            'message__timestamp': ts_to_dt,
        }
        if n_cols:
            self.n = min(len(self.columns), n_cols)
        else:
            self.n = len(self.columns)
        # self.n_lines = sum(1 for l in gzip.open(file, 'rb'))
        self.n_top = n_top
        self.result = {}

    def count_chunks(self):
        self.chunks = pd.read_csv(
            self.file,
            sep='\x10',
            # converters=self.conv,
            chunksize=10000,
            header=None,
            low_memory=False,
            na_values='\\N',
            nrows=5e6,
            usecols=list(range(self.n)),
        )
        # MAP
        counts = []
        for chunk in self.chunks:
            counts.append([ntop(chunk[c], None) for c in chunk.columns])
            print('mapping chunk number {:>4}'.format(len(counts)), end='\r')
        # object dtype keeps each value-counts Series as a single cell
        counts = np.array(counts, dtype=object)
        # # or:
        # counts = np.array([
        #     [ntop(chunk[c], None) for c in chunk.columns]
        #     for chunk in self.chunks
        # ], dtype=object)
        # REDUCE
        for i in range(self.n):
            print('reducing: {}'.format(self.columns[i]) + ' ' * 40, end='\r')
            self.result[self.columns[i]] = reduce(
                series_add, counts[:, i]
            ).astype(int)
        # SORT by column names
        self.result = sorted(self.result.items())

    def summarize(self):
        """Summarize results in a neat dataframe."""
        # header row of the result table
        result_columns = ['column', 'n_unique']
        for i in range(self.n_top):
            result_columns.append('top_%s\nvalue' % (i+1))
            result_columns.append('top_%s\ncount' % (i+1))
            result_columns.append('top_%s\nrel_count' % (i+1))
        result = [result_columns]
        # loop over (column name, counts) pairs produced by count_chunks
        for col in self.result:
            # print('analyzing column: {}'.format(col[0]), end='\r')
            col_name = col[0].split('.')[-1]
            n_unique = len(col[1])
            col_summary = [col_name, n_unique]
            vc_abs = col[1].sort_values(ascending=False)
            vc_norm = (vc_abs / vc_abs.sum()).round(5) * 100
            for i in range(min(self.n_top, n_unique)):
                col_summary.append(vc_abs.index[i])
                col_summary.append(vc_abs.values[i])
                col_summary.append(vc_norm.values[i])
            result.append(col_summary)
        df = pd.DataFrame(columns=result[0], data=result[1:])
        df.insert(1, '100% NULL', 'FALSE')
        df.loc[
            (df['top_1\nvalue'] == 'NULL') & (df['n_unique'] == 1),
            '100% NULL'
        ] = 'TRUE'
        self.summary = df

def ntop(s: pd.Series, n: int, fillna='NULL'):
    """Returns top n values from a pandas Series."""
    vc = s.fillna(fillna).value_counts(dropna=False).head(n)
    return vc

def series_add(previous_result: pd.Series, new_result: pd.Series):
    """Reducing function for adding up the results across chunks.
    Equivalent to ``lambda a, b: a + b`` except it takes advantage of
    ``fill_value`` in pd.Series.add."""
    return previous_result.add(new_result, fill_value=0)

def get_columns_from_ddl(file):
    """Reads column headers from a DDL file derived from
    SHOW CREATE TABLE.
    """
    with open(file, 'r') as f:
        ddl = f.read()
    # backtick-quoted identifiers: index 1 is the table name,
    # odd indices from 3 onward are the column names
    return ddl.split('`')[3::2]

def tryint(s: str, exc: int = -1) -> int:
    """Converts string to integer, falling back to ``exc`` when the value
    is NULL or not numeric."""
    try:
        return int(s)
    except (TypeError, ValueError):
        return exc

def ts_to_dt(s: str):
    """Converts a timestamp (Unix milliseconds) to pandas datetime."""
    try:
        return pd.to_datetime(int(s), unit='ms')
    except (TypeError, ValueError):
        return None
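
A minimal usage sketch, assuming a '\x10'-delimited dump and the DDL file from SHOW CREATE TABLE; the file names below are hypothetical:

if __name__ == '__main__':
    # hypothetical inputs, not shipped with the gist
    counter = Counts('events.tsv.gz', 'events.ddl', n_top=10)
    counter.count_chunks()   # map-reduce pass over the file, 10,000 rows per chunk
    counter.summarize()      # build the per-column summary dataframe
    counter.summary.to_csv('counts_summary.csv', index=False)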