@sleroy
Created November 7, 2024 09:24
Example of how to use partitions to leverage several executors for parallel reads, and a partition column key for parallel writes.
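The job reads from SQL Server through a Glue connection, splitting the read across executors with hashexpression/hashpartitions, then repartitions the resulting DataFrame on the key column so the JDBC writer can open several concurrent connections, sized by numPartitions and batchsize.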
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, spark_partition_id
from awsglue.dynamicframe import DynamicFrame
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
num_partitions = 40   # hash partitions for the parallel JDBC read
bulk_size = 10000     # rows per JDBC batch / bulk insert
write_threads = 40    # target number of concurrent write partitions
num_rows = 10000000   # cap on rows copied in this run
pk = "id"             # key column used to hash-partition reads and repartition writes
## Read data from an RDS database using the JDBC driver
connection_options = {
    "useConnectionProperties": "true",      # pull credentials from the Glue connection
    "dbtable": "new_table",
    "connectionName": "source-sqlserver",
    "hashexpression": pk,                   # column used to split the read into parallel queries
    "hashpartitions": str(num_partitions),  # number of parallel read partitions
    "bulkSize": str(bulk_size),
}
source_df = glueContext.create_dynamic_frame.from_options(
    connection_type="sqlserver",
    connection_options=connection_options,
)
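# Optional sanity check (a minimal sketch, not part of the original job): the
# DynamicFrame should arrive in roughly `hashpartitions` partitions, one per
# parallel JDBC query issued against the source table.
print("read partitions:", source_df.toDF().rdd.getNumPartitions())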
# Limit the number of rows and repartition on the key column for parallel writes
spark.sparkContext.setJobDescription("Limiting data and repartitioning on the key column")
df = source_df.toDF().limit(num_rows).repartition(write_threads, pk)
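# Optional diagnostic (a minimal sketch, using the spark_partition_id import
# above): show how evenly rows are spread across the write partitions. Note
# that this triggers an extra read of the source, so drop it in production.
df.withColumn("pid", spark_partition_id()).groupBy("pid").count().orderBy("pid").show(write_threads)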
spark.sparkContext.setJobDescription("Writing into JDBC")
# JDBC connection properties (the URL below is a placeholder; fill in the
# host, port, and database of the target server)
jdbc_url = "jdbc:sqlserver://"
connection_properties = {
    "user": "example",
    "password": "example",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}
# partitionColumn/lowerBound/upperBound only apply to JDBC reads; for writes,
# parallelism comes from the DataFrame's partitioning (the repartition above),
# capped by numPartitions, with batchsize rows per insert batch.
df.write \
    .option("numPartitions", write_threads) \
    .option("batchsize", bulk_size) \
    .jdbc(jdbc_url, "generated", mode="append", properties=connection_properties)
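# Alternative sketch (an assumption, not part of the original gist): write
# through the same Glue connection instead of raw JDBC properties, reusing
# the DynamicFrame import above. The connection_options keys mirror the read.
# out_dyf = DynamicFrame.fromDF(df, glueContext, "out_dyf")
# glueContext.write_dynamic_frame.from_options(
#     frame=out_dyf,
#     connection_type="sqlserver",
#     connection_options={
#         "useConnectionProperties": "true",
#         "dbtable": "generated",
#         "connectionName": "source-sqlserver",
#     },
# )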
job.commit()