/**
 * Author: github.com/ganeshchand
 * Date: 03/04/2021
 * Specifying a schema when reading a source is mandatory or optional depending on
 * the source format and on which DataFrameReader you are using:
 *   spark.read() is the batch DataFrame reader
 *   spark.readStream() is the streaming DataFrame reader
 * Let's write a quick test to see which reader forces us to specify a schema on read.
 */
// step 1: generate test datasets for csv, json, parquet, orc and delta
val basePath = "/ganesh/tmp/stream_format_test"
Seq("csv", "json", "parquet", "orc", "delta").foreach { format =>
  spark.range(10).write.format(format).save(s"$basePath/output/$format")
}
// step 2: read the test datasets using the batch and streaming reader APIs
Seq("""batch: spark.read()""", """streaming: spark.readStream""").foreach { readType =>
  println("\n")
  Seq("csv", "json", "parquet", "orc", "delta")
    .map { format =>
      // wrap the read in Try: a reader that requires a schema will throw here
      val df = scala.util.Try {
        readType match {
          case s: String if s.startsWith("batch") =>
            spark.read
              .format(format)
              .option("inferSchema", true)
              .load(s"$basePath/output/$format")
          case s: String if s.startsWith("streaming") =>
            spark.readStream
              .format(format)
              .option("inferSchema", true)
              .load(s"$basePath/output/$format")
        }
      }.toOption
      (format, df)
    }
    .foreach { case (f, df) =>
      if (df.isDefined)
        println(s"$readType for $f doesn't require schema")
      else println(s"$readType for $f requires schema")
    }
}
/*
batch: spark.read() for csv doesn't require schema
batch: spark.read() for json doesn't require schema
batch: spark.read() for parquet doesn't require schema
batch: spark.read() for orc doesn't require schema
batch: spark.read() for delta doesn't require schema
streaming: spark.readStream for csv requires schema
streaming: spark.readStream for json requires schema
streaming: spark.readStream for parquet requires schema
streaming: spark.readStream for orc requires schema
streaming: spark.readStream for delta doesn't require schema
*/
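
// Follow-up (a minimal sketch): for the file sources where spark.readStream requires a schema,
// one common approach is to infer it once with the batch reader and reuse it for the stream.
// This assumes the parquet test data written in step 1 above; any of the file formats would do.
val parquetPath = s"$basePath/output/parquet"
val inferredSchema = spark.read.format("parquet").load(parquetPath).schema // batch read infers the schema
val streamingDf = spark.readStream
  .schema(inferredSchema) // an explicit schema satisfies the streaming file reader
  .format("parquet")
  .load(parquetPath)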