import re

from pyspark import SparkConf
from pyspark.sql import SparkSession, Row, Window
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

conf = SparkConf() \
    .setMaster('local') \
    .setAppName("app name here")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
Create DataFrame (test data)
my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020"), Row("125", "04/5/2020"), Row("126", "4/05/2020")]
my_schema = StructType([
    StructField("ID", StringType()),
    StructField("EventDate", StringType()),
])
my_rdd = spark.sparkContext.parallelize(my_rows, 2)
my_df = spark.createDataFrame(my_rdd, my_schema)
Create DataFrame (plain from text)
From a text file (.csv, .data), the usual way:
DATASET_PATH = "/content/gdrive/path/to/text.file"
lines = sc.textFile(DATASET_PATH)
Parse lines + filter
def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

rdd = lines.map(parseLine)
rdd = rdd.filter(lambda x: [condition])
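A concrete condition might look like this (the threshold is purely illustrative, assuming the (age, numFriends) tuples returned by parseLine above):
# Keep only records whose age (first tuple element) is below 40 -- illustrative threshold
rdd = rdd.filter(lambda x: x[0] < 40)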
Aggregate (average number of friends per age):
averagesByAge = rdd \
    .mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
    .mapValues(lambda x: x[0] / x[1])
results = averagesByAge.collect()
for result in results:
    print(result)
Create DataFrame (with struct)
CSV:
Infer schema
surveyDF = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(DATASET_PATH)
From a StructType schema (schemaStruct)
schemaStruct = StructType([
    StructField("FL_DATE", DateType()),
    StructField("OP_CARRIER", StringType()),
    ...
])
print('Schema by StructType')
flightTimeCsvDF = spark.read \
    .format('csv') \
    .option("header", "true") \
    .schema(schemaStruct) \
    .option("mode", "FAILFAST") \
    .option("dateFormat", "M/d/y") \
    .load(DATASET_PATH)
From a schema DDL string
flightSchemaDDL = """FL_DATE DATE, OP_CARRIER STRING, OP_CARRIER_FL_NUM INT,
ORIGIN STRING, ORIGIN_CITY_NAME STRING, DEST STRING, DEST_CITY_NAME STRING,
CRS_DEP_TIME INT, DEP_TIME INT, WHEELS_ON INT, TAXI_IN INT, CRS_ARR_TIME INT,
ARR_TIME INT, CANCELLED INT, DISTANCE INT"""
print('Schema by String')
flightTimeCsvDF = spark.read \
    .format('csv') \
    .option("header", "true") \
    .schema(flightSchemaDDL) \
    .option("mode", "FAILFAST") \
    .option("dateFormat", "M/d/y") \
    .load(DATASET_PATH)
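Printing the schema is a quick way to confirm the DDL string mapped to the intended column types:
flightTimeCsvDF.printSchema()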
Write out:
flightTimeCsvDF.write \
    .format("json") \
    .mode("overwrite") \
    .option("path", "dataSink/json2/") \
    .partitionBy("OP_CARRIER", "ORIGIN") \
    .option("maxRecordsPerFile", 10000) \
    .save()
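The partitioned output can be read back as a single DataFrame; a minimal sketch, assuming the dataSink/json2/ path from the write above:
# Partition columns (OP_CARRIER, ORIGIN) reappear as regular columns when loading the directory tree
jsonDF = spark.read.format("json").load("dataSink/json2/")
jsonDF.printSchema()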
# Create temp view
surveyDF.createOrReplaceTempView("survey_tbl")
# Run query
countDF = spark.sql("select Country, count(1) as count from survey_tbl where Age < 40 group by Country")
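The query result is an ordinary DataFrame and can be displayed or written out like any other:
countDF.show()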
# Read data from the dataset
flightTimeParquetDF = spark.read \
    .format('parquet') \
    .load(DATASET_PATH)
# Create the "AIRLINE_DB" database if it does not already exist
spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINE_DB")
spark.catalog.setCurrentDatabase("AIRLINE_DB")
flightTimeParquetDF.write \
    .mode("overwrite") \
    .saveAsTable("flight_data_tbl")
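Once saved as a managed table, the data can be read back through the catalog (a minimal sketch; the database and table names follow the calls above):
flight_tbl_df = spark.read.table("AIRLINE_DB.flight_data_tbl")
flight_tbl_df.show(5)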
from pyspark.sql.functions import *

def to_date_df(df, dateformat, datefield):
    return df.withColumn(datefield, to_date(col(datefield), dateformat))

new_df = to_date_df(my_df, "M/d/y", "EventDate")
new_df.printSchema()
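Showing a few rows should confirm that both zero-padded and unpadded values in my_df parse with the 'M/d/y' pattern:
new_df.show()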
file_df = spark.read.text(DATASET_PATH)
log_reg = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)'
logs_df = file_df.select(regexp_extract('value', log_reg, 1).alias('ip'),
                         regexp_extract('value', log_reg, 4).alias('date'),
                         regexp_extract('value', log_reg, 6).alias('request'),
                         regexp_extract('value', log_reg, 10).alias('referrer'))
logs_df.printSchema()
logs_df \
    .withColumn("referrer", substring_index("referrer", "/", 3)) \
    .where("trim(referrer) == '-' OR trim(referrer) != ''") \
    .groupBy("referrer") \
    .count() \
    .show(100, truncate=False)
airlinesDF = spark.read \
    .format('csv') \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("samplingRatio", "0.0001") \
    .load(DATASET_PATH)
airlinesDF.select("Origin", "Dest", "Distance").show(10)
airlinesDF.select(column("Origin"), col("Dest"), airlinesDF.Distance).show(10)
airlinesDF.select("Origin", "Dest", "Distance", "Year", "Month", "DayofMonth").show(10)
airlinesDF.selectExpr("Origin", "Dest", "Distance", "to_date(concat(Year,Month,DayofMonth), 'yyyyMMdd') as FlightDate").show(10)
airlinesDF.select("Origin", "Dest", "Distance", to_date(concat("Year", "Month", "DayofMonth"), "yyyyMMdd").alias("FlightDate")).show(10)
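The same FlightDate column can also be added alongside all existing columns with withColumn instead of rebuilding the projection (a sketch over the same airlinesDF):
airlinesDF.withColumn("FlightDate", to_date(concat("Year", "Month", "DayofMonth"), "yyyyMMdd")).show(10)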
UDF (User-Defined Function)
survey_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(DATASET_PATH)

def parse_gender(gender):
    female_pattern = r"^f$|f.m|w.m"
    male_pattern = r"^m$|ma|m.l"
    if re.search(female_pattern, gender.lower()):
        return "Female"
    elif re.search(male_pattern, gender.lower()):
        return "Male"
    else:
        return "Unknown"

# Use as a DataFrame column function
parse_gender_udf = udf(parse_gender, StringType())
survey_df2 = survey_df.withColumn("Gender", parse_gender_udf("Gender"))
# Register with Spark SQL
spark.udf.register("parse_gender_udf", parse_gender, StringType())
survey_df3 = survey_df.withColumn("Gender", expr("parse_gender_udf(Gender)"))
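To confirm the SQL registration took effect, the catalog can be queried for the function name:
# Lists catalog functions whose name contains "parse_gender"
print([fn.name for fn in spark.catalog.listFunctions() if "parse_gender" in fn.name])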
Aggregation
invoice_df.select(
    f.count("*").alias("Count *"),
    f.sum("Quantity").alias("TotalQuantity"),
    f.avg("UnitPrice").alias("AvgPrice"),
    f.countDistinct("InvoiceNo").alias("CountDistinct")
).show()
invoice_df.createOrReplaceTempView("sales")
summary_sql = spark.sql("""
    SELECT
        Country,
        InvoiceNo,
        sum(Quantity) as TotalQuantity,
        round(sum(UnitPrice * Quantity), 2) as InvoiceValue
    FROM sales
    GROUP BY Country, InvoiceNo
""")
summary_sql.show()
With GroupBy
summary_df = invoice_df \
    .groupBy("Country", "InvoiceNo") \
    .agg(
        f.sum("Quantity").alias("TotalQuantity"),
        f.round(f.sum(f.expr("UnitPrice * Quantity")), 2).alias("InvoiceValue")
    )
summary_df.show()

running_total_window = Window.partitionBy("Country") \
    .orderBy("WeekNumber") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
summary_df.withColumn("RunningTotal",
                      f.sum("InvoiceValue").over(running_total_window)) \
    .show()
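Other window functions reuse the same partition/order spec; a sketch of a week-over-week comparison with lag, assuming the aggregated DataFrame actually carries Country, WeekNumber and InvoiceValue columns (as the window above implies):
week_window = Window.partitionBy("Country").orderBy("WeekNumber")
summary_df \
    .withColumn("PrevWeekValue", f.lag("InvoiceValue").over(week_window)) \
    .withColumn("WeekOverWeek", f.expr("InvoiceValue - PrevWeekValue")) \
    .show()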
# Define the join condition
join_expr = order_df.prod_id == product_df.prod_id
product_renamed_df = product_df.withColumnRenamed("qty", "reorder_qty")
# Join the two DataFrames
order_df.join(product_renamed_df, join_expr, "inner") \
    .drop(product_renamed_df.prod_id) \
    .select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty") \
    .show()
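The same join expression works for other join types; a left outer join keeps orders that have no matching product (a sketch on the same DataFrames, with unmatched product columns returned as null):
order_df.join(product_renamed_df, join_expr, "left") \
    .drop(product_renamed_df.prod_id) \
    .show()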