Created
July 27, 2022 23:20
-
-
Save sherwoac/0a2e4f5e9352364122586271725001d6 to your computer and use it in GitHub Desktop.
HSBC PDF statements to CSV
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name: pdf_extractor_env
channels:
  - conda-forge
  - defaults
dependencies:
  - _libgcc_mutex=0.1=main
  - _openmp_mutex=5.1=1_gnu
  - blas=1.0=openblas
  - bottleneck=1.3.5=py39h7deecbd_0
  - brotli=1.0.9=he6710b0_2
  - brotlipy=0.7.0=py39h27cfd23_1003
  - bzip2=1.0.8=h7b6447c_0
  - ca-certificates=2022.07.19=h06a4308_0
  - cairo=1.16.0=h19f5f5c_2
  - camelot-py=0.9.0=pyhd8ed1ab_0
  - certifi=2022.6.15=py39h06a4308_0
  - cffi=1.15.0=py39hd667e15_1
  - charset-normalizer=2.0.4=pyhd3eb1b0_0
  - click=8.0.4=py39h06a4308_0
  - cryptography=37.0.1=py39h9ce1e76_0
  - cycler=0.11.0=pyhd3eb1b0_0
  - dbus=1.13.18=hb2f20db_0
  - distro=1.5.0=pyhd3eb1b0_1
  - eigen=3.3.7=hd09550d_1
  - et_xmlfile=1.1.0=py39h06a4308_0
  - expat=2.4.4=h295c915_0
  - ffmpeg=4.2.2=h20bf706_0
  - fontconfig=2.13.1=h6c09931_0
  - fonttools=4.25.0=pyhd3eb1b0_0
  - freetype=2.11.0=h70c0345_0
  - ghostscript=9.54.0=h27087fc_2
  - giflib=5.2.1=h7b6447c_0
  - glib=2.69.1=h4ff587b_1
  - gmp=6.2.1=h295c915_3
  - gnutls=3.6.15=he1e5248_0
  - graphite2=1.3.14=h295c915_1
  - gst-plugins-base=1.14.0=h8213a91_2
  - gstreamer=1.14.0=h28cd5cc_2
  - harfbuzz=4.3.0=hd55b92a_0
  - hdf5=1.10.6=h3ffc7dd_1
  - icu=58.2=he6710b0_3
  - idna=3.3=pyhd3eb1b0_0
  - intel-openmp=2021.4.0=h06a4308_3561
  - jpeg=9e=h7f8727e_0
  - kiwisolver=1.4.2=py39h295c915_0
  - lame=3.100=h7b6447c_0
  - lcms2=2.12=h3be6417_0
  - ld_impl_linux-64=2.38=h1181459_1
  - libffi=3.3=he6710b0_2
  - libgcc-ng=11.2.0=h1234567_1
  - libgfortran-ng=11.2.0=h00389a5_1
  - libgfortran5=11.2.0=h1234567_1
  - libgomp=11.2.0=h1234567_1
  - libidn2=2.3.2=h7f8727e_0
  - libopenblas=0.3.20=h043d6bf_1
  - libopus=1.3.1=h7b6447c_0
  - libpng=1.6.37=hbc83047_0
  - libprotobuf=3.20.1=h4ff587b_0
  - libstdcxx-ng=11.2.0=h1234567_1
  - libtasn1=4.16.0=h27cfd23_0
  - libtiff=4.2.0=h2818925_1
  - libunistring=0.9.10=h27cfd23_0
  - libuuid=1.0.3=h7f8727e_2
  - libvpx=1.7.0=h439df22_0
  - libwebp=1.2.2=h55f646e_0
  - libwebp-base=1.2.2=h7f8727e_0
  - libxcb=1.15=h7f8727e_0
  - libxml2=2.9.14=h74e7548_0
  - lz4-c=1.9.3=h295c915_1
  - matplotlib-base=3.4.3=py39hbbc1b5f_0
  - mkl=2021.4.0=h06a4308_640
  - mkl-service=2.4.0=py39h7f8727e_0
  - munkres=1.1.4=py_0
  - ncurses=6.3=h5eee18b_3
  - nettle=3.7.3=hbbd107a_1
  - numpy=1.16.6=py39h0708ffd_4
  - numpy-base=1.16.6=py39ha8aedfd_4
  - opencv=4.5.5=py39h1ca2c5e_3
  - openh264=2.1.1=h4ff587b_0
  - openjdk=11.0.13=h87a67e3_0
  - openjpeg=2.4.0=h3ad879b_0
  - openpyxl=3.0.10=py39h5eee18b_0
  - openssl=1.1.1q=h7f8727e_0
  - packaging=21.3=pyhd3eb1b0_0
  - pandas=1.2.4=py39ha9443f7_0
  - pcre=8.45=h295c915_0
  - pdfminer.six=20220524=pyhd8ed1ab_1
  - pillow=9.2.0=py39hace64e9_1
  - pip=22.1.2=py39h06a4308_0
  - pixman=0.40.0=h7f8727e_1
  - pycparser=2.21=pyhd3eb1b0_0
  - pycryptodome=3.15.0=py39h276157c_0
  - pyopenssl=22.0.0=pyhd3eb1b0_0
  - pyparsing=3.0.4=pyhd3eb1b0_0
  - pypdf2=2.4.2=pyhd8ed1ab_0
  - pysocks=1.7.1=py39h06a4308_0
  - python=3.9.12=h12debd9_1
  - python-dateutil=2.8.2=pyhd3eb1b0_0
  - python_abi=3.9=2_cp39
  - pytz=2022.1=py39h06a4308_0
  - qt=5.9.7=h5867ecd_1
  - readline=8.1.2=h7f8727e_1
  - requests=2.28.1=py39h06a4308_0
  - setuptools=61.2.0=py39h06a4308_0
  - setuptools-scm=7.0.4=py39h06a4308_0
  - setuptools_scm=7.0.4=hd3eb1b0_0
  - six=1.16.0=pyhd3eb1b0_1
  - sqlite=3.38.5=hc218d9a_0
  - tabula-py=2.2.0=py39hf3d152e_3
  - tk=8.6.12=h1ccaba5_0
  - tomli=2.0.1=py39h06a4308_0
  - tornado=6.1=py39h27cfd23_0
  - tqdm=4.64.0=py39h06a4308_0
  - typing-extensions=4.1.1=hd3eb1b0_0
  - typing_extensions=4.1.1=pyh06a4308_0
  - tzdata=2022a=hda174b7_0
  - urllib3=1.26.9=py39h06a4308_0
  - wheel=0.37.1=pyhd3eb1b0_0
  - x264=1!157.20191217=h7b6447c_0
  - xz=5.2.5=h7f8727e_1
  - zlib=1.2.12=h7f8727e_2
  - zstd=1.5.2=ha4553b6_0
  - pip:
      - opencv-python==4.6.0.66
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import os | |
import sys | |
from collections import defaultdict | |
import csv | |
import camelot | |
import pandas as pd | |
import io | |
import numpy as np | |
import tqdm | |
import dateutil | |
# Marker rows that delimit the transaction table on each statement page;
# the text appears verbatim in the tables camelot extracts.
_brought_forward = 'BALANCE BROUGHT FORWARD'
_carried_forward = 'BALANCE CARRIED FORWARD'
def concat_description(lines):
    """
    Merge multi-line transaction descriptions into single csv rows.

    Rows whose last three fields (out/in/balance) are all empty are treated
    as description continuations; each group is folded into its single
    amount-bearing ("final") row, joining the description texts with spaces.

    NOTE: mutates ``lines`` in place and also returns it.

    :param lines: csv rows shaped like [date, type, desc, out, in, balance]
    :return: the same list with continuation rows folded into amount rows
    """
    # Map each amount-bearing ("final") row index to the indices of the
    # continuation rows belonging to it.
    continuation_to_final_lines = defaultdict(list)
    final_line = 0
    # easier to do it in reverse as there's always a final line:
    # scanning bottom-up, a row with empty amounts attaches to the nearest
    # amount row below it, so final_line > every continuation index.
    for continuation_line in reversed(range(len(lines))):
        line = lines[continuation_line]
        if len(line):
            if all([el == '' for el in line[-3:]]):
                # out/in/balance all empty -> description continuation
                continuation_to_final_lines[final_line].append(continuation_line)
            else:
                final_line = continuation_line
    # Concatenate each group, processing groups bottom-up so the deletions
    # below never shift the indices of groups not yet processed.
    for final_line in reversed(sorted(continuation_to_final_lines.keys())):
        continuation_lines = continuation_to_final_lines[final_line]
        # description fragments in top-down row order, the amount row's last
        descriptions = (lines[continuation_line][2] for continuation_line in sorted(continuation_lines) + [final_line])
        desc = ' '.join(descriptions)
        # first two fields from a continuation row, amounts from the final row.
        # NOTE(review): continuation_lines[0] is the *highest* continuation
        # index (list was built in reverse), not the top row of the group —
        # presumably equivalent because continuation rows carry no
        # date/type; confirm against real statements.
        replacement_line = lines[continuation_lines[0]][:2] + [desc] + lines[final_line][3:]
        # delete the unwanted lines bottom up (highest index first, so the
        # remaining indices stay valid)
        del lines[final_line]
        for continuation_line in reversed(sorted(continuation_lines)[1:]):
            del lines[continuation_line]
        # the topmost row of the group becomes the merged row
        lines[sorted(continuation_lines)[0]] = replacement_line
    return lines
def write_out_lines(output_filename: str, lines: list):
    """
    Write rows out as a csv file (overwriting any existing file).

    :param output_filename: path of the csv file to write
    :param lines: list of rows, each row a list of field values
    """
    # newline='\n' stops the text layer re-translating the writer's line
    # endings on Windows; the 'with' block closes the file — the original's
    # extra f.close() inside the block was redundant.
    with open(output_filename, 'w', encoding='UTF8', newline='\n') as f:
        csv.writer(f).writerows(lines)
def contains_elements_by_row(df: pd.DataFrame, elements: list):
    """
    Flag, per row, whether the row contains any of the given terms.

    :param df: dataframe of strings to scan
    :param elements: substrings to look for in any cell of a row
    :return: list of booleans, one per row
    """
    flags = [False] * len(df)
    for label, row in df.iterrows():
        # a row matches when at least one term appears in at least one cell
        flags[label] = any(row.str.contains(term).any() for term in elements)
    return flags
# Column header cells exactly as camelot extracts them from HSBC statements.
# The odd spacing ('Pay m e nt\nt y pe and de t ails') is how the PDF text
# layer comes out of extraction — these must match verbatim; do not "fix" them.
column_titles = ['Date\nPay m e nt\nt y pe and de t ails', 'Pay m e nt\nt y pe and de t ails', 'Paid out', 'Paid in', 'Balance']
def pdf_to_text(input_filename):
    """
    Extract the transaction table from one HSBC pdf statement.

    camelot pulls candidate tables from every page; a table is kept when it
    shows at least two of the expected column titles and exactly the two
    BROUGHT/CARRIED FORWARD marker rows.  Kept tables are trimmed to the
    rows strictly between the markers, concatenated, and normalised into
    csv rows of [date, type, desc, out, in, balance].

    :param input_filename: path of the pdf statement to read
    :return: (header_row, data_rows), each csv row a list of strings
    """
    # 'import dateutil' at the top of the file does not load the parser
    # submodule; make sure it is actually imported before use.
    import dateutil.parser

    tables = camelot.read_pdf(input_filename, flavor='stream', flag_size=True, pages='all')
    # pick one table per page: the longest candidate that looks like a statement table
    table_page_to_table_index = {}
    for table_index, table in enumerate(tables):
        if table.df.isin(column_titles).any(axis=0).sum() > 1 \
                and sum(contains_elements_by_row(table.df, [_brought_forward, _carried_forward])) == 2:
            if table.page in table_page_to_table_index:
                print(f'check: {input_filename=} {table.page=}')
            if table.page not in table_page_to_table_index \
                    or len(tables[table_page_to_table_index[table.page]].df) < len(table.df):
                # first candidate for the page, or longer than the current one
                table_page_to_table_index[table.page] = table_index
    kept_frames = []
    for table_index in table_page_to_table_index.values():
        this_df = tables[table_index].df
        this_df.reset_index(drop=True, inplace=True)
        # Drop every column that never contains one of column_titles.
        # Collect the labels first and drop once: the original dropped inside
        # the loop via positional indexing (this_df.columns[[column]]), which
        # targets the wrong column once an earlier drop has shifted the Index.
        drop_labels = [column for column, is_column in this_df.isin(column_titles).any(axis=0).items()
                       if not is_column]
        this_df = this_df.drop(columns=drop_labels)
        this_df.columns = range(this_df.columns.size)
        # keep only the rows strictly between the two marker rows
        matches = contains_elements_by_row(this_df, [_brought_forward, _carried_forward])
        keep_from = matches.index(True)
        keep_to = matches.index(True, keep_from + 1)
        # keep_from may legitimately be row 0, so compare instead of truth-testing
        assert keep_from < keep_to, f'{keep_from=} {keep_to=}'
        kept_frames.append(this_df[keep_from + 1:keep_to])
    # single concat instead of concatenating inside the loop (quadratic copying)
    df = pd.concat(kept_frames) if kept_frames else pd.DataFrame()
    df.reset_index(drop=True, inplace=True)
    # name existing columns
    df.columns = ['desc', 'out', 'in', 'balance']
    df['date'] = None
    df['type'] = None
    # date/type are only printed on the first row of a group; carry them forward
    last_type = ''
    last_date = None
    for row_index, row in df.iterrows():
        split_desc = row['desc'].split('\n')
        split_desc.sort(key=len)
        if len(split_desc) > 2:
            # pull the date token out of the description, if there is one
            for split_index, el in enumerate(split_desc):
                try:
                    # ParserError is the public name of what the original
                    # caught via the private dateutil.parser._parser module
                    last_date = dateutil.parser.parse(el).strftime('%Y-%m-%d')
                except (dateutil.parser.ParserError, OverflowError):
                    continue
                # delete after a successful parse and stop; the original
                # deleted inside the enumerate loop and kept iterating,
                # which silently skipped the element after the deletion
                del split_desc[split_index]
                break
        if len(split_desc) > 1:
            last_type = split_desc[0]
        df.loc[row_index, 'type'] = last_type
        df.loc[row_index, 'date'] = last_date
        df.loc[row_index, 'desc'] = split_desc[-1]
    # reorder columns
    df = df[['date', 'type', 'desc', 'out', 'in', 'balance']]
    # round-trip through an in-memory csv so quoting/escaping is handled for us
    buffer = io.StringIO()
    # NOTE(review): line_terminator was renamed lineterminator in pandas >= 1.5;
    # kept as-is to match the pinned pandas 1.2.4 environment
    df.to_csv(buffer, index=False, line_terminator='\n')
    buffer.seek(0)
    # splitting by \n is ugly at this point, the csv parser won't delimit by lines, relies on file parser
    output = buffer.getvalue().split('\n')
    buffer.close()
    reader = csv.reader(output, delimiter=',', quotechar='"')
    lines = [row for row in reader]
    # first row is the header; the trailing '' from the final newline is dropped
    return lines[0], lines[1:-1]
if __name__ == '__main__':
    # usage: <script> <directory of pdf statements> <output csv>
    # explicit checks instead of assert: asserts vanish under `python -O`
    if len(sys.argv) != 3:
        sys.exit(f'usage: {sys.argv[0]} <input pdf directory> <output csv>')
    input_dir = sys.argv[1]
    output_file = sys.argv[2]
    print(f'processing: dir: {input_dir} to csv:{output_file}')
    if not os.path.isdir(input_dir):
        sys.exit(f'{input_dir=} does not exist or is not a directory')
    all_lines = None
    for pdf_filename in (progress_bar := tqdm.tqdm(sorted(os.listdir(input_dir)))):
        # match the extension, not a substring ('pdf' in name also hit e.g. 'pdfs.txt')
        if pdf_filename.lower().endswith('.pdf'):
            progress_bar.set_description(f'{pdf_filename=}')
            columns, file_lines = pdf_to_text(os.path.join(input_dir, pdf_filename))
            file_lines = concat_description(file_lines)
            if not all_lines:
                all_lines = [columns]  # header row once, from the first statement
            all_lines.extend(file_lines)
            # rewrite after every statement so a crash part-way keeps earlier results
            write_out_lines(output_file, all_lines)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment