Created
March 23, 2025 02:11
-
-
Save AshtonIzmev/42b8dedf1a8200d44da259b831c605bd to your computer and use it in GitHub Desktop.
Extract text data from common types of files
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import enum | |
class MimeType(str, enum.Enum):
    """MIME types recognised by the text-extraction helpers.

    Every member is also a ``str``, so members can be compared with, and
    passed in place of, raw MIME type strings.
    """

    # Text documents
    TXT = "text/plain"
    PDF = "application/pdf"
    DOCX = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    XLSX = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    PPTX = "application/vnd.openxmlformats-officedocument.presentationml.presentation"

    # Images
    PNG = "image/png"
    JPG = "image/jpeg"
    # Same value as JPG, so Enum exposes JPEG as an alias of the JPG member.
    JPEG = "image/jpeg"

    # Audio
    WAV = "audio/wav"
    MP3 = "audio/mpeg"
    WEBM_AUDIO = "audio/webm"
    OGG = "audio/ogg"
    OPUS = "audio/opus"
    AAC = "audio/aac"

    # Other
    XML = "application/xml"
    CSV = "text/csv"
    HTML = "text/html"

    @classmethod
    def is_audio(cls, mime_type):
        """Return True when ``mime_type`` starts with ``audio/``."""
        has_audio_prefix = mime_type.startswith("audio/")
        return has_audio_prefix

    @classmethod
    def is_image(cls, mime_type):
        """Return True when ``mime_type`` starts with ``image/``."""
        has_image_prefix = mime_type.startswith("image/")
        return has_image_prefix

    @classmethod
    def is_text(cls, mime_type):
        """Return True for any ``text/*`` type, and for PDF and DOCX."""
        if mime_type.startswith("text/"):
            return True
        # PDF and DOCX are treated as text documents even though their
        # MIME types live under ``application/``.
        return mime_type in [cls.PDF, cls.DOCX]
| import os | |
| from typing import Optional | |
| from docx import Document | |
| from bs4 import BeautifulSoup | |
| import PyPDF2 | |
| import pandas as pd | |
| from pptx import Presentation | |
| from openai import OpenAI | |
| from mistralai import Mistral | |
| from deepgram import DeepgramClient, PrerecordedOptions, FileSource | |
| from dotenv import load_dotenv | |
# Let python-dotenv populate the environment before any API key is read.
load_dotenv()

# Module-level Mistral client; its key comes from the MISTRAL_API_KEY variable.
mistral_client = Mistral(api_key=os.environ.get("MISTRAL_API_KEY"))
def get_mistral_ocr(file_path: str, filetype: str) -> str:
    """Run Mistral OCR on a local file and return the recognised text.

    The file is uploaded to Mistral, a signed URL is requested for the
    upload, and that URL is handed to the ``mistral-ocr-latest`` model.

    Args:
        file_path: Path of the local file to upload.
        filetype: ``"pdf"`` to submit the upload as a document; any other
            value submits it as an image.

    Returns:
        The Markdown of every page in the OCR response, joined with newlines.
    """
    # Use a context manager so the file handle is released once the upload
    # call returns instead of being left open.
    with open(file_path, "rb") as source:
        uploaded_file = mistral_client.files.upload(
            file={
                "file_name": file_path,
                "content": source,
            },
            purpose="ocr",
        )

    # The document payload uses the same string both as its "type" value and
    # as the key that carries the URL.
    doc_type = "document_url" if filetype == "pdf" else "image_url"
    signed_url = mistral_client.files.get_signed_url(file_id=uploaded_file.id)
    ocr_response = mistral_client.ocr.process(
        model="mistral-ocr-latest",
        document={
            "type": doc_type,
            doc_type: signed_url.url,
        },
    )
    return "\n".join(page.markdown for page in ocr_response.pages)
