@marhan · Created June 2, 2019 12:54
Spark Scala HASPA CSV Import
import org.apache.spark.sql.functions.{col, regexp_replace, to_date, unix_timestamp}
import org.apache.spark.sql.types.{DecimalType, StringType, StructField, StructType}

// Path to the HASPA account CSV exports; adjust to your environment.
val filesPath = "/Volumes/Volume/bank-account-files/*.csv"

// Read every column as a string first; amounts and dates are converted below.
val customSchema = StructType(Array(
  StructField("Buchung", StringType, true),
  StructField("Wert", StringType, true),
  StructField("Verwendungszweck", StringType, true),
  StructField("Betrag", StringType, true)))
// HASPA exports are semicolon-delimited and Windows-1252 encoded.
// Spark 2+ ships a built-in CSV source that replaces the external
// com.databricks.spark.csv package used here originally.
// treatEmptyValuesAsNulls is a legacy spark-csv option; the built-in
// reader already reads empty fields as null.
val df = spark.read.format("csv").
  option("header", "true").
  option("delimiter", ";").
  option("charset", "Windows-1252").
  option("treatEmptyValuesAsNulls", "true").
  option("mode", "DROPMALFORMED").
  schema(customSchema).
  load(filesPath)
// Betrag uses the German number format ("1.234,56"): strip the thousands
// separator, replace the decimal comma with a dot, then cast to decimal.
val account = df.
  withColumn("Betrag", regexp_replace(regexp_replace(df("Betrag"), "\\.", ""), ",", ".").cast(DecimalType(10, 2))).
  // Buchung and Wert are German-style dates ("dd.MM.yyyy").
  withColumn("Buchung", to_date(unix_timestamp(col("Buchung"), "dd.MM.yyyy").cast("timestamp"))).
  withColumn("Wert", to_date(unix_timestamp(col("Wert"), "dd.MM.yyyy").cast("timestamp")))

// Register the result for Spark SQL queries; createOrReplaceTempView
// replaces the registerTempTable call deprecated since Spark 2.0.
account.createOrReplaceTempView("account")
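
// A quick sanity check on the import (illustrative, not part of the original
// gist): query the registered "account" view for monthly totals. Column names
// match the schema defined above.
spark.sql("""
  SELECT date_format(Buchung, 'yyyy-MM') AS month,
         SUM(Betrag) AS total
  FROM account
  GROUP BY date_format(Buchung, 'yyyy-MM')
  ORDER BY month
""").show()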