Skip to content

Instantly share code, notes, and snippets.

@adrianulbona
Last active July 24, 2019 12:40
Show Gist options
  • Save adrianulbona/b1372130f334e0e4ebfb1608b3339d49 to your computer and use it in GitHub Desktop.
Save adrianulbona/b1372130f334e0e4ebfb1608b3339d49 to your computer and use it in GitHub Desktop.
// 1) The story starts with more than 500 million compressed XML files stored in S3 (sizes range from 10 KB to 200 MB).
// 2) The "small" files are grouped, decompressed, cleaned and stored as Parquet files.
// 3) The code below converts the binary `blob` column into a struct column whose schema roughly mirrors the XML structure.
// 4) The result can be persisted and afterwards queried efficiently.
/**
 * Reference to an order; used both as Invoice.OrderReference and inside OrderLineReference.
 * IssueDate is kept as a raw string.
 */
case class OrderReference(ID: String,
SalesOrderID: String,
UUID: String,
IssueDate: String)
/** Wrapper around a party's name; used as Party.PartyName. */
case class PartyName(Name: String)
/** A single address line; PostalAddress holds exactly one of these (modelled as a value, not a Seq). */
case class AddressLine(Line: String)
/** Country identification code; used by PostalAddress and PayeeFinancialAccount. */
case class Country(IdentificationCode: String)
/**
 * Postal address, reused for Party.PostalAddress, Delivery.DeliveryAddress and the Address fields of
 * FinancialInstitution and FinancialInstitutionBranch.
 */
case class PostalAddress(StreetName: String,
BuildingName: String,
BuildingNumber: String,
CityName: String,
PostalZone: String,
CountrySubentity: String,
AddressLine: AddressLine,
Country: Country)
/** Tax scheme identifier and type code; shared by PartyTaxScheme and TaxCategory. */
case class TaxScheme(ID: String,
TaxTypeCode: String)
/** Tax registration details of a party; used as Party.PartyTaxScheme. */
case class PartyTaxScheme(RegistrationName: String,
CompanyID: String,
ExemptionReason: String,
TaxScheme: TaxScheme)
/** Contact details of a party; used as Party.Contact. */
case class Contact(Name: String,
Telephone: String,
Telefax: String,
ElectronicMail: String)
/**
 * A party (name, address, tax registration, contact); wrapped by both AccountingSupplierParty and
 * AccountingCustomerParty.
 */
case class Party(PartyName: PartyName,
PostalAddress: PostalAddress,
PartyTaxScheme: PartyTaxScheme,
Contact: Contact)
/** Supplier side of the invoice; used as Invoice.AccountingSupplierParty. */
case class AccountingSupplierParty(CustomerAssignedAccountID: String,
Party: Party)
/**
 * Customer side of the invoice, with both the customer- and supplier-assigned account IDs;
 * used as Invoice.AccountingCustomerParty.
 */
case class AccountingCustomerParty(CustomerAssignedAccountID: String,
SupplierAssignedAccountID: String,
Party: Party)
/** Delivery date, time (both raw strings) and address; used as Invoice.Delivery. */
case class Delivery(ActualDeliveryDate: String,
ActualDeliveryTime: String,
DeliveryAddress: PostalAddress)
/** Financial institution (ID, name, address); nested in FinancialInstitutionBranch. */
case class FinancialInstitution(ID: String,
Name: String,
Address: PostalAddress)
/** Branch of a FinancialInstitution; used by PayeeFinancialAccount. */
case class FinancialInstitutionBranch(ID: String,
Name: String,
FinancialInstitution: FinancialInstitution,
Address: PostalAddress)
/** Payee's financial account, including its branch and country; used by PaymentMeans. */
case class PayeeFinancialAccount(ID: String,
Name: String,
AccountTypeCode: String,
CurrencyCode: String,
FinancialInstitutionBranch: FinancialInstitutionBranch,
Country: Country)
/** Payment means code, due date (raw string) and payee account; used as Invoice.PaymentMeans. */
case class PaymentMeans(PaymentMeansCode: String,
PaymentDueDate: String,
PayeeFinancialAccount: PayeeFinancialAccount)
/** Payment terms note; used as Invoice.PaymentTerms. */
case class PaymentTerms(Note: String)
/**
 * Monetary amount used throughout the model (AllowanceCharge, TaxSubtotal, TaxTotal,
 * LegalMonetaryTotal, InvoiceLine, Price). The value is parsed as a Double.
 * NOTE(review): the `_` names presumably follow spark-xml's default attribute prefix (`_currencyID`
 * = `currencyID` attribute) and value tag (`_VALUE` = element text) — confirm against XmlOptions.
 */
