# Gist: Streamweaver/0144a6e76699f420ccae52053f074b9b
# Created: November 14, 2024 13:33
#
# A common robust XML Output Parser example.
#
# NOTE: GitHub flagged this file as containing hidden or bidirectional Unicode
# text that may be interpreted or compiled differently than it appears. Review
# it in an editor that reveals hidden Unicode characters before relying on it.
import logging
import re
import xml.etree.ElementTree as ET
from typing import Any, Literal, Optional, Union

from bs4 import BeautifulSoup
from langchain_core.exceptions import OutputParserException
from langchain_core.output_parsers.transform import BaseTransformOutputParser
_logger = logging.getLogger(__name__)

# Patterns are compiled once at import time and shared by every parser instance.
_XML_DECLARATION_RE = re.compile(r"<\?xml[^>]+\?>")
_INVALID_XML_CHARS_RE = re.compile(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]")
_XML_TAG_RE = re.compile(r"<[^>]+>")
_CODE_FENCE_RE = re.compile(r"```(?:xml)?(.*?)```", re.DOTALL)
_NON_WHITESPACE_RE = re.compile(r"\S")


class RobustXMLOutputParser(BaseTransformOutputParser):
    """A more robust XML parser that handles malformed XML gracefully.

    The text is optionally extracted from a triple-backtick fence, cleaned
    (XML declaration and control characters removed, optional BeautifulSoup
    repair), parsed with the configured XML backend and converted to a nested
    dictionary. When parsing fails the parser either returns
    ``{"raw_text": <text>}`` or raises ``OutputParserException``, depending on
    ``fallback_to_text``.
    """

    # LangChain output parsers are pydantic models, so the options are declared
    # as fields; assigning undeclared attributes in ``__init__`` is rejected.
    tags: Optional[list[str]] = None
    parser: Literal["defusedxml", "xml"] = "defusedxml"
    fallback_to_text: bool = True
    auto_repair: bool = True

    def __init__(
        self,
        tags: Optional[list[str]] = None,
        parser: Literal["defusedxml", "xml"] = "defusedxml",
        fallback_to_text: bool = True,
        auto_repair: bool = True,
    ) -> None:
        """Initialize the parser with error handling options.

        Args:
            tags: Optional list of expected XML tags.
            parser: XML parser to use ('defusedxml' or 'xml').
            fallback_to_text: Whether to return raw text if parsing fails.
            auto_repair: Whether to attempt to repair malformed XML.
        """
        # Positional arguments stay supported; values are routed through the
        # base initializer so they are stored as declared fields.
        super().__init__(
            tags=tags,
            parser=parser,
            fallback_to_text=fallback_to_text,
            auto_repair=auto_repair,
        )

    @property
    def logger(self) -> logging.Logger:
        """Return the module logger (kept so ``instance.logger`` still works)."""
        return _logger

    def _clean_xml(self, text: str) -> str:
        """Clean and potentially repair malformed XML.

        Args:
            text: Candidate XML text.

        Returns:
            The text without an XML declaration or disallowed control
            characters, repaired by BeautifulSoup when ``auto_repair`` is set,
            with surrounding whitespace stripped.
        """
        # Remove any XML declaration as it often causes issues.
        text = _XML_DECLARATION_RE.sub("", text)
        # Remove control characters that are not allowed in XML.
        text = _INVALID_XML_CHARS_RE.sub("", text)

        if self.auto_repair:
            from bs4 import FeatureNotFound

            try:
                text = str(BeautifulSoup(text, "xml"))
            except FeatureNotFound:
                # The "xml" feature needs an installed backend (lxml); without
                # it, continue with the unrepaired text instead of crashing.
                _logger.warning(
                    "BeautifulSoup XML backend unavailable; skipping auto-repair"
                )
            else:
                # BeautifulSoup's XML output starts with a fresh declaration;
                # drop it again so tag-free input is not mistaken for XML.
                text = _XML_DECLARATION_RE.sub("", text)

        return text.strip()

    def _load_root(self, text: str) -> ET.Element:
        """Parse ``text`` with the configured backend and return the root.

        Raises:
            ValueError: If ``text`` contains no tag-like markup, or if
                defusedxml rejects a forbidden construct.
            xml.etree.ElementTree.ParseError: If the text is not well-formed.
            ImportError: If ``parser`` is 'defusedxml' and it is not installed.
        """
        if not _XML_TAG_RE.search(text):
            raise ValueError("No valid XML tags found")

        if self.parser == "defusedxml":
            # Imported lazily so the dependency is only needed when selected.
            from defusedxml import ElementTree as DefusedET

            return DefusedET.fromstring(text)
        return ET.fromstring(text)

    def _validate_xml_structure(self, text: str) -> tuple[bool, str]:
        """Validate XML structure and return status and an error message.

        Validation uses the configured backend, so untrusted text is never
        handed to the plain stdlib parser when 'defusedxml' is selected.

        Returns:
            ``(True, "")`` when the text parses, else ``(False, <reason>)``.
        """
        try:
            self._load_root(text)
        except (ET.ParseError, ValueError, ImportError) as exc:
            return False, str(exc)
        return True, ""

    def parse(self, text: str) -> dict[str, Union[str, list[Any]]]:
        """Parse the output with enhanced error handling.

        Args:
            text: The XML text to parse, optionally wrapped in triple backticks.

        Returns:
            Parsed dictionary, or ``{"raw_text": text}`` if parsing fails and
            ``fallback_to_text`` is True.

        Raises:
            OutputParserException: If parsing fails and fallback_to_text is False.
        """
        # Prefer XML found within triple backticks.
        fenced = _CODE_FENCE_RE.search(text)
        if fenced:
            text = fenced.group(1)

        cleaned_text = self._clean_xml(text)

        # Parse exactly once; a missing defusedxml install is treated like any
        # other parse failure so the configured fallback still applies.
        try:
            root = self._load_root(cleaned_text)
        except (ET.ParseError, ValueError, ImportError) as exc:
            _logger.warning("XML validation failed: %s", exc)
            if self.fallback_to_text:
                return {"raw_text": text}
            raise OutputParserException(
                f"Failed to parse XML: {exc}",
                llm_output=text,
            ) from exc

        return self._root_to_dict(root)

    def _root_to_dict(self, root: ET.Element) -> dict[str, Union[str, list[Any]]]:
        """Convert an XML tree to a Python dictionary.

        An element with non-whitespace text maps to ``{tag: text}``. Otherwise
        it maps to ``{tag: [...]}`` with one single-key dictionary per child,
        in document order. A childless child contributes ``{tag: text or ""}``.

        Returns:
            The dictionary for ``root``; if conversion raises, the error is
            logged and a placeholder string is stored under ``root.tag``.
        """
        try:
            if root.text and _NON_WHITESPACE_RE.search(root.text):
                return {root.tag: root.text}

            children: list[Any] = []
            for child in root:
                if len(child) == 0:
                    children.append({child.tag: child.text or ""})
                else:
                    children.append(self._root_to_dict(child))
            return {root.tag: children}
        except Exception as exc:
            # Deliberate best-effort: one bad element must not abort the parse.
            _logger.error("Error converting XML to dict: %s", exc)
            return {root.tag: "Error processing this element"}
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.