Last active
August 31, 2018 13:24
-
-
Save wagnerjgoncalves/d2622d291a2a63681f38aa60327bf9a8 to your computer and use it in GitHub Desktop.
Pyspark using SparkContext example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
"""
Example of Python RDD with SparkContext
"""
import csv
import sys
from collections import OrderedDict

from pyspark import SparkContext
from pyspark.conf import SparkConf
# Name handed to SparkConf.setAppName below
app = 'exampleRDDApi'

# Spark context configuration, applied one setting at a time:
# app name, "local[8]" master, an 8g spark.driver.maxResultSize,
# and spark.logConf switched on.
conf = SparkConf()
conf.setAppName(app)
conf.setMaster("local[8]")
conf.set('spark.driver.maxResultSize', '8g')
conf.set('spark.logConf', 'true')

# Start the context with the configuration assembled above
sparkContext = SparkContext(conf=conf)
# Read the CSV file as an RDD of raw text lines
rdd = sparkContext.textFile('censo_escolar_ensino_medio_2017.csv')

# Count every record in the RDD (the returned number is not stored)
rdd.count()

# Turn each text line into a list of its ';'-separated fields
rows = rdd.map(lambda line: line.split(';'))

# Group the student rows by the value in column 3 (the school code),
# then mark the grouped RDD as cached before it is collected.
grouped = rows.groupBy(lambda fields: fields[3])
cached = grouped.cache()
# Build a nested mapping on the driver:
#   school code -> column 1 (year) -> column 2 (step) -> [column 0 (student id), ...]
schools = {}
for school_code, students in cached.collect():
    for fields in students:
        by_year = schools.setdefault(school_code, {})
        by_step = by_year.setdefault(fields[1], {})
        by_step.setdefault(fields[2], []).append(fields[0])

# Re-key the mapping in ascending school-code order
schools = OrderedDict((code, schools[code]) for code in sorted(schools))
# Export data to csv file
#
# The csv module needs a binary file on Python 2 but a text file opened with
# newline='' on Python 3, so the open() arguments are chosen by interpreter
# version. This keeps the Python 2 behavior and lets the script run on
# Python 3 as well.
if sys.version_info[0] >= 3:
    output = open('censo_escolar_ensino_medio_2017_processed.csv', 'w',
                  newline='', encoding='utf-8')
else:
    output = open('censo_escolar_ensino_medio_2017_processed.csv', 'wb')

with output:
    fieldnames = [u'school_id', u'year', u'step', u'student_ids']
    wr = csv.DictWriter(output, fieldnames=fieldnames, delimiter=';')
    wr.writeheader()

    # Parenthesized single-argument form prints the same text on Python 2 and 3.
    print('writing csv file')

    # One row per (school, year, step); the student ids of that group are
    # joined with commas into a single field. Schools arrive already ordered;
    # years and steps are sorted here so the output file is reproducible
    # instead of depending on dict iteration order.
    for school_id, years in schools.items():
        for year, steps in sorted(years.items()):
            for step, student_ids in sorted(steps.items()):
                wr.writerow({
                    'school_id': school_id,
                    'year': year,
                    'step': step,
                    'student_ids': ",".join(student_ids),
                })
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment