PySpark - Spark Accumulator confusion
I'm writing a Spark job that takes in data from multiple sources, filters out bad input rows, and outputs a modified version of the input. The job has two additional requirements:
- I must keep track of the number of bad input rows per source to notify upstream providers.
- I must support an output limit per source.
The job seemed straightforward and I approached the problem using accumulators to keep track of the number of filtered rows per source. However, when I implemented the final .limit(N), the accumulator behavior changed. Here's stripped-down sample code that triggers the behavior on a single source:
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
from random import randint


def filter_and_transform_parts(rows, filter_int, accum):
    for r in rows:
        if r[0] == filter_int:
            accum.add(1)
            continue
        yield r[0], r[1] + 1, r[2] + 1


def main():
    spark = SparkSession \
        .builder \
        .appName("test") \
        .getOrCreate()

    sc = spark.sparkContext
    accum = sc.accumulator(0)

    # 20 inputs with a tuple having 4 as the first element
    inputs = [(4, randint(1, 10), randint(1, 10)) if x % 5 == 0 else
              (randint(6, 10), randint(6, 10), randint(6, 10)) for x in xrange(100)]

    rdd = sc.parallelize(inputs)

    # filter out tuples with 4 as the first element
    rdd = rdd.mapPartitions(lambda r: filter_and_transform_parts(r, 4, accum))

    # if no limit, the accumulator value is 20
    # if limit and limit_count <= 63, the accumulator value is 0
    # if limit and limit_count >= 64, the accumulator value is 20
    limit = True
    limit_count = 63

    if limit:
        rdd = rdd.map(lambda r: Row(r[0], r[1], r[2]))
        df_schema = StructType([StructField("val1", IntegerType(), False),
                                StructField("val2", IntegerType(), False),
                                StructField("val3", IntegerType(), False)])
        df = spark.createDataFrame(rdd, schema=df_schema)
        df = df.limit(limit_count)
        df.write.mode("overwrite").csv('foo/')
    else:
        rdd.saveAsTextFile('foo/')

    print "accum value: {}".format(accum.value)


if __name__ == "__main__":
    main()
The problem is that the accumulator sometimes reports the number of filtered rows and sometimes doesn't, depending on the limit specified and the number of inputs for the source. However, in all cases the filtered rows don't make it into the output, meaning the filter did occur and the accumulator should have a value.
If anyone can shed some light on this, that'd be really helpful. Thanks!
Update:
- Adding an rdd.persist() call after the mapPartitions call made the accumulator behavior consistent (see the sketch just below).
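For reference, a minimal sketch of where that call would go in the sample above (the exact placement is my reading of the update, not verbatim from the job):

    # Sketch, assuming the same rdd and accum as in the sample code above
    rdd = rdd.mapPartitions(lambda r: filter_and_transform_parts(r, 4, accum))
    rdd.persist()  # mark the filtered RDD for caching so it is only computed once;
                   # per the update, this made the accumulator value consistent
                   # regardless of limit_count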
Actually, it doesn't matter what limit_count's value is.
The reason why the accum value is sometimes 0 is that you perform the accumulator updates inside transformations (e.g. rdd.map, rdd.mapPartitions). Spark only guarantees that accumulators work as expected inside actions (e.g. rdd.foreach): transformations are lazily evaluated, so their accumulator updates only happen when some action computes the data, may not happen at all for data the action doesn't need, and can be applied more than once if the data is recomputed.
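As a side illustration of that guarantee (a standalone toy snippet, assuming only an existing SparkContext sc; it is not part of the original job): an accumulator updated in a transformation is only bumped when an action computes the data, and is bumped again every time the data is recomputed:

    # Toy example: accumulator updated inside a transformation (map)
    acc = sc.accumulator(0)

    def bump(x):
        acc.add(1)   # update runs only when a task actually computes this element
        return x

    mapped = sc.parallelize(range(100), 4).map(bump)

    print(acc.value)   # 0 -- map is lazy, nothing has been computed yet

    mapped.count()     # an action forces the whole RDD to be computed
    print(acc.value)   # 100

    mapped.count()     # the RDD is not cached, so it is recomputed...
    print(acc.value)   # ...and the value is now 200: updates in transformations can double-count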
Let's make a small change to the code:
from pyspark.sql import *
from random import randint


def filter_and_transform_parts(rows, filter_int, accum):
    for r in rows:
        if r[0] == filter_int:
            accum.add(1)


def main():
    spark = SparkSession.builder.appName("test").getOrCreate()
    sc = spark.sparkContext
    print(sc.applicationId)

    accum = sc.accumulator(0)

    inputs = [(4, x * 10, x * 100) if x % 5 == 0 else
              (randint(6, 10), x * 10, x * 100) for x in xrange(100)]
    rdd = sc.parallelize(inputs)

    # count the filtered rows in an action (foreachPartition), not a transformation
    rdd.foreachPartition(lambda r: filter_and_transform_parts(r, 4, accum))

    limit = True
    limit_count = 10  # or whatever

    if limit:
        rdd = rdd.map(lambda r: Row(val1=r[0], val2=r[1], val3=r[2]))
        df = spark.createDataFrame(rdd)
        df = df.limit(limit_count)
        df.write.mode("overwrite").csv('file:///tmp/output')
    else:
        rdd.saveAsTextFile('file:///tmp/output')

    print "accum value: {}".format(accum.value)


if __name__ == "__main__":
    main()
Now the accum value is equal to 20 every time.
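Since the original question needs counts per source, one possible extension of this pattern (a sketch only; load_source and the source names are hypothetical, not from the post) is one accumulator per source, each populated by its own foreachPartition action:

    # Sketch: one accumulator per source; load_source() is a hypothetical
    # helper that returns the RDD for a given source
    sources = ["source_a", "source_b"]
    accums = {name: sc.accumulator(0) for name in sources}

    def count_bad_rows(rows, filter_int, accum):
        for r in rows:
            if r[0] == filter_int:
                accum.add(1)

    for name in sources:
        acc = accums[name]
        # foreachPartition is an action, so each count is guaranteed to be applied once
        load_source(name).foreachPartition(lambda rows: count_bad_rows(rows, 4, acc))

    for name in sources:
        print("bad rows for {}: {}".format(name, accums[name].value))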
For more information:
http://spark.apache.org/docs/2.0.2/programming-guide.html#accumulators