【问题标题】:Spark Streaming and ElasticSearch - Could not write all entriesSpark Streaming 和 ElasticSearch - 无法写入所有条目
【发布时间】:2015-07-02 20:18:08
【问题描述】:

我目前正在编写一个由 Producer 和 Consumer 组成的 Scala 应用程序。生产者从外部源获取一些数据,并将它们写入 Kafka。 Consumer 从 Kafka 读取数据并写入 Elasticsearch。

消费者基于 Spark Streaming,每 5 秒从 Kafka 获取新消息并将其写入 ElasticSearch。问题是我无法写入 ES,因为我遇到了很多错误,如下所示:

错误] [2015-04-24 11:21:14,734] [org.apache.spark.TaskContextImpl]: TaskCompletionListener 中的错误 org.elasticsearch.hadoop.EsHadoopException:无法全部写入 条目 [3/26560](也许 ES 过载了?)。救助...在 org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:225) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] 在 org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:236) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] 在 org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:125) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] 在 org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply$mcV$sp(EsRDDWriter.scala:33) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3] 在 org.apache.spark.TaskContextImpl$$anon$2.onTaskCompletion(TaskContextImpl.scala:57) ~[spark-core_2.10-1.2.1.jar:1.2.1] 在 org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68) [spark-core_2.10-1.2.1.jar:1.2.1] 在 org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1] 在 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [na:na] 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [na:na] 在 org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1] 在 org.apache.spark.scheduler.Task.run(Task.scala:58) [spark-core_2.10-1.2.1.jar:1.2.1] 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) [spark-core_2.10-1.2.1.jar:1.2.1] 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_65] 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_65] 在 java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]

考虑到生产者每 15 秒写 6 条消息,所以我真的不明白这种“过载”是如何发生的(我什至清理了主题并刷新了所有旧消息,我认为这与偏移问题有关)。 Spark Streaming 每 5 秒执行一次的任务可以总结为以下代码:

  val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2)
  val convertedResult = result.map(k => (k._1 ,AvroToJsonUtil.avroToJson(k._2)))



  //TO-DO : Remove resource (yahoo/yahoo) hardcoded parameter
  log.info(s"*** EXECUTING SPARK STREAMING TASK  + ${java.lang.System.currentTimeMillis()}***")


  convertedResult.foreachRDD(rdd => {
      rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true"))

  })

如果我尝试打印消息而不是发送到 ES,一切都很好,我实际上只看到 6 条消息。为什么我不能写ES?

为了完整起见,我使用这个库来写入 ES:elasticsearch-spark_2.10 和最新的 beta 版本。

【问题讨论】:

  • 我在尝试从 spark 数据帧(不是流式传输)将大表写回 ES 时遇到同样的错误。我的默认设置是使用 100 个执行器,所以基本上有 100 个并发连接到我们的小型 ES 集群。对我有用的解决方案是将数据帧重新分区为少量分区(在我的情况下为 10 个),以限制 spark 可以建立的最大并发连接数。

标签: elasticsearch apache-kafka spark-streaming


【解决方案1】:

经过多次重试后,我发现了一种写入 ElasticSearch 且不会出现任何错误的方法。基本上将参数"es.batch.size.entries" -> "1" 传递给saveToES 方法就解决了这个问题。我不明白为什么使用默认或任何其他批量大小会导致上述错误,因为如果我尝试编写比允许的最大批量大小更多的东西,而不是更少,我会收到错误消息。

此外,我注意到实际上我正在写信给 ES,但不是我的所有消息,我每批丢失 1 到 3 条消息。

【讨论】:

    【解决方案2】:

    当我在 Spark 上将数据帧推送到 ES 时,我收到了相同的错误消息。即使使用"es.batch.size.entries" -> "1" 配置,我也遇到了同样的错误。 一旦我在 ES 中增加了线程池,我就可以解决这个问题。

    例如,

    大容量池

    threadpool.bulk.type: fixed
    threadpool.bulk.size: 600
    threadpool.bulk.queue_size: 30000
    

    【讨论】:

      【解决方案3】:

      就像这里已经提到的,这是一个文档写入冲突

      您的 convertedResult 数据流包含具有相同 ID 的多条记录。当作为同一批次的一部分写入弹性时,会产生上述错误。

      可能的解决方案:

      1. 为每条记录生成唯一的 ID。根据您的用例,可以通过几种不同的方式完成。例如,一种常见的解决方案是通过组合 idlastModifiedDate 字段来创建一个新字段,并在写入弹性文件时将该字段用作 id。
      2. 根据 id 对记录执行重复数据删除 - 仅选择一条具有特定 id 的记录并丢弃其他重复项。根据您的用例,这可能是最新的记录(基于时间戳字段)、最完整的记录(大部分字段包含数据)等。

      #1 解决方案将存储您在流中收到的所有记录

      #2 解决方案将根据您的重复数据删除逻辑仅存储特定 ID 的唯一记录。此结果与设置"es.batch.size.entries" -> "1" 相同,但您不会通过一次写入一条记录来限制性能。

      【讨论】:

        【解决方案4】:

        其中一种可能性是集群/分片状态为 RED。请解决这个问题,这可能是由于未分配的副本造成的。一旦状态变为绿色,API 调用就成功了。

        【讨论】:

          【解决方案5】:

          这是一个文档写冲突

          例如:
          多个文档指定 相同的 _id 供 Elasticsearch 使用。
          这些文档位于不同的分区
          Spark同时将多个分区写入 ES。

          结果是 Elasticsearch 一次接收单个文档的多个更新 - 来自多个来源/通过多个节点/包含不同的数据

          “我每批丢失 1 到 3 条消息。”

          • 批量大小 > 1 时失败的数量波动
          • 如果批量写入大小为“1”则成功

          【讨论】:

            【解决方案6】:

            只是添加此错误的另一个潜在原因,希望它可以帮助某人。 如果您的 Elasticsearch 索引有子文档,那么:

            1. 如果您使用的是自定义路由字段(不是 _id),那么根据 文档 不保证文档的唯一性。 从 spark 更新时,这可能会导致问题。
            2. 如果您使用标准 _id,将保留唯一性,但是您需要确保在从 Spark 写入 Elasticsearch 时提供以下选项:
              • es.mapping.join
              • es.mapping.routing

            【讨论】:

              猜你喜欢
              • 2018-12-19
              • 2018-08-13
              • 2016-10-31
              • 2020-01-27
              • 1970-01-01
              • 2019-07-12
              • 1970-01-01
              • 1970-01-01
              • 2018-06-04
              相关资源
              最近更新 更多