apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ...
  namespace: ...
  annotations:
    kubernetes.io/ingress.class: alb
    alb.ingress.kubernetes.io/load-balancer-name: ...
    alb.ingress.kubernetes.io/scheme: internal
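For context, a minimal sketch of the rest of the manifest; the Service name and port are hypothetical, and the annotations above stay unchanged:

spec:
  rules:
    - http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: my-service  # hypothetical Service name
                port:
                  number: 80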
spark
  .read                   // read the data
  .format("jdbc")         // not only "jdbc" but other formats such as "kafka" are available
  .load()
  .join(...)              // join (merge) with other data
  .where(...)             // filter rows, and/or
  .selectExpr(...)        // select only the columns you need
  .repartition(5, "col1") // define how many ways, and by which key, the work is distributed
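A runnable PySpark sketch of the same pipeline; the JDBC URL, paths, table, and column names below are all hypothetical:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

dfOrders = (spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://db-host:3306/shop")  # hypothetical JDBC URL
    .option("dbtable", "orders")                      # hypothetical table
    .load())

dfUsers = spark.read.parquet("s3://bucket/users/")    # hypothetical path

dfResult = (dfOrders
    .join(dfUsers, "user_id")         # merge with other data
    .where("amount > 100")            # filter rows
    .selectExpr("user_id", "amount")  # keep only the needed columns
    .repartition(5, "user_id"))       # 5 partitions, keyed by user_id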
spark.executor.instances = 10
spark.executor.cores = 10
spark.executor.memory = 30g                # GiB
spark.executor.memoryOverheadFactor = 0.1  # overhead defaults to executorMemory * 0.10 (at least 384 MiB)
spark.memory.fraction = 0.8
spark.memory.storageFraction = 0.5
spark.memory.offHeap.enabled = false
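Worked through, these numbers explain the limit in the error below: YARN sizes each container at spark.executor.memory plus the overhead, i.e. 30 GiB + 30 GiB * 0.1 = 33 GiB. Inside the heap, the unified execution/storage region is (30 GiB - 300 MiB reserved) * 0.8, roughly 23.8 GiB, and spark.memory.storageFraction = 0.5 protects half of that region for cached data.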
ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason:
Container killed by YARN for exceeding memory limits. 33.2 GB of 33 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
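One hedged way to apply the suggestion from PySpark; the 6g figure is only an illustrative guess, and spark.executor.memoryOverhead is the current name for the deprecated spark.yarn.executor.memoryOverhead:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.executor.memory", "30g")
    .config("spark.executor.memoryOverhead", "6g")  # hypothetical bump; the container cap becomes 36 GiB
    .getOrCreate())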
dfInitial = spark.read.load(...)
dfFiltered = dfInitial.select(...).where(...).cache()  # mark dfFiltered for caching
dfJoined = (...)

# Calling an action triggers execution:
# the transformations run, dfFiltered is computed, and then
# dfFiltered is cached in memory across the executors
dfJoined.write.save(...)
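A runnable version of the same flow, assuming hypothetical S3 paths and columns; unpersist() releases the cached blocks once they are no longer needed:

dfInitial = spark.read.parquet("s3://bucket/events/")         # hypothetical path
dfFiltered = dfInitial.select("user_id", "amount").where("amount > 0").cache()

dfUsers = spark.read.parquet("s3://bucket/users/")            # hypothetical path
dfJoined = dfFiltered.join(dfUsers, "user_id")

dfJoined.write.mode("overwrite").parquet("s3://bucket/out/")  # action: computes and caches dfFiltered
dfFiltered.unpersist()                                        # free executor memory when done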
df = spark.read.load(...)
df.createOrReplaceTempView("PROPERTY_META")
spark.sql("SELECT * FROM PROPERTY_META ..")
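Concretely, assuming a hypothetical Parquet source; the temp view is scoped to the current SparkSession:

df = spark.read.parquet("s3://bucket/property_meta/")  # hypothetical path
df.createOrReplaceTempView("PROPERTY_META")
spark.sql("SELECT property_id, COUNT(*) AS cnt FROM PROPERTY_META GROUP BY property_id").show()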
# Because repartition was called without a key column,
# rows for the same property_id (e.g., 2101) are split across
# several partitions = connections and inserted separately
df\
    .repartition(10)\
    .write\
    .mode("append")\
    .format("jdbc")\
    .option("numPartitions", "10")\
    .save()  # JDBC url/dbtable options elided
# Keyed on property_id, the partitions split so that each property_id
# is written through a single connection, as the sketch below shows
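A sketch of that column-keyed write, with the JDBC url/dbtable options elided as in the snippet above:

df\
    .repartition(10, "property_id")\
    .write\
    .mode("append")\
    .format("jdbc")\
    .option("numPartitions", "10")\
    .save()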
CREATE TABLE ...
(
    ...
    PRIMARY KEY (property_id, part)
)
df = spark.read.csv(...)
dfSelected = df.selectExpr("...")

df.rdd.id()          # id of the RDD backing df
dfSelected.rdd.id()  # a different id: every transformation returns a new DataFrame, hence a new RDD