# Module-level Deepgram client; its key comes from the DEEPGRAM_API_KEY variable.
deepgram_client = DeepgramClient(api_key=os.environ.get("DEEPGRAM_API_KEY"))
def get_deepgram_stt_EN(file_path: str) -> str:
    """Transcribe an audio file with Deepgram, requesting English.

    Args:
        file_path: Path of the audio file to send.

    Returns:
        The transcript of the first alternative of the first channel.
    """
    transcription_options = PrerecordedOptions(
        model="nova-2",
        smart_format=True,
        language="en",
    )

    # The whole file is read into memory and sent as an in-memory buffer.
    with open(file_path, "rb") as audio_file:
        payload: FileSource = {"buffer": audio_file.read()}

    # `listen.rest` is used instead of `listen.prerecorded`, following the
    # library's deprecation warning.
    response = deepgram_client.listen.rest.v("1").transcribe_file(
        payload, transcription_options
    )
    first_channel = response.results.channels[0]
    return first_channel.alternatives[0].transcript
# Module-level OpenAI client; its key comes from the OPENAI_API_KEY variable.
openai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
def get_openai_stt(file_path: str) -> str:
    """Transcribe an audio file with OpenAI's ``whisper-1`` model.

    Args:
        file_path: Path of the audio file to send.

    Returns:
        The ``text`` field of the transcription response.
    """
    # Open the file in a context manager so the handle is closed even when
    # the API call raises.
    with open(file_path, "rb") as audio_file:
        transcription = openai_client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file,
        )
    return transcription.text
def txt_to_text(filename: str) -> str:
    """Return the entire contents of a plain text file, decoded as UTF-8."""
    with open(filename, mode='r', encoding='utf-8') as handle:
        contents = handle.read()
    return contents