case class Amount(_currencyID: String,
_VALUE: Double)
/**
 * One allowance or charge; Invoice holds a Seq of them.
 * ChargeIndicator and MultiplierFactorNumeric are kept as raw strings.
 */
case class AllowanceCharge(ChargeIndicator: String,
AllowanceChargeReasonCode: String,
MultiplierFactorNumeric: String,
Amount: Amount)
/** Tax category ID and its scheme; used by TaxSubtotal. */
case class TaxCategory(ID: String,
TaxScheme: TaxScheme)
/**
 * Taxable amount and tax amount for one tax category.
 * TaxTotal holds a single subtotal (modelled as a value, not a Seq).
 */
case class TaxSubtotal(TaxableAmount: Amount,
TaxAmount: Amount,
TaxCategory: TaxCategory)
/** Tax total; Invoice holds a Seq of them, InvoiceLine a single one. */
case class TaxTotal(TaxAmount: Amount,
TaxEvidenceIndicator: String,
TaxSubtotal: TaxSubtotal)
/** Document-level monetary totals; used as Invoice.LegalMonetaryTotal. */
case class LegalMonetaryTotal(LineExtensionAmount: Amount,
TaxExclusiveAmount: Amount,
AllowanceTotalAmount: Amount,
PayableAmount: Amount)
/**
 * Quantity plus unit code; used as InvoiceLine.InvoicedQuantity and Price.BaseQuantity.
 * Unlike Amount._VALUE (Double), `_VALUE` here is kept as a String.
 */
case class InvoicedQuantity(_unitCode: String,
_VALUE: String)
/** Reference to an order line and its order; used as InvoiceLine.OrderLineReference. */
case class OrderLineReference(LineID: String,
SalesOrderLineID: String,
LineStatusCode: String,
OrderReference: OrderReference)
/**
 * Item identification; reused for both Item.BuyersItemIdentification and
 * Item.SellersItemIdentification.
 * NOTE(review): if `_` marks attributes (spark-xml default), `_schemeID` is read from this element,
 * not from its ID child — confirm against the source XML.
 */
case class BuyersItemIdentification(ID: String,
_schemeID: String)
/** Lot number and expiry date (both raw strings); used by ItemInstance. */
case class LotIdentification(LotNumberID: String, ExpiryDate: String)
/** Wrapper around LotIdentification; used as Item.ItemInstance. */
case class ItemInstance(LotIdentification: LotIdentification)
/**
 * The invoiced item; used as InvoiceLine.Item.
 * SellersItemIdentification reuses the BuyersItemIdentification shape. CommodityClassification is
 * declared further down, which Scala allows for top-level classes.
 */
case class Item(Description: String,
Name: String,
BuyersItemIdentification: BuyersItemIdentification,
SellersItemIdentification: BuyersItemIdentification,
ItemInstance: ItemInstance,
CommodityClassification: CommodityClassification)
/** Commodity code and item classification code of an Item. */
case class CommodityClassification(CommodityCode: CommodityCode,
ItemClassificationCode: ItemClassificationCode)
/**
 * Item classification code with list ID/version; presumably text value plus attributes under
 * spark-xml's `_` naming defaults — confirm.
 */
case class ItemClassificationCode(_VALUE: String,
_listID: String,
_listVersionID: String)
/**
 * Commodity code with list agency/version; presumably text value plus attributes under spark-xml's
 * `_` naming defaults — confirm.
 */
case class CommodityCode(_VALUE: String,
_listAgencyID: String,
_listVersionID: String)
/** Price amount and base quantity; used as InvoiceLine.Price. */
case class Price(PriceAmount: Amount,
BaseQuantity: InvoicedQuantity)
/** One invoice line; Invoice holds a Seq of them. */
case class InvoiceLine(ID: String,
InvoicedQuantity: InvoicedQuantity,
LineExtensionAmount: Amount,
OrderLineReference: OrderLineReference,
AccountingCost: String,
TaxTotal: TaxTotal,
Item: Item,
Price: Price)
/**
 * Root of the invoice model.
 *
 * The Spark schema passed to the XML parser UDF is derived from this case-class tree via
 * `Encoders.product[Invoice].schema`. Field names and field order therefore define the columns of the
 * parsed struct; they presumably have to match the XML element names for spark-xml to fill them —
 * confirm before renaming anything.
 *
 * Repeated elements are modelled as Seq (AllowanceCharge, TaxTotal, InvoiceLine); every other nested
 * element is a single value. Dates and codes are kept as raw strings.
 */
case class Invoice(UBLVersionID: String,
CustomizationID: String,
ProfileID: String,
ID: String,
CopyIndicator: String,
UUID: String,
IssueDate: String,
InvoiceTypeCode: String,
Note: String,
TaxPointDate: String,
OrderReference: OrderReference,
AccountingSupplierParty: AccountingSupplierParty,
AccountingCustomerParty: AccountingCustomerParty,
Delivery: Delivery,
PaymentMeans: PaymentMeans,
PaymentTerms: PaymentTerms,
AllowanceCharge: Seq[AllowanceCharge],
TaxTotal: Seq[TaxTotal],
LegalMonetaryTotal: LegalMonetaryTotal,
InvoiceLine: Seq[InvoiceLine])
// Input: the Parquet files from step 2. `blob` is passed to the parser UDF, which takes the raw
// document bytes (Array[Byte]), so it is expected to be a binary column. Paths are elided (`...`).
val invoiceBlobsDF = spark.read.parquet(...)
// Struct type derived from the Invoice case-class tree; it becomes the type of the parsed column.
val invoiceSchema = Encoders.product[Invoice].schema
val parserUDF = StaxXmlParser.udf(invoiceSchema, Map("rowTag" -> "Invoice"))
invoiceBlobsDF
  // Dataset.select has no (String, Column) overload, so the parsed struct is named explicitly
  // via `.as` instead of passing "content_struct" as a separate argument.
  .select(parserUDF($"blob").as("content_struct"))
  .write
  .parquet(...)
