pyspark - Spark Accumulator confusion


I'm writing a Spark job that takes in data from multiple sources, filters out bad input rows, and outputs a modified version of the input. The job has two additional requirements:

  • I must keep track of the number of bad input rows per source to notify upstream providers.
  • I must support an output limit per source.

The job seemed straightforward and I approached the problem using accumulators to keep track of the number of filtered rows per source. However, when I implemented the final .limit(n), the accumulator behavior changed. Here's a stripped-down sample that triggers the behavior on a single source:

from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
from random import randint

def filter_and_transform_parts(rows, filter_int, accum):
    for r in rows:
        if r[0] == filter_int:
            accum.add(1)
            continue

        yield r[0], r[1] + 1, r[2] + 1

def main():
    spark = SparkSession \
        .builder \
        .appName("test") \
        .getOrCreate()

    sc = spark.sparkContext
    accum = sc.accumulator(0)

    # 20 inputs w/ a tuple having a 4 as the first element
    inputs = [(4, randint(1, 10), randint(1, 10)) if x % 5 == 0
              else (randint(6, 10), randint(6, 10), randint(6, 10)) for x in xrange(100)]

    rdd = sc.parallelize(inputs)
    # filter out tuples with a 4 as the first element
    rdd = rdd.mapPartitions(lambda r: filter_and_transform_parts(r, 4, accum))

    # if no limit, accumulator value is 20
    # if limit and limit_count <= 63, accumulator value is 0
    # if limit and limit_count >= 64, accumulator value is 20
    limit = True
    limit_count = 63

    if limit:
        rdd = rdd.map(lambda r: Row(r[0], r[1], r[2]))
        df_schema = StructType([StructField("val1", IntegerType(), False),
                                StructField("val2", IntegerType(), False),
                                StructField("val3", IntegerType(), False)])
        df = spark.createDataFrame(rdd, schema=df_schema)
        df = df.limit(limit_count)
        df.write.mode("overwrite").csv('foo/')
    else:
        rdd.saveAsTextFile('foo/')

    print "accum value: {}".format(accum.value)

if __name__ == "__main__":
    main()
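
For the full job with multiple sources, the per-source counting mentioned above could be done with one accumulator per source. A minimal sketch only, assuming sources are identified by name; "source_a"/"source_b" and read_source() are hypothetical placeholders, not part of the original job:

# Sketch: one accumulator per source so each upstream provider can be notified separately.
source_names = ["source_a", "source_b"]          # hypothetical source identifiers
bad_row_counts = {name: sc.accumulator(0) for name in source_names}

for name in source_names:
    src_rdd = read_source(name)                  # hypothetical loader for that source's rows
    # bind this source's accumulator via a default argument so the closure doesn't
    # capture the loop variable
    src_rdd = src_rdd.mapPartitions(
        lambda rows, acc=bad_row_counts[name]: filter_and_transform_parts(rows, 4, acc))
    # ... apply the per-source limit and write, then report bad_row_counts[name].value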

The problem is that the accumulator sometimes reports the number of filtered rows and sometimes doesn't, depending on the limit specified and the number of inputs per source. However, in all cases the filtered rows don't make it into the output, meaning the filter did occur and the accumulator should have a value.

If anyone can shed some light on this, that'd be really helpful, thanks!

Update:

  • Adding an rdd.persist() call after mapPartitions made the accumulator behavior consistent (see the sketch below).
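
A minimal sketch of that workaround, applied to the sample code above:

rdd = rdd.mapPartitions(lambda r: filter_and_transform_parts(r, 4, accum))
rdd = rdd.persist()  # per the update: caching here made the accumulator value consistent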

Actually, it doesn't matter what limit_count's value is.

The reason why the accum value is sometimes 0 is that you update the accumulator inside transformations (e.g. rdd.map, rdd.mapPartitions).

Spark only guarantees that accumulators work as expected inside actions (e.g. rdd.foreach).

Let's make a little change to the code:

from pyspark.sql import *
from random import randint


def filter_and_transform_parts(rows, filter_int, accum):
    for r in rows:
        if r[0] == filter_int:
            accum.add(1)


def main():
    spark = SparkSession.builder.appName("test").getOrCreate()

    sc = spark.sparkContext
    print(sc.applicationId)
    accum = sc.accumulator(0)

    inputs = [(4, x * 10, x * 100) if x % 5 == 0
              else (randint(6, 10), x * 10, x * 100) for x in xrange(100)]
    rdd = sc.parallelize(inputs)
    # foreachPartition is an action, so the accumulator updates are guaranteed to run
    rdd.foreachPartition(lambda r: filter_and_transform_parts(r, 4, accum))

    limit = True
    limit_count = 10 or 'whatever'

    if limit:
        rdd = rdd.map(lambda r: Row(val1=r[0], val2=r[1], val3=r[2]))
        df = spark.createDataFrame(rdd)
        df = df.limit(limit_count)
        df.write.mode("overwrite").csv('file:///tmp/output')
    else:
        rdd.saveAsTextFile('file:///tmp/output')

    print "accum value: {}".format(accum.value)


if __name__ == "__main__":
    main()

The accum value equals 20 every time.
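
One trade-off with the foreachPartition version: it only counts the bad rows and produces no transformed output, so the original job would still need the mapPartitions pass to write results. A rough, untested sketch that reuses the names from the question's code and combines the question's persist() workaround with an explicit action, so the accumulator is populated before the limited write:

rdd = rdd.mapPartitions(lambda r: filter_and_transform_parts(r, 4, accum))
rdd = rdd.persist()   # cache so the write below reuses the data instead of recomputing
rdd.count()           # an action: the accumulator updates have run once it returns
df = spark.createDataFrame(rdd.map(lambda r: Row(val1=r[0], val2=r[1], val3=r[2])))
df.limit(limit_count).write.mode("overwrite").csv('foo/')
print "accum value: {}".format(accum.value)  # reflects all filtered rows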

For more information:

http://spark.apache.org/docs/2.0.2/programming-guide.html#accumulators

