【发布时间】:2015-07-02 20:18:08
【问题描述】:
我目前正在编写一个由 Producer 和 Consumer 组成的 Scala 应用程序。生产者从外部源获取一些数据,并将它们写入 Kafka。 Consumer 从 Kafka 读取数据并写入 Elasticsearch。
消费者基于 Spark Streaming,每 5 秒从 Kafka 获取新消息并将其写入 ElasticSearch。问题是我无法写入 ES,因为我遇到了很多错误,如下所示:
错误] [2015-04-24 11:21:14,734] [org.apache.spark.TaskContextImpl]: TaskCompletionListener 中的错误 org.elasticsearch.hadoop.EsHadoopException:无法全部写入 条目 [3/26560](也许 ES 过载了?)。救助...在 org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:225) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] 在 org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:236) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] 在 org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:125) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] 在 org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply$mcV$sp(EsRDDWriter.scala:33) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] 在 org.apache.spark.TaskContextImpl$$anon$2.onTaskCompletion(TaskContextImpl.scala:57) ~[spark-core_2.10-1.2.1.jar:1.2.1] 在 org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68) [spark-core_2.10-1.2.1.jar:1.2.1] 在 org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1] 在 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [na:na] 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [na:na] 在 org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1] 在 org.apache.spark.scheduler.Task.run(Task.scala:58) [spark-core_2.10-1.2.1.jar:1.2.1] 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) [spark-core_2.10-1.2.1.jar:1.2.1] 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_65] 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_65] 在 java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
考虑到生产者每 15 秒写 6 条消息,所以我真的不明白这种“过载”是如何发生的(我什至清理了主题并刷新了所有旧消息,我认为这与偏移问题有关)。 Spark Streaming 每 5 秒执行一次的任务可以总结为以下代码:
val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2)
val convertedResult = result.map(k => (k._1 ,AvroToJsonUtil.avroToJson(k._2)))
//TO-DO : Remove resource (yahoo/yahoo) hardcoded parameter
log.info(s"*** EXECUTING SPARK STREAMING TASK + ${java.lang.System.currentTimeMillis()}***")
convertedResult.foreachRDD(rdd => {
rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true"))
})
如果我尝试打印消息而不是发送到 ES,一切都很好,我实际上只看到 6 条消息。为什么我不能写ES?
为了完整起见,我使用这个库来写入 ES:elasticsearch-spark_2.10 和最新的 beta 版本。
【问题讨论】:
-
我在尝试从 spark 数据帧(不是流式传输)将大表写回 ES 时遇到同样的错误。我的默认设置是使用 100 个执行器,所以基本上有 100 个并发连接到我们的小型 ES 集群。对我有用的解决方案是将数据帧重新分区为少量分区(在我的情况下为 10 个),以限制 spark 可以建立的最大并发连接数。
标签: elasticsearch apache-kafka spark-streaming