【发布时间】:2017-08-28 16:47:17
【问题描述】:
我正在本地机器上测试 ElasticSearch 和 Spark 的集成,使用在 elasticsearch 中加载的一些测试数据。
val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()
代码运行良好并成功返回正确的结果 esRDD.first()
但是,esRDD.collect() 会产生异常:
java.io.NotSerializableException: org.apache.hadoop.io.Text
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我相信这与这里提到的问题有关http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html 所以我相应地添加了这一行
conf.set("spark.serializer", classOf[KryoSerializer].getName)
我应该做其他事情来让它工作吗? 谢谢
更新: 序列化设置问题已解决。通过使用
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
而不是
conf.set("spark.serializer", classOf[KryoSerializer].getName)
现在还有一个 该数据集中有 1000 条不同的记录
esRDD.count()
返回 1000 没问题,但是
esRDD.distinct().count()
返回 5 !如果我打印记录
esRDD.foreach(println)
它正确打印出 1000 条记录。但是如果我使用 collect 或 take
esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)
它将打印 DUPLICATED 记录,并且确实只显示了 5 条 UNIQUE 记录,这似乎是整个数据集的一个随机子集 - 它不是前 5 条记录。 如果我保存 RDD 并读回它
esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)
esRDD2 的行为符合预期。我想知道是否存在错误,或者我对收集/获取的行为不了解。还是因为我在本地运行所有内容。 默认情况下,Spark RDD 似乎使用 5 个分区,如“spark-output”文件的 part-xxxx 文件的数量所示。这可能就是 esRDD.collect() 和 esRDD.distinct() 返回 5 个唯一记录而不是其他一些随机数的原因。但这仍然不对。
【问题讨论】:
-
你能找到第二个问题的根源吗?我目前面临一个类似的问题,当 es.resource 指向单个索引(有 5 个分片)时 count 返回正确的值,但查询 two 时是 X4 倍(而不是 X2) i> 完全相同的索引(总共 10 个分片)。 distinct 解决了这个问题并产生了正确的结果,但我不明白为什么 count 不......
标签: serialization elasticsearch apache-spark elasticsearch-hadoop