/**
* The Databricks spark-xml parser reads either whole files or consolidated XML content (one document per line).
*
* This snippet adapts the original parse() method so that it can be wrapped in a Spark UDF and applied to one document at a time.
*
* Source: https://github.com/databricks/spark-xml/blob/master/src/main/scala/com/databricks/spark/xml/parsers/StaxXmlParser.scala
*
*/
object StaxXmlParser extends Serializable {
private val logger = LoggerFactory.getLogger(StaxXmlParser.getClass)
/**
 * Builds a Spark UDF that parses one binary XML document into a struct of type `schema`.
 *
 * @param schema     type of the produced struct column (e.g. `Encoders.product[Invoice].schema`)
 * @param xmlOptions spark-xml reader options (e.g. `rowTag`; `charset` selects the byte decoding)
 * @return a UDF over the raw document bytes. Null input, and documents dropped in DROPMALFORMED
 *         mode, produce a null struct; FAILFAST mode throws on malformed documents.
 */
def udf(schema: StructType, xmlOptions: Map[String, String]): UserDefinedFunction = {
  val options = new XmlOptions(xmlOptions)
  // Fully qualified on purpose: inside this object a bare `udf` resolves to this very method
  // (StructType, Map) rather than to Spark's `functions.udf`, which would not type-check.
  // NOTE(review): this untyped (f, dataType) overload is deprecated in Spark 3.x and is rejected
  // there unless spark.sql.legacy.allowUntypedScalaUDF is enabled — confirm target Spark version.
  org.apache.spark.sql.functions.udf({ bytes: Array[Byte] =>
    // Decode with the configured charset instead of the JVM default, and map a null blob to None
    // (returned to Spark as a null struct) instead of failing the whole task with an NPE.
    Option(bytes).flatMap(raw => parse(new String(raw, options.charset), schema, options))
  }, schema)
}
/**
 * Parses a single XML document into a [[Row]] laid out according to `schema`, honouring the
 * configured parse mode for malformed input.
 *
 * Parsing starts at the document's first start element; the attributes of that element are passed
 * to `convertObject` as the root attributes.
 *
 * @param xml     the complete XML document
 * @param schema  target schema; if it contains `options.columnNameOfCorruptRecord`, that column
 *                receives the raw document when parsing fails in PERMISSIVE mode
 * @param options spark-xml options; `parseMode` and `columnNameOfCorruptRecord` are read here
 * @return `Some(row)` on success and for malformed input in PERMISSIVE mode (fields that could not
 *         be parsed are null); `None` for malformed input in DROPMALFORMED mode
 * @throws IllegalArgumentException for malformed input in FAILFAST mode
 */
def parse(xml: String,
          schema: StructType,
          options: XmlOptions): Option[Row] = {
  // The logic below is borrowed from Apache Spark's FailureSafeParser.
  val corruptFieldIndex = Try(schema.fieldIndex(options.columnNameOfCorruptRecord)).toOption
  val actualSchema = StructType(schema.filterNot(_.name == options.columnNameOfCorruptRecord))
  // Allocated per call, so a returned Row never shares its backing array with another document.
  val resultRow = new Array[Any](schema.length)
  // Copies field i of the (possibly partial) parsed row into the slot of actualSchema(i) within the
  // full schema, then stores the raw record in the corrupt-record column if one exists.
  val toResultRow: (Option[Row], String) => Row = (row, badRecord) => {
    var i = 0
    while (i < actualSchema.length) {
      val from = actualSchema(i)
      resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i)).orNull
      i += 1
    }
    corruptFieldIndex.foreach(index => resultRow(index) = badRecord)
    Row.fromSeq(resultRow)
  }
  // Applies the parse mode to a malformed record.
  // NOTE(review): FAILFAST and DROPMALFORMED embed the whole record in the exception/log message;
  // with very large documents consider truncating it.
  def failedRecord(
      record: String, cause: Throwable = null, partialResult: Option[Row] = None): Option[Row] = {
    // create a row even if no corrupt record column is present
    options.parseMode match {
      case FailFastMode =>
        throw new IllegalArgumentException(
          s"Malformed line in FAILFAST mode: ${record.replaceAll("\n", "")}", cause)
      case DropMalformedMode =>
        val reason = if (cause != null) s"Reason: ${cause.getMessage}" else ""
        logger.warn(s"Dropping malformed line: ${record.replaceAll("\n", "")}. $reason")
        logger.debug("Malformed line cause:", cause)
        None
      case PermissiveMode =>
        logger.debug("Malformed line cause:", cause)
        Some(toResultRow(partialResult, record))
    }
  }
  // NOTE(review): a new factory is created for every document (i.e. every UDF call); caching one
  // per thread may be worthwhile for large inputs.
  val factory = XMLInputFactory.newInstance()
  factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, true)
  factory.setProperty(XMLInputFactory.IS_COALESCING, true)
  val filter = new EventFilter {
    override def accept(event: XMLEvent): Boolean =
      // Ignore comments. This parser does not handle comments.
      event.getEventType != XMLStreamConstants.COMMENT
  }
  try {
    // Inputs here are arbitrary blobs, not XmlInputFormat records. StAX implementations may read
    // the prolog while creating the reader, so creation happens inside the try: an empty or
    // malformed document must also go through failedRecord instead of escaping the parse mode.
    val eventReader = factory.createXMLEventReader(new StringReader(xml))
    try {
      val parser = factory.createFilteredReader(eventReader, filter)
      val rootEvent = skipUntil(parser, XMLStreamConstants.START_ELEMENT)
      val rootAttributes =
        rootEvent.asStartElement.getAttributes.asScala.map(_.asInstanceOf[Attribute]).toArray
      // Option (not Some): with Some the orElse branch was dead code and a null result would have
      // been returned as Some(null).
      Option(convertObject(parser, schema, options, rootAttributes))
        .orElse(failedRecord(xml))
    } finally {
      // Best-effort release of reader resources; a failing close must not mask the parse outcome.
      Try(eventReader.close())
    }
  } catch {
    case e: PartialResultException =>
      failedRecord(xml, e.cause, Some(e.partialResult))
    case NonFatal(e) =>
      failedRecord(xml, e)
  }
}
...
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment