Last active
September 8, 2023 02:15
-
-
Save andsilver/410a32d88edcbfb2d0aaf30a13b09909 to your computer and use it in GitHub Desktop.
Parse a PDF document using docTR and find the legal description, with bounding-box information, using pandas.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
import sys
from pdb import set_trace

import numpy as np
import pandas as pd
from doctr.io import DocumentFile
from doctr.models import ocr_predictor
from fuzzywuzzy import fuzz
# Shared docTR OCR pipeline, built once at import time and called by
# ParseReport.__init__: db_resnet50 for text detection, crnn_vgg16_bn for
# text recognition, both with pretrained weights.
model = ocr_predictor(det_arch='db_resnet50',
                      reco_arch='crnn_vgg16_bn', pretrained=True)
class ParseReport:
    """OCR a PDF or image with docTR and query the recognised text.

    The docTR export is flattened into two pandas frames:

    * ``self.words`` - one row per recognised word, carrying ``page_idx``,
      ``block_idx``, ``line_idx``, ``word_idx``, ``value`` and the word box
      as ``x1``/``y1``/``x2``/``y2``.
    * ``self.lines`` - one row per line; ``value`` is the line's words joined
      by spaces, ``x1``/``x2``/``y1``/``y2`` are taken from the line's FIRST
      word, and ``y1``/``y2`` are offset by ``page_idx`` so they grow across
      pages.
    """

    _log = logging.getLogger(__name__)

    # Columns needed to pair neighbouring keyword hits.
    _PAIR_COLUMNS = ["page_idx", "block_idx", "line_idx",
                     "x1", "y1", "x2", "y2", "value"]

    def __init__(self, path: str, debug_json='./example.json'):
        """
        Runs OCR on a document and builds the ``words`` / ``lines`` frames.
        :param path: path of a PDF (``.pdf``, any letter case) or of an image file.
        :param debug_json: where the raw OCR export is dumped; ``None`` skips the dump.
        """
        if path.lower().endswith('.pdf'):
            data = DocumentFile.from_pdf(file=path)
        else:
            data = DocumentFile.from_images(files=[path])
        pd.set_option("display.precision", 6)
        json_output = model(data).export()
        if debug_json is not None:
            with open(debug_json, "w") as f:
                json.dump(json_output, f, indent=2)
        self._build_frames(json_output)

    @staticmethod
    def _explode_level(frame, column, idx_name, geometry_name):
        """Turn a list-of-dicts column into one flattened row per dict."""
        out = frame.explode(column)
        # A parent with an empty list explodes to NaN, which json_normalize
        # cannot flatten (e.g. a blank page has no blocks).
        out = out.dropna(subset=[column]).copy()
        out[idx_name] = np.arange(out.shape[0])
        out.index = pd.Index(np.arange(out.shape[0]), name='index')
        out = out.join(pd.json_normalize(out.pop(column)))
        return out.rename(columns={'geometry': geometry_name})

    def _build_frames(self, export):
        """Flatten a docTR export dict into ``self.lines`` and ``self.words``."""
        df = pd.DataFrame(export)
        pages = df.join(pd.json_normalize(df.pop('pages')))
        blocks = self._explode_level(
            pages, 'blocks', 'block_idx', 'block_geometry')
        lines = self._explode_level(
            blocks, 'lines', 'line_idx', 'line_geometry')

        save_lines = lines.copy()
        # The line box is approximated by the box of the line's first word.
        first_boxes = [ws[0]['geometry'] for ws in save_lines['words']]
        save_lines["x1"] = [box[0][0] for box in first_boxes]
        save_lines["x2"] = [box[1][0] for box in first_boxes]
        save_lines["y1"] = [box[0][1] for box in first_boxes]
        save_lines["y2"] = [box[1][1] for box in first_boxes]
        # Offset by the page number so vertical positions keep increasing
        # from one page to the next.
        save_lines["y1"] += save_lines["page_idx"]
        save_lines["y2"] += save_lines["page_idx"]
        save_lines['words'] = save_lines['words'].apply(
            lambda ws: ' '.join(item['value'] for item in ws))
        save_lines = save_lines.rename(columns={'words': 'value'})
        # The raw line geometry is not used once x1..y2 exist.
        save_lines = save_lines.drop(columns=['line_geometry'])
        self.lines = save_lines

        words = self._explode_level(
            lines, 'words', 'word_idx', 'word_geometry')
        boxes = words.pop('word_geometry')
        coords = pd.DataFrame(
            {
                "x1": [box[0][0] for box in boxes],
                "y1": [box[0][1] for box in boxes],
                "x2": [box[1][0] for box in boxes],
                "y2": [box[1][1] for box in boxes],
            },
            index=words.index,
        )
        self.words = words.join(coords)

    @staticmethod
    def _line_text(context, row):
        """Join every word of ``context`` that shares ``row``'s page, block and line."""
        same_line = context[(context['page_idx'] == row['page_idx'])
                            & (context['block_idx'] == row['block_idx'])
                            & (context['line_idx'] == row['line_idx'])]
        return " ".join(same_line['value'].values)

    def _rank_word_pairs(self, context, word1, word2, ignore_case=False, signed=False):
        """
        Pairs each keyword hit with the next hit and ranks the pairs.
        :param context: word frame to search.
        :param word1: first word of the pair (used as a regex by ``str.contains``).
        :param word2: second word of the pair (used as a regex by ``str.contains``).
        :param ignore_case: lower-case both the text and the words before matching.
        :param signed: keep the sign of the horizontal gap and of the bottom-edge
            offset instead of taking absolute values.
        :return: hits whose paired text is within 10 points of "word1 word2",
            sorted by score, space and align.
        """
        values = context['value']
        if ignore_case:
            values = values.str.lower()
            word1_pat, word2_pat = word1.lower(), word2.lower()
        else:
            word1_pat, word2_pat = word1, word2
        mask = values.str.contains(word1_pat) | values.str.contains(word2_pat)
        # Copy so the helper columns are not written into a slice of ``context``.
        hits = context.loc[mask, self._PAIR_COLUMNS].copy()
        nxt = hits.shift(-1)
        hits['paired'] = hits['value'] + " " + nxt['value']
        gap = nxt['x1'] - hits['x2']
        bottom = nxt['y2'] - hits['y2']
        hits['space'] = gap if signed else gap.abs()
        hits['align'] = ((nxt['y1'] - hits['y1']).abs()
                         + (bottom if signed else bottom.abs())) / 2
        target = str(word1) + " " + str(word2)
        hits["score"] = hits["paired"].apply(
            lambda text: 100 - fuzz.ratio(str(text), target))
        hits = hits[hits.score < 10]
        return hits.sort_values(by=['score', 'space', "align"])

    def find_attribute_one(self, word, extract_value=True, context=None):
        """
        Finds attributes values with one word.
        :param word: A word to match in any line of the document (regex, as in ``str.contains``).
        :param extract_value: if True returns only the value else returns the whole line.
        :param context: if there are any specific context to look at, to shorten the search space.
        :return: a string of value for an attribute, or None when nothing matches
            or the lookup fails. When several words match, the first one wins.
        """
        if context is None:
            context = self.words
        try:
            matches = context[context['value'].str.contains(word)]
            if matches.empty:
                return None
            text = self._line_text(context, matches.iloc[0])
        except Exception:
            # Best-effort lookup: keep returning None, but leave a trace.
            self._log.debug("find_attribute_one(%r) failed", word, exc_info=True)
            return None
        return self.extract_value(text) if extract_value else text

    def find_attribute_two(self, word1, word2, extract_value=True, context=None):
        """
        Finds attributes values with two words.
        :param word1: first word of two to match with any line of the document
        :param word2: second word of two to match with any line of the document
        :param extract_value: if True returns only the value else returns the whole line.
        :param context: if there are any specific context to look at, to shorten the search space.
        :return: a string of value for an attribute, or None when no pair scores
            well enough or the lookup fails.
        """
        if context is None:
            context = self.words
        try:
            ranked = self._rank_word_pairs(
                context, word1, word2, ignore_case=True)
            if ranked.empty:
                return None
            text = self._line_text(context, ranked.iloc[0])
        except Exception:
            # Best-effort lookup: keep returning None, but leave a trace.
            self._log.debug("find_attribute_two(%r, %r) failed",
                            word1, word2, exc_info=True)
            return None
        return self.extract_value(text) if extract_value else text

    def company_name(self):
        """
        First line of the document, and assign as a company name.
        :return: string (company name)
        """
        first_line = self.words[(self.words['page_idx'] == 0)
                                & (self.words['block_idx'] == 0)
                                & (self.words['line_idx'] == 0)]
        return " ".join(first_line["value"].values)

    @staticmethod
    def extract_value(sentence):
        """
        Separates a string with key value combination, and returns value.
        :param sentence: a sentence to be split into key and value
        :return: the text after the LAST colon, stripped (the whole stripped
            sentence when it has no colon)
        """
        return sentence.split(":")[-1].strip()

    def get_blocks(self, keywords, debug_csv='./test.csv'):
        """
        Groups lines into paragraphs and returns the paragraphs hit by keywords.
        :param keywords: words joined with "|" into one regex, matched
            case-insensitively against each line; an empty collection yields [].
        :param debug_csv: where the annotated line table is dumped; ``None`` skips the dump.
        :return: list of dicts with keys 'text', 'paragraph' and 'block', one per
            paragraph that either contains a keyword or belongs to a block that does.
        """
        keywords = list(keywords)
        context = self.lines.sort_values(['page_idx', 'y1'])
        tops = context['y1'].to_numpy()
        bottoms = context['y2'].to_numpy()
        lefts = context['x1'].to_numpy()
        count = len(context)

        paragraph_ids, gaps_above, gaps_below = [], [], []
        paragraph_idx = 1
        for i in range(count):
            # Vertical gap to the previous / next line, clamped at zero.
            prev_m = 0
            if i > 0 and tops[i] > bottoms[i - 1]:
                prev_m = tops[i] - bottoms[i - 1]
            next_m = 0
            if i < count - 1 and tops[i + 1] > bottoms[i]:
                next_m = tops[i + 1] - bottoms[i]
            # A line starts a new paragraph when the gap above it is larger
            # than the gap below it, or when it is shifted horizontally
            # relative to the previous line.
            if round(abs(prev_m - next_m), 2) != 0 and prev_m > next_m:
                paragraph_idx += 1
            elif i > 0 and round(abs(lefts[i] - lefts[i - 1]), 1) > 0.3:
                paragraph_idx += 1
            gaps_above.append(prev_m)
            gaps_below.append(next_m)
            paragraph_ids.append(paragraph_idx)

        # Whole-column assignment: per-cell chained assignment does not write
        # through to the frame under pandas Copy-on-Write.
        context['paragraph_idx'] = paragraph_ids
        context['prev_m'] = gaps_above
        context['next_m'] = gaps_below
        if debug_csv is not None:
            context.to_csv(debug_csv)

        if not keywords:
            # An empty pattern would match every line.
            return []
        hit_mask = context['value'].str.contains('|'.join(keywords), case=False)
        hits = context.loc[hit_mask, ["block_idx", "paragraph_idx"]]
        blocks = set(hits['block_idx'].to_list())
        paragraphs = set(hits['paragraph_idx'].to_list())
        if not blocks or not paragraphs:
            return []

        selected = context[context["block_idx"].isin(blocks)
                           | context['paragraph_idx'].isin(paragraphs)]
        # Aggregating per paragraph keeps text, paragraph and block aligned
        # even when two paragraphs happen to have the same text.
        summary = (
            selected.groupby('paragraph_idx')
            .agg(text=('value', lambda s: ' '.join(s)),
                 block=('block_idx', 'min'))
            .reset_index()
            .rename(columns={'paragraph_idx': 'paragraph'})
        )
        return summary[['text', 'paragraph', 'block']].to_dict('records')

    def get_lines(self, word1, word2, context=None):
        """
        Finds lines containing two words from the whole document.
        :param word1: first word of two to match with any line of the document
        :param word2: second word of two to match with any line of the document
        :param context: if there are any specific context to look at, to shorten the search space.
        :return: a pd.Dataframe with high score of being the lines we are looking for.
        """
        if context is None:
            context = self.words
        return self._rank_word_pairs(context, word1, word2)

    def _collect_records(self, word1, word2, fields):
        """
        Builds one record per block that holds the "word1 word2" heading.
        :param fields: maps each output key to a 1-tuple (looked up with
            find_attribute_one) or a 2-tuple (looked up with find_attribute_two).
        :return: list of dicts; records whose values are all None are dropped.
        """
        records = []
        for _, row in self.get_lines(word1, word2).iterrows():
            section = self.words[(self.words.page_idx == row.page_idx)
                                 & (self.words.block_idx == row.block_idx)]
            record = {}
            for label, keys in fields.items():
                if len(keys) == 1:
                    record[label] = self.find_attribute_one(
                        keys[0], context=section)
                else:
                    record[label] = self.find_attribute_two(
                        keys[0], keys[1], context=section)
            if any(value is not None for value in record.values()):
                records.append(record)
        return records

    def get_lien(self):
        """
        Finds information about 'Lien Types'.
        :return: information about lien types
        """
        return self._collect_records("Lien", "Type:", {
            "Lien Type:": ("Lien", "Type:"),
            "Filed Against:": ("Filed", "Against:"),
            "Amount:": ("Amount:",),
            "Recorded Date:": ("Recorded", "Date:"),
            "Recording Information:": ("Recording", "Information:"),
            "Comment:": ("Comment:",),
        })

    def get_vesting_instrument(self):
        """
        Finds information about 'Vesting Instrument Type'
        :return: information about vesting instrument type
        """
        return self._collect_records("Vesting", "Instrument", {
            "Vesting Instrument Type": ("Vesting", "Instrument"),
            "Executed": ("Executed:",),
            "Recorded": ("Recorded:",),
            "Recording Information": ("Recording", "Information:"),
            "Comment": ("Comment:",),
        })

    def get_instrument(self):
        """
        Finds information about 'Instrument Type'.
        :return: information about instrument type
        """
        return self._collect_records("Instrument", "Type:", {
            "Instrument Type:": ("Instrument", "Type:"),
            "From:": ("From:",),
            "To:": ("To:",),
            "Executed:": ("Executed:",),
            "Recorded:": ("Recorded:",),
            "Recording Information:": ("Mortgage", "Recording"),
        })

    def find_table_pages(self, word1, word2):
        """
        Finds pages that can have 'Federal Tax Lien' tables.
        :param word1: first word of two to match with any line of the document
        :param word2: second word of two to match with any line of the document
        :return: a pd.Dataframe with pages with high score of being the pages we
            are looking for, or None when the search fails
        """
        try:
            # NOTE(review): unlike get_lines, this ranking keeps the sign of the
            # horizontal gap and of the bottom-edge offset - confirm that is intended.
            return self._rank_word_pairs(self.words, word1, word2, signed=True)
        except Exception:
            self._log.debug("find_table_pages(%r, %r) failed",
                            word1, word2, exc_info=True)
            return None

    @staticmethod
    def find_column_values(report, context, word1, word2, right=0.0, left=0.0, height=0.21):
        """
        Finds values for a specific column for a specific table.
        :param report: the ParseReport whose ``words`` frame is searched.
        :param context: row identifying the table; its ``page_idx`` selects the
            page and words in its ``block_idx`` are skipped.
        :param word1: first word of two to match with any line of the document
        :param word2: second word of two to match with any line of the document
        :param right: increase or decrease in right of the words for a column
        :param left: increase or decrease in left of the words for a column
        :param height: height of the column to consider
        :return: list of word rows (pd.Series) found under the best-scoring
            header, or None when no header is found or the search fails
        """
        try:
            page = report.words[(report.words.page_idx == context.page_idx)]
            mask = (page['value'].str.contains(word1)
                    | page['value'].str.contains(word2))
            column = page.loc[mask, ["page_idx", "block_idx", "line_idx",
                                     "x1", "y1", "x2", "y2", "value"]].copy()
            nxt = column.shift(-1)
            column['paired'] = column['value'] + " " + nxt['value']
            column['space'] = (nxt['x1'] - column['x2']).abs()
            # NOTE(review): the bottom-edge term is signed here (no abs) - kept as found.
            column['align'] = ((nxt['y1'] - column['y1']).abs()
                               + (nxt['y2'] - column['y2'])) / 2
            # Box of the second header word.
            column['x12'] = nxt['x1']
            column['y12'] = nxt['y1']
            column['x22'] = nxt['x2']
            column['y22'] = nxt['y2']
            target = str(word1) + " " + str(word2)
            column["score"] = column["paired"].apply(
                lambda text: 100 - fuzz.ratio(str(text), target))
            column = column[column.score < 10]
            column = column.sort_values(by=['score', 'space', "align"])
            if column.empty:
                return None
            # Use the best-ranked header instead of requiring exactly one hit.
            header = column.iloc[0]

            # Words horizontally inside the header span and below the header.
            column_data = page[(page.x1 >= header.x1 - left)
                               & (page.x2 <= header.x22 + right)
                               & (page.y1 - 0.01 >= header.y2)].copy()
            column_data['hd'] = (column_data.y1.shift(-1) - header.y2).abs()
            column_data = column_data.sort_values(by=['hd'])
            column_data['bid'] = (
                column_data.block_idx.shift(-1) - column_data.block_idx).abs()
            column_data['lid'] = (
                column_data.line_idx.shift(-1) - column_data.line_idx).abs()
            column_data['h2d'] = (column_data.hd.shift(-1) - column_data.hd).abs()
            column_data = column_data.sort_values(
                by=['hd', 'h2d', 'bid', "lid"])
            column_data = column_data.reset_index(drop=True)

            values = []
            last = len(column_data) - 1
            for i, k in column_data.iterrows():
                if context.block_idx == k.block_idx:
                    continue
                # The last row has no successor to inspect; looking one past
                # it used to raise and discard everything collected so far.
                next_gap_too_big = (
                    i < last and column_data.iloc[i + 1].h2d > height)
                if k.hd > height or next_gap_too_big:
                    break
                values.append(k)
            return values
        except Exception:
            logging.getLogger(__name__).debug(
                "find_column_values(%r, %r) failed", word1, word2, exc_info=True)
            return None

    def get_lien_tables(self):
        """
        Finds all the value in a structure way for a table.
        :return: list of tables, tables are a list of rows (the first row holds
            the column headers); empty when no table page is found
        """
        notice_lien = self.find_table_pages("Notice", "Lien")
        if notice_lien is None:
            # find_table_pages signals a failed search with None.
            return []
        lien_tables = []
        for _, table in notice_lien.iterrows():
            page = self.words[(self.words.page_idx == table.page_idx)]
            # find_column_values is static, so the report is passed explicitly.
            kind_of_tax = self.find_column_values(self, table, "Kind", "Tax")
            first_col = pd.DataFrame(kind_of_tax)
            # NOTE(review): "Refining" and the missing "(d)" look like typos of
            # the form's headers; left untouched because consumers may match on them.
            rows = [["Kind of Tax (a)", "Tax Period Ending (b)", "Identifying Number (c)",
                     "Date of Assessment", "Last Day for Refining (e)", "Unpaid Balance of Assessment (f)"]]
            for _, cell in first_col.iterrows():
                line_words = page[page.line_idx == cell.line_idx]
                # Every word that fits vertically inside that line's extent.
                line_values = list(page[(page.y1 >= line_words.y1.min())
                                        & (page.y2 <= line_words.y2.max())].value.values)
                if not line_values:
                    continue
                # Move the first word to the end of the row.
                rows.append(line_values[1:] + line_values[:1])
            if len(rows) > 1:
                lien_tables.append(rows)
        return lien_tables
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment