pyspark_partitions_udf_glue
# Spark dependencies
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.context import SparkContext
# Glue dependencies
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
spark = SparkSession.builder.master("local[1]").appName("SparkPartitions").getOrCreate()
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Reading some data with Spark from s3
df = spark.read.option("header","true").csv("s3a://aws-stocks-dataset/AAL.csv")
# Let's use a UDF to extract the month from the date using Python
def myfunction(date_str):
    return date_str.split("-")[1]

monthUDF = udf(myfunction, StringType())
df.withColumn("Month", monthUDF(col("Date"))).show()
# Partitions
# Getting the default number of partitions at the moment
df.rdd.getNumPartitions() # 1
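# Illustrative check (not in the original gist): glom() collects each partition as a
# list, so mapping len over it shows how many rows each partition holds.
df.rdd.glom().map(len).collect()  # one row count per partition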
# Group by Date and see performance
# The max function needs a numeric input, so we cast the strings to int
df = df.withColumn("high_int", col("High").cast("int"))
%time df.groupBy("Date").max("high_int").show()
# Let's see how many dates we have
df.select('Date').distinct().count() # 50
# Now let's create a partition for every date
df = df.repartition(50, "Date")
# Adding a column with spark_partition_id
df = df.withColumn("parition", spark_partition_id())
df.show()
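# Quick sanity check (not in the original gist): count how many rows landed in each
# partition using the column we just added.
df.groupBy("partition").count().show()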
# Let's group again and compare the performance. On large datasets, partitioning by the grouping key should improve performance; on small datasets it can be slower because of the extra shuffling.
%time df.groupBy("Date").max("high_int").show()
# Now let's partition data with Glue when we save it to S3
# Glue uses Hive-style partitions at the folder level, which you can later inspect in the bucket yourself
ddf = DynamicFrame.fromDF(df, glueContext, "ddf")
glueContext.write_dynamic_frame.from_options(
    ddf,
    connection_type="s3",
    connection_options={
        "path": "s3://aws-stocks-dataset-output/iamthebestdataengineer",
        "partitionKeys": ["Date"],
    },
    format="parquet",
)
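# Minimal sketch of reading the partitioned output back (not in the original gist,
# assuming the job also has read access to the output bucket): Spark discovers the
# Hive-style Date=... folders and exposes "Date" as a regular column again.
df_back = spark.read.parquet("s3a://aws-stocks-dataset-output/iamthebestdataengineer")
df_back.printSchema()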