Last active: May 18, 2021
Accompanying gist for the datalake article.
-- ALTER TABLE schema.table DROP IF EXISTS PARTITION (year='2021', month='01', day='11', hour='01')
ALTER TABLE pricing.demand_tbl ADD
PARTITION (year='2021', month='01', day='11', hour='01')
LOCATION 's3://datascience-bucket/wesley.goi/data/pricing/demand_tbl/year=2021/month=01/day=11/hour=01'
MSCK REPAIR TABLE schema.table
CACHE TABLE <tablename>
SELECT /*+ REPARTITION(100), COALESCE(500), REPARTITION_BY_RANGE(3, c) */ * FROM t
# uses the private JVM handle; PySpark 3.3+ also exposes spark.catalog.tableExists
spark._jsparkSession.catalog().tableExists(schema, table)
-- DROP TABLE IF EXISTS schema.table
CREATE EXTERNAL TABLE IF NOT EXISTS pricing.demand_tbl
(
    country_id bigint,
    city_id bigint,
    utcDate string,
    user_id bigint
)
PARTITIONED BY (year string, month string, day string, hour string)
STORED AS PARQUET
LOCATION 's3a://datascience-bucket/wesley.goi/data/pricing/demand_tbl/' -- base path
import pyspark
from pyspark.sql import SparkSession

num_executors = 4  # example value

conf = (
    pyspark.SparkConf()
    .set("spark.executor.instances", num_executors)
)
spark = (
    SparkSession.builder
    .appName("my_app_name")
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)
library(sparklyr)

conf <- spark_config()
conf$spark.executor.memory <- "16G"
conf$spark.executor.instances <- num_executors

spark <- spark_connect(
  master = "local",
  version = "2.3",
  app_name = "my_app_name",
  config = conf
)
CREATE TEMPORARY VIEW IF NOT EXISTS my_table_name AS
<QUERY>
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

def some_func(a: int, b: int) -> int:
    return a + b

# pyspark
udf_some_func = F.udf(some_func, IntegerType())

# in SQL
# SELECT *, udf_some_func(col_a, col_b) FROM table
spark.udf.register("udf_some_func", some_func, IntegerType())
[
  {
    "description": "[DESCRIPTION]",
    "name": "[NAME]",
    "type": "[TYPE]",
    "mode": "[MODE]"
  },
  ...
]
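A filled-in version of this schema template for the demand table columns used earlier might look like the following sketch (the descriptions are illustrative, not from the article; Python is used only to validate the JSON structure):

```python
import json

# Illustrative schema entries for pricing.demand_tbl; descriptions are made up.
schema = [
    {"description": "Country identifier", "name": "country_id", "type": "INTEGER", "mode": "NULLABLE"},
    {"description": "City identifier", "name": "city_id", "type": "INTEGER", "mode": "NULLABLE"},
    {"description": "Event timestamp (UTC)", "name": "utcDate", "type": "STRING", "mode": "NULLABLE"},
    {"description": "User identifier", "name": "user_id", "type": "INTEGER", "mode": "NULLABLE"},
]

# Round-trip through JSON to confirm the structure is valid.
assert json.loads(json.dumps(schema)) == schema
```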
-- Spark 2 only supports the numeric form:
-- SELECT /*+ REPARTITION(3000) */ *
SELECT /*+ REPARTITION(2000, col1, col2, col3) */ *
FROM table
# Infer column data types from the dataframe

## partition columns
partition_cols_dict = {
    "year": "string",
    "month": "string",
    "day": "string",
    "hour": "string"
}
partitions = ", ".join(f"{col} {dtype}" for col, dtype in partition_cols_dict.items())
# print(partitions)
# year string, month string, day string, hour string

## non-partition columns
partition_cols = list(partition_cols_dict)
columns = ", \n".join(
    spark.sql("describe temp")
    .rdd.filter(lambda row: row.col_name not in partition_cols)
    .map(lambda row: row[0] + " " + row[1])
    .collect()
)
# print(columns)
# country_id bigint, city_id bigint, utcDate string, user_id bigint
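The `partitions` and `columns` strings built above can then be dropped into the `CREATE EXTERNAL TABLE` template. A self-contained sketch (the column lists are hard-coded stand-ins here instead of being read via `spark.sql("describe ...")`; the table name and S3 path are the illustrative ones used throughout this gist):

```python
# Hard-coded stand-ins for the strings inferred from the dataframe above.
columns = "country_id bigint, \ncity_id bigint, \nutcDate string, \nuser_id bigint"
partitions = "year string, month string, day string, hour string"

ddl = f"""CREATE EXTERNAL TABLE IF NOT EXISTS pricing.demand_tbl
(
{columns}
)
PARTITIONED BY ({partitions})
STORED AS PARQUET
LOCATION 's3a://datascience-bucket/wesley.goi/data/pricing/demand_tbl/'"""

# In a live session the statement would be run with spark.sql(ddl).
print(ddl)
```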
SELECT *
FROM table
WHERE year||month||day BETWEEN '20210301' AND '20210415'
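The yyyymmdd bounds in a filter like this can be generated rather than hand-typed; a small sketch using only the standard library (the dates match the ones in the query above):

```python
from datetime import date

def partition_key(d: date) -> str:
    # Matches the zero-padded year||month||day concatenation used in the query.
    return f"{d:%Y%m%d}"

start_key = partition_key(date(2021, 3, 1))
end_key = partition_key(date(2021, 4, 15))
print(start_key, end_key)  # 20210301 20210415
```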
df = spark.read.parquet("s3://<bucket>/<suffix>/year=*/month=*/day=*/hour=*")
REFRESH TABLE pricing.demand_tbl
PARTITION (year='2021', month='01', day='11', hour='01')
from pyspark.sql import DataFrame

def save_to_tfrecord(df: DataFrame, path: str):
    """
    Saves a Spark dataframe as gzipped tfrecord files in S3.

    Parameters
    ----------
    df: DataFrame
        spark dataframe
    path: str
        file path, if it's S3 eg.
        s3://<bucket>/some/path/tfrecord.
        The part files are saved under this folder, gzipped.
    """
    (
        df.write
        .format("tfrecords")
        .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
        .mode("overwrite")
        .save(path)
    )
SELECT *
FROM table
WHERE date BETWEEN 20210301 AND 20210415
# enable AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
# enable shuffle partitions optimisation
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")