#################
# SETUP
#################
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, avg
from pyspark.sql.types import *
import csv, json, re, sys, argparse, time
import pandas as pd
import numpy as np
from ua_parser import user_agent_parser
#################
# GLOBAL VARIABLES
#################
APP_NAME = "UA Parser"
#################
# FUNCTIONS
#################
#define the parsing function
def parseUa(line):
    #position of the UA in the string, i.e. how many spaces from the start the UA begins
    uaPos = 11
    #position of the URL in the string, i.e. how many spaces from the start the URL begins
    urlPos = 10
    try:
        url = line.split(" ", urlPos)[urlPos].split(" ", 1)[0]
    except:
        raise Exception("Found error at line: " + line)
    #regex-replace the protocol and known host prefixes
    url = re.sub(r"https://|http://|sslh.teradatadmc.com/|sslg.teradatadmc.com/", "", url)
    #split by slash and pick the first element
    url = url.split("/")[0]
    #split at the uaPos-th delimiter
    uaSplit = line.split(" ", uaPos)[uaPos].rsplit(" ", 2)
    #extract the ua string at position 0
    uaString = uaSplit[0]
    #have it parsed
    uaStringParsed = user_agent_parser.Parse(uaString)
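    #illustrative sketch only (the field values are assumptions, not taken from real data):
    #user_agent_parser.Parse returns a nested dict, roughly of the shape
    #{'device': {'family': 'Other', ...}, 'os': {'family': 'Windows', ...},
    # 'user_agent': {'family': 'Chrome', 'major': '63', 'minor': '0', ...}, ...}
    #the lines below pick out exactly the keys this script needs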
    #extract the individual components of the parsed dict and JSON-encode them as strings
    uaStringDevice = json.dumps(uaStringParsed['device']['family'])
    uaStringOs = json.dumps(uaStringParsed['os']['family'])
    uaStringUaFamily = json.dumps(uaStringParsed['user_agent']['family'])
    uaStringUaMajor = json.dumps(uaStringParsed['user_agent']['major'])
    uaStringUaMinor = json.dumps(uaStringParsed['user_agent']['minor'])
    #bind all the individual pieces of information into one list
    uaList = [url, uaStringDevice, uaStringOs, uaStringUaFamily, uaStringUaMajor, uaStringUaMinor]
    uaString = ','.join(uaList)
    #return the comma-separated record
    return uaString
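#illustrative sketch only (hypothetical values): for a Chrome-on-Windows user agent,
#parseUa would return something like 'example.com,"Other","Windows","Chrome","63","0"';
#json.dumps keeps the double quotes around each parsed field in the output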
#################
# MAIN
#################
def main(sc, fileIn, fileOut, fileFormat):
    sqlContext = SQLContext(sc)
    #read in the file
    uaLines = sc.textFile(fileIn)
    #extract the header
    header = uaLines.take(1)[0]
    #filter out the header
    uaLines = uaLines.filter(lambda line: line != header)
    #map each line using the parsing function
    parsedUaLines = uaLines.map(parseUa)
    #split the resulting strings by the separator
    uaRdd = parsedUaLines.map(lambda l: l.split(","))
    #define the schema string
    schemaString = "url device os family major minor"
    #all fields will be strings
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    #create the actual schema
    schema = StructType(fields)
    #apply the schema to the RDD
    schemaUa = sqlContext.createDataFrame(uaRdd, schema)
    #collapse to a single partition so the result is written as a single file
    schemaUa = schemaUa.coalesce(1)
    #write out in the requested format
    if fileFormat == "json":
        schemaUa.write.json(fileOut)
    else:
        schemaUa.write.format('com.databricks.spark.csv').options(header='true').save(fileOut)
#################
# EXECUTE MAIN
#################
if __name__ == "__main__":
    #configure the spark context
    sparkConfiguration = SparkConf().setAppName(APP_NAME)
    sparkContext = SparkContext(conf=sparkConfiguration)
    #define the input file name
    inFileName = "/data/product/all_062016_startpage5m.csv"
    #parse the filename into its cluster, time and page components
    inFileNameSplit = inFileName.rsplit("/")
    inFileNameCluster = inFileNameSplit[len(inFileNameSplit)-1].split("_")[0]
    inFileNameTime = inFileNameSplit[len(inFileNameSplit)-1].split("_")[1]
    inFileNamePage = inFileNameSplit[len(inFileNameSplit)-1].split("_")[2]
    inFileNamePage = re.sub(r".csv", "", inFileNamePage)
    #output name is just the input name extended with _parsed
    outFileName = inFileName[0:(len(inFileName)-4)] + '_parsed'
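    #e.g. /data/product/all_062016_startpage5m.csv -> /data/product/all_062016_startpage5m_parsed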
    outFormat = "csv"
    #execute the main functionality
    main(sparkContext, inFileName, outFileName, outFormat)
# /spark/bin/spark-submit --master local[4] --packages com.databricks:spark-csv_2.10:1.2.0 /home/user/scripts/py_spark_uaparser.py
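# note: the --packages flag above pulls in the spark-csv writer that main() uses for CSV output,
# and --master local[4] runs the job locally on four cores; the script path is an example and
# should point to wherever this file is saved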