def pdf_to_text(filename: str) -> str:
    """Extract text from a PDF file, falling back to OCR for sparse pages.

    Every page is read with PyPDF2. If any page yields fewer than 100
    characters, including a page with no extractable text at all (for
    example an image-only page), the whole document is sent to the OCR
    service and the OCR result is returned instead.

    Args:
        filename: Path of the PDF file.

    Returns:
        The page texts joined with newlines, or the OCR output when OCR
        was needed. A PDF with no pages yields an empty string.
    """
    min_chars_per_page = 100

    with open(filename, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        # Normalise a missing extraction result to "" so that empty pages
        # are counted as "too short" rather than silently skipped.
        page_texts = [page.extract_text() or "" for page in reader.pages]

    needs_ocr = any(len(text) < min_chars_per_page for text in page_texts)
    if needs_ocr:
        # Use the OCR service to extract text from the whole PDF.
        return get_mistral_ocr(filename, "pdf")
    return "\n".join(page_texts)
def docx_to_text(filename: str) -> str:
    """Return the text of every paragraph in a DOCX file, one per line."""
    document = Document(filename)
    paragraph_texts = [paragraph.text for paragraph in document.paragraphs]
    return '\n'.join(paragraph_texts)
def xlsx_to_text(filename: str) -> str:
    """Extract text from every sheet of an XLSX file.

    For each sheet the output contains a ``Sheet: <name>`` header, the
    sheet rendered with ``DataFrame.to_string(index=False)``, and an empty
    line as separator.

    Args:
        filename: Path of the XLSX file.

    Returns:
        The rendered sheets joined with newlines.
    """
    sections = []
    # Open the workbook in a context manager so its underlying file handle
    # is closed once all sheets have been read.
    with pd.ExcelFile(filename) as workbook:
        for sheet_name in workbook.sheet_names:
            frame = pd.read_excel(workbook, sheet_name=sheet_name)
            sections.append(f"Sheet: {sheet_name}")
            sections.append(frame.to_string(index=False))
            sections.append("")  # Empty line between sheets
    return "\n".join(sections)
def pptx_to_text(filename: str) -> str:
    """Return the text found in the shapes of every slide of a PPTX file.

    Shapes without a ``text`` attribute, or with empty text, are ignored.
    Slides that contribute text are followed by an empty line.
    """
    deck = Presentation(filename)
    collected = []

    for slide in deck.slides:
        # Keep only the shapes that expose non-empty text.
        shape_texts = [
            shape.text
            for shape in slide.shapes
            if hasattr(shape, "text") and shape.text
        ]
        if not shape_texts:
            continue
        collected.append("\n".join(shape_texts))
        collected.append("")  # Empty line between slides

    return "\n".join(collected)
def image_to_text(filename: str) -> str:
    """Run OCR on an image file and return the recognised text."""
    recognised_text = get_mistral_ocr(filename, "image")
    return recognised_text
def audio_to_text(filename: str) -> str:
    """Transcribe an audio file via the OpenAI speech-to-text helper."""
    transcript = get_openai_stt(filename)
    return transcript
def html_to_text(filename: str) -> str:
    """Extract the visible text from an HTML file.

    ``<script>`` and ``<style>`` elements are removed, the remaining text
    is split into lines, runs separated by a double space are broken onto
    their own lines, and blank lines are dropped.

    Args:
        filename: Path of the HTML file, read as UTF-8.

    Returns:
        The cleaned text, one chunk per line.
    """
    with open(filename, 'r', encoding='utf-8') as file:
        soup = BeautifulSoup(file, 'html.parser')

    # Remove script and style elements so their source is not emitted as text.
    for script_or_style in soup(["script", "style"]):
        script_or_style.extract()

    text = soup.get_text()
    # Break into lines and remove leading and trailing space on each.
    lines = (line.strip() for line in text.splitlines())
    # Break multi-headlines into a line each. The separator is a DOUBLE
    # space: splitting on a single space would put every word on its own line.
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # Drop blank lines.
    return '\n'.join(chunk for chunk in chunks if chunk)
def xml_to_text(filename: str) -> str:
    """Return the text content of an XML file with blank lines removed."""
    with open(filename, 'r', encoding='utf-8') as handle:
        document = BeautifulSoup(handle, 'xml')

    # Text of all elements, newline-separated and stripped by BeautifulSoup.
    raw_text = document.get_text(separator='\n', strip=True)

    # Trim each line and keep only the non-empty ones.
    kept_lines = []
    for raw_line in raw_text.splitlines():
        trimmed = raw_line.strip()
        if trimmed:
            kept_lines.append(trimmed)
    return '\n'.join(kept_lines)
def csv_to_text(filename: str) -> str:
    """Return the raw contents of a CSV file, decoded as UTF-8 and unparsed."""
    with open(filename, mode='r', encoding='utf-8') as handle:
        raw_csv = handle.read()
    return raw_csv
def filename_to_text(filename: str) -> Optional[str]:
    """
    Extract text from a file, choosing the extractor from its extension.

    The lower-cased extension is mapped to a MimeType, and the MimeType
    selects the extractor: a dedicated one for document formats, the OCR
    helper for any image type, and the speech-to-text helper for any
    audio type.

    Args:
        filename: Path to the file

    Returns:
        Extracted text or None if the file type is not supported
    """
    extension = os.path.splitext(filename)[1].lower().lstrip('.')

    # Map extensions to mime types
    mime_by_extension = {
        'txt': MimeType.TXT,
        'pdf': MimeType.PDF,
        'docx': MimeType.DOCX,
        'xlsx': MimeType.XLSX,
        'pptx': MimeType.PPTX,
        'png': MimeType.PNG,
        'jpg': MimeType.JPG,
        'jpeg': MimeType.JPEG,
        'wav': MimeType.WAV,
        'mp3': MimeType.MP3,
        'webm': MimeType.WEBM_AUDIO,
        'xml': MimeType.XML,
        'csv': MimeType.CSV,
        'html': MimeType.HTML,
    }
    mime_type = mime_by_extension.get(extension)
    if not mime_type:
        return None

    # Formats that have a dedicated extractor.
    extractor_by_mime = {
        MimeType.TXT: txt_to_text,
        MimeType.PDF: pdf_to_text,
        MimeType.DOCX: docx_to_text,
        MimeType.XLSX: xlsx_to_text,
        MimeType.PPTX: pptx_to_text,
        MimeType.HTML: html_to_text,
        MimeType.XML: xml_to_text,
        MimeType.CSV: csv_to_text,
    }
    extractor = extractor_by_mime.get(mime_type)

    # Images and audio are handled per category rather than per format.
    if extractor is None:
        if MimeType.is_image(mime_type):
            extractor = image_to_text
        elif MimeType.is_audio(mime_type):
            extractor = audio_to_text
        else:
            return None

    return extractor(filename)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment