pyspark - Spark Accumulator confusion
I'm writing a Spark job that takes in data from multiple sources, filters out bad input rows, and outputs a modified version of the input. The job has two additional requirements:
- I must keep track of the number of bad input rows per source, to notify the upstream providers.
- I must support an output limit per source.
The job seemed straightforward, and I approached the problem using accumulators to keep track of the number of filtered rows per source. However, when I implemented the final .limit(n), the accumulator behavior changed. Here's stripped-down sample code that triggers the behavior on a single source:
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
from random import randint


def filter_and_transform_parts(rows, filter_int, accum):
    for r in rows:
        if r[0] == filter_int:
            accum.add(1)
            continue

        yield r[0], r[1] + 1, r[2] + 1


def main():
    spark = SparkSession \
        .builder \
        .appName("test") \
        .getOrCreate()
    sc = spark.sparkContext

    accum = sc.accumulator(0)

    # 20 of the 100 inputs are tuples with 4 as the first element
    inputs = [(4, randint(1, 10), randint(1, 10)) if x % 5 == 0
              else (randint(6, 10), randint(6, 10), randint(6, 10))
              for x in xrange(100)]
    rdd = sc.parallelize(inputs)

    # filter out tuples with 4 as the first element
    rdd = rdd.mapPartitions(lambda r: filter_and_transform_parts(r, 4, accum))

    # With no limit, the accumulator value is 20
    # With a limit and limit_count <= 63, the accumulator value is 0
    # With a limit and limit_count >= 64, the accumulator value is 20
    limit = True
    limit_count = 63

    if limit:
        rdd = rdd.map(lambda r: Row(r[0], r[1], r[2]))
        df_schema = StructType([StructField("val1", IntegerType(), False),
                                StructField("val2", IntegerType(), False),
                                StructField("val3", IntegerType(), False)])
        df = spark.createDataFrame(rdd, schema=df_schema)
        df = df.limit(limit_count)
        df.write.mode("overwrite").csv('foo/')
    else:
        rdd.saveAsTextFile('foo/')

    print "accum value: {}".format(accum.value)


if __name__ == "__main__":
    main()

The problem is that the accumulator sometimes reports the number of filtered rows and sometimes doesn't, depending on the limit specified and the number of inputs per source. However, in every case the filtered rows don't make it into the output, meaning the filtering did happen and the accumulator should have a value.
If anyone can shed some light on this, it'd be much appreciated. Thanks!
Update:
- Adding an rdd.persist() call after mapPartitions made the accumulator behavior consistent (see the placement sketch below).
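For reference, a minimal sketch of where that call goes in the code above (only the persist line is new; everything else is unchanged from the question):

rdd = rdd.mapPartitions(lambda r: filter_and_transform_parts(r, 4, accum))
rdd.persist()  # caching the filtered RDD here is what made the accumulator value consistent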
Actually, it doesn't matter what limit_count's value is.
The reason why the accum value is sometimes 0 is that you perform the accumulator updates inside transformations (e.g. rdd.map, rdd.mapPartitions). Transformations are evaluated lazily, so the update only happens when (and as often as) the stage containing it is actually computed.
Spark only guarantees that accumulators work as expected inside actions (e.g. rdd.foreach), where each task's update is applied exactly once.
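As a minimal illustration of that difference (a sketch only, assuming an existing SparkContext sc):

acc = sc.accumulator(0)
data = sc.parallelize(range(10))

def bump_and_pass(x):
    acc.add(1)                          # accumulator update inside a transformation
    return x

mapped = data.map(bump_and_pass)        # lazy: nothing runs yet, acc stays 0

data.foreach(lambda x: acc.add(1))      # action: each task's update is applied once
print(acc.value)                        # 10, all of it from the foreach

mapped.count()                          # the map finally runs: acc.value becomes 20
mapped.count()                          # recomputed (no caching), so acc.value becomes 30

The jump from 20 to 30 is the "may be applied more than once" caveat from the programming guide; the value contributed by the foreach, by contrast, is guaranteed.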
Let's make a small change to the code:
from pyspark.sql import *
from random import randint


def filter_and_transform_parts(rows, filter_int, accum):
    for r in rows:
        if r[0] == filter_int:
            accum.add(1)


def main():
    spark = SparkSession.builder.appName("test").getOrCreate()
    sc = spark.sparkContext
    print(sc.applicationId)

    accum = sc.accumulator(0)

    inputs = [(4, x * 10, x * 100) if x % 5 == 0 else (randint(6, 10), x * 10, x * 100)
              for x in xrange(100)]
    rdd = sc.parallelize(inputs)

    # foreachPartition is an action, so the accumulator updates are guaranteed
    rdd.foreachPartition(lambda r: filter_and_transform_parts(r, 4, accum))

    limit = True
    limit_count = 10  # or whatever

    if limit:
        rdd = rdd.map(lambda r: Row(val1=r[0], val2=r[1], val3=r[2]))
        df = spark.createDataFrame(rdd)
        df = df.limit(limit_count)
        df.write.mode("overwrite").csv('file:///tmp/output')
    else:
        rdd.saveAsTextFile('file:///tmp/output')

    print "accum value: {}".format(accum.value)


if __name__ == "__main__":
    main()

The accum value equals 20 every time.
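If you still need the transformed rows downstream (as in the original mapPartitions version), one option that matches the update above is to persist the filtered RDD and force a full pass with an action before applying the limit. A rough sketch against the question's code (untested, just the idea):

rdd = rdd.mapPartitions(lambda r: filter_and_transform_parts(r, 4, accum))
rdd.persist()   # keep the computed partitions so the limited write doesn't skip/recompute them
rdd.count()     # action: every partition is evaluated once, so accum is fully populated

rdd = rdd.map(lambda r: Row(val1=r[0], val2=r[1], val3=r[2]))
df = spark.createDataFrame(rdd)
df.limit(limit_count).write.mode("overwrite").csv('foo/')

print "accum value: {}".format(accum.value)  # should be 20 regardless of limit_count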
For more information:
http://spark.apache.org/docs/2.0.2/programming-guide.html#accumulators