pyspark exploration
1. Standalone function:
=======================
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def _add_one(x):
    """Adds one"""
    if x is not None:
        return x + 1

add_one = udf(_add_one, IntegerType())

Importance: This allows for full control flow, including exception handling, but requires maintaining two names (the plain function and the wrapped UDF).
2. lambda expression:
=====================
add_one = udf(lambda x: x + 1 if x is not None else None, IntegerType())

Importance: No name duplication, but limited to pure expressions.
3. Using nested functions:
==========================
def add_one(c):
    def add_one_(x):
        if x is not None:
            return x + 1
    return udf(add_one_, IntegerType())(c)

Importance: Quite verbose, but enables full control flow and clearly indicates the expected number of arguments.
4. Using decorator:
===================
@udf
def add_one(x):
    """Adds one"""
    if x is not None:
        return x + 1

# note: a bare @udf defaults to StringType; use @udf(IntegerType()) to match the variants above
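All four variants are applied the same way once defined. A minimal usage sketch, not part of the original notes, assuming an active SparkSession named spark (the demo_df data is a hypothetical example):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
demo_df = spark.createDataFrame([(1,), (2,), (None,)], ["x"])

# None rows pass through as null thanks to the is-not-None guard
demo_df.select("x", add_one("x").alias("x_plus_one")).show()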
Step 1: Define functions:
==========================
from pyspark.sql.functions import lit

def with_greeting(df):
    return df.withColumn("greeting", lit("hi"))

def with_something(df, something):
    return df.withColumn("something", lit(something))

# Create a DataFrame and use the above functions
data = [("jose", 1), ("li", 2), ("liz", 3)]
source_df = spark.createDataFrame(data, ["name", "age"])

df1 = with_greeting(source_df)
actual_df = with_something(df1, "moo")

# Creating intermediate variables gets especially ugly when 5+ transformations need to be run; you don't want df1, df2, df3, df4, and df5
Use a monkey patch and chain the functions:
===========================================
# 1. Add a custom transform method to DataFrame
from pyspark.sql.dataframe import DataFrame

def transform(self, f):
    return f(self)

DataFrame.transform = transform

actual_df = (source_df
             .transform(lambda df: with_greeting(df))
             .transform(lambda df: with_something(df, "crazy")))

actual_df.show()
+----+---+--------+---------+
|name|age|greeting|something|
+----+---+--------+---------+
|jose|  1|      hi|    crazy|
|  li|  2|      hi|    crazy|
| liz|  3|      hi|    crazy|
+----+---+--------+---------+
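Worth noting: Spark 3.0 and later ship a built-in DataFrame.transform, so on newer versions the monkey patch above is unnecessary and the same chain works out of the box:

# no monkey patch needed on Spark >= 3.0
actual_df = (source_df
             .transform(with_greeting)
             .transform(lambda df: with_something(df, "crazy")))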
Using functools.partial:
========================
from functools import partial

def with_asset(word, df):
    return df.withColumn("asset", lit(word))

actual_df = (source_df
             .transform(with_greeting)
             .transform(partial(with_asset, "stocks")))

actual_df.show()
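Note the argument order: with_asset takes word first so that partial can bind it positionally, leaving df as the sole remaining argument. The earlier with_something (df first) works with the same trick by binding a keyword instead:

actual_df = (source_df
             .transform(with_greeting)
             .transform(partial(with_something, something="moo")))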
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

df_pd = pd.DataFrame(
    data={'integers': [1, 2, 3],
          'floats': [-1.0, 0.5, 2.7],
          'integer_arrays': [[1, 2], [3, 4, 5], [6, 7, 8, 9]]}
)
df = spark.createDataFrame(df_pd)
# Declare function
def square(x):
    return x**2

# Registering UDF with integer type output
from pyspark.sql.types import IntegerType
square_udf_int = udf(lambda z: square(z), IntegerType())

df.select('integers',
          'floats',
          square_udf_int('integers').alias('int_squared')).show()
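For the integers column the types line up, so this works as expected; the output should look roughly like:

+--------+------+-----------+
|integers|floats|int_squared|
+--------+------+-----------+
|       1|  -1.0|          1|
|       2|   0.5|          4|
|       3|   2.7|          9|
+--------+------+-----------+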
# float type output
from pyspark.sql.types import FloatType
square_udf_float = udf(lambda z: square(z), FloatType())

# int_squared comes back null here: square() returns a Python int for
# integer input, which does not match the declared FloatType
df.select('integers',
          'floats',
          square_udf_float('integers').alias('int_squared'),
          square_udf_float('floats').alias('float_squared')).show()
## Force the output to be float so it matches the declared return type
def square_float(x):
    return float(x**2)

square_udf_float2 = udf(lambda z: square_float(z), FloatType())
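Applying the corrected UDF (a sketch, not in the original notes); both columns should now come back populated:

df.select('integers',
          'floats',
          square_udf_float2('integers').alias('int_squared'),
          square_udf_float2('floats').alias('float_squared')).show()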
Composite UDF:
==============
from pyspark.sql.types import ArrayType

def square_list(x):
    return [float(val)**2 for val in x]

square_list_udf = udf(lambda y: square_list(y), ArrayType(FloatType()))

df.select('integer_arrays', square_list_udf('integer_arrays')).show()
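show() truncates long cells by default; if the squared arrays get cut off, pass truncate=False (and an alias keeps the column name readable):

df.select('integer_arrays',
          square_list_udf('integer_arrays').alias('squared_arrays')).show(truncate=False)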
Using Struct Type:
==================
from pyspark.sql.types import StructType, StructField, StringType
import string

array_schema = StructType([
    StructField('number', IntegerType(), nullable=False),
    StructField('letters', StringType(), nullable=False)
])

def convert_ascii(number):
    return [number, string.ascii_letters[number]]

convert_ascii(1)  # validate the function: returns [1, 'b']

spark_convert_ascii = udf(lambda z: convert_ascii(z), array_schema)

df_ascii = df.select('integers', spark_convert_ascii('integers').alias('ascii_map'))
df_ascii.show()
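The payoff of a StructType return is that its fields can be selected individually afterwards (a sketch, not in the original notes):

df_ascii.select('integers',
                'ascii_map.number',
                'ascii_map.letters').show()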