Spark trick: use accumulators to collect logs from the worker nodes. Anything printed inside a task goes to the executor logs, not to the driver's console, so an accumulator is a simple way to ship log messages back to the driver.
from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParam

# Out of the box, Spark only provides accumulator support for numeric types.
# This class extends accumulator support to strings by defining the zero
# element and how two partial values are combined (here: concatenation).
class StringAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        # The zero element for string concatenation is the empty string.
        return ""

    def addInPlace(self, val1, val2):
        return val1 + val2

# A toy map function that logs what it did through the accumulator.
# Note: accumulator updates made inside a transformation such as `map`
# may be applied more than once if a task is re-executed; only updates
# made inside actions are guaranteed to be counted exactly once.
def f(k):
    accumlog.add("Added 1 to %d.\n" % k)
    return k + 1

# `master_url` is a placeholder; use e.g. "local[*]" for a local test run.
sc = SparkContext(master=master_url)
accumlog = sc.accumulator("", StringAccumulatorParam())
print("Initial value of the accumulator: '%s'" % accumlog.value)

rdd = sc.parallelize(range(10))
print("Initial content of the RDD:", rdd.collect())
print("Now we apply the `f` function to the RDD:", rdd.map(f).collect())
print("Log is updated:")
print(accumlog.value)
'''
Terminal output:
Initial value of the accumulator: ''
Initial content of the RDD: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Now we apply the `f` function to the RDD: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Log is updated:
Added 1 to 0.
Added 1 to 1.
Added 1 to 2.
Added 1 to 3.
Added 1 to 4.
Added 1 to 5.
Added 1 to 6.
Added 1 to 7.
Added 1 to 8.
Added 1 to 9.
'''
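
A possible refinement (my own sketch, not part of the original gist): accumulating into a Python list instead of a single string avoids re-copying the whole log on every merge and keeps each message as a separate element. The names `ListAccumulatorParam` and `loglist` below are illustrative.

from pyspark.accumulators import AccumulatorParam

class ListAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        # The zero element for list concatenation is the empty list.
        return []

    def addInPlace(self, list1, list2):
        # Merge two partial logs (called on the workers and on the driver).
        list1.extend(list2)
        return list1

# Usage, assuming `sc` and `rdd` from above:
# loglist = sc.accumulator([], ListAccumulatorParam())
# rdd.foreach(lambda k: loglist.add(["Saw %d." % k]))
# print("\n".join(loglist.value))

Using `foreach` (an action) rather than `map` also sidesteps the caveat noted above: accumulator updates in actions are applied exactly once, even when tasks are retried.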