serialization - Spark Task not Serializable with the Hadoop-MongoDB Connector (Enron example)
I am trying to run the Enron mail example of the Hadoop-MongoDB connector with Spark. I am using the Java code example from GitHub, https://github.com/mongodb/mongo-hadoop/blob/master/examples/enron/spark/src/main/java/com/mongodb/spark/examples/enron/enron.java, adjusted the server name, and added a username and password according to my needs.
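For context, that example wires the connection details into a Hadoop Configuration object that is handed to newAPIHadoopRDD. A minimal sketch of the adjusted setup, assuming the standard mongo-hadoop configuration keys (EnronSetup is an illustrative class name; host, credentials, and database are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.bson.BSONObject;
import com.mongodb.hadoop.MongoInputFormat;

public class EnronSetup {
    public static void main(String[] args) {
        JavaSparkContext sc =
                new JavaSparkContext(new SparkConf().setAppName("Enron"));

        // Tell mongo-hadoop where to read from; user, password, and host are placeholders.
        Configuration mongodbConfig = new Configuration();
        mongodbConfig.set("mongo.job.input.format",
                "com.mongodb.hadoop.MongoInputFormat");
        mongodbConfig.set("mongo.input.uri",
                "mongodb://myUser:myPassword@myserver.example.com:27017/enron_mail.messages");

        // Each record is an (ObjectId, document) pair read from the collection.
        JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
                mongodbConfig, MongoInputFormat.class,
                Object.class, BSONObject.class);

        System.out.println("documents: " + documents.count());
        sc.stop();
    }
}

The server name, username, and password all live in the mongo.input.uri connection string.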
The error message I got is the following:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2066)
    at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:333)
    at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.flatMap(RDD.scala:332)
    at org.apache.spark.api.java.JavaRDDLike$class.flatMap(JavaRDDLike.scala:130)
    at org.apache.spark.api.java.AbstractJavaRDDLike.flatMap(JavaRDDLike.scala:46)
    at Enron.run(Enron.java:43)
    at Enron.main(Enron.java:104)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Enron
Serialization stack:
    - object not serializable (class: Enron, value: Enron@62b09715)
    - field (class: Enron$1, name: this$0, type: class Enron)
    - object (class Enron$1, Enron$1@ee8e7ff)
    - field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, name: f$3, type: interface org.apache.spark.api.java.function.FlatMapFunction)
    - object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 22 more
I created a new class for the FlatMapFunction (see below) and extended the Enron class with it, but that did not solve the problem. Any ideas on how to solve this issue?
class FlatMapFunctionSer implements Serializable {

    static FlatMapFunction<Tuple2<Object, BSONObject>, String> flatFunc =
            new FlatMapFunction<Tuple2<Object, BSONObject>, String>() {
        @Override
        public Iterable<String> call(final Tuple2<Object, BSONObject> t) throws Exception {
            BSONObject header = (BSONObject) t._2().get("headers");
            String to = (String) header.get("To");
            String from = (String) header.get("From");

            // Each tuple in the set is an individual from|to pair.
            // JavaPairRDD<String, Integer> tuples = new JavaPairRDD<String, Integer>();
            List<String> tuples = new ArrayList<String>();
            if (to != null && !to.isEmpty()) {
                for (String recipient : to.split(",")) {
                    String s = recipient.trim();
                    if (s.length() > 0) {
                        tuples.add(from + "|" + s);
                    }
                }
            }
            return tuples;
        }
    };
}
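For reference, the serialization stack above points at the implicit this$0 field of the anonymous inner class (Enron$1), which drags the non-serializable Enron instance into the closure. A common way to avoid that is to move the function into its own top-level (or static nested) class, which has no such field. A minimal sketch under that assumption, where FromToPairs is an illustrative name:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.bson.BSONObject;

import scala.Tuple2;

// A top-level class has no implicit this$0 reference, so Spark only has to
// serialize this small object. FlatMapFunction already extends Serializable
// in the Spark 1.x Java API.
public class FromToPairs
        implements FlatMapFunction<Tuple2<Object, BSONObject>, String> {

    @Override
    public Iterable<String> call(final Tuple2<Object, BSONObject> t) throws Exception {
        BSONObject header = (BSONObject) t._2().get("headers");
        String to = (String) header.get("To");
        String from = (String) header.get("From");

        // Emit one "from|to" string per recipient.
        List<String> pairs = new ArrayList<String>();
        if (to != null && !to.isEmpty()) {
            for (String recipient : to.split(",")) {
                String s = recipient.trim();
                if (s.length() > 0) {
                    pairs.add(from + "|" + s);
                }
            }
        }
        return pairs;
    }
}

It would then be invoked as mongoRDD.flatMap(new FromToPairs()) instead of with an inline anonymous class (mongoRDD being the pair RDD read from MongoDB), so that only the function object gets serialized, not the driver class.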
The problem got solved by including mongo-hadoop-spark-2.0.2.jar in the spark-submit call and by using the following POM:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>1.5.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>1.5.1</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.14</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.mongodb.mongo-hadoop/mongo-hadoop-core -->
    <dependency>
        <groupId>org.mongodb.mongo-hadoop</groupId>
        <artifactId>mongo-hadoop-core</artifactId>
        <version>1.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.mongodb/bson -->
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>bson</artifactId>
        <version>3.4.2</version>
    </dependency>
</dependencies>
</project>
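A note on "including the jar in the call": passing the connector jar to spark-submit via its --jars option is one way to do that. Alternatively, the same artifact can presumably be pulled in through Maven instead of being shipped by hand; the coordinates below are an assumption, mirroring the group id already used for mongo-hadoop-core above, so verify them against your repository:

<!-- Assumed coordinates: mongo-hadoop-spark under the same group as mongo-hadoop-core. -->
<dependency>
    <groupId>org.mongodb.mongo-hadoop</groupId>
    <artifactId>mongo-hadoop-spark</artifactId>
    <version>2.0.2</version>
</dependency>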