【问题标题】:Querying Elasticsearch large index from spark从 spark 查询 Elasticsearch 大索引
【发布时间】:2017-11-23 11:50:37
【问题描述】:

我有一个包含近 3.2 亿个文档的 Elasticsearch 索引,其大小为 68 GB,分为 5 个分片。

我想要的是从 Spark 中读取整个索引以将其转换为 parquet 格式。但是,数据太大而无法放入内存,因此出现以下异常:

ERROR NetworkClient: Node [127.0.0.1:9200] failed (Read timed out); no other nodes left - aborting...
ERROR Utils: Aborting task
org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[127.0.0.1:9200]] 
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:149)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:466)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:450)
    at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:391)
    at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:92)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:61)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:365)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

通过这种方式,我尝试将 scroll.limit 属性设置为 1000 以读取 1000 个文档块中的数据,但抛出了相同的异常。查看官方文档,我遇到了“切片滚动”,其中必须管理 scroll_id 才能处理下一批。如果我错了,请纠正我,但理论上,Spark 必须逐批循环数据,直到没有更多数据为止。但是,我找不到如何使用 Spark 实现这一点。

我通过手动过滤(下推)数据来解决此问题,从而减少了向 Elasticsearch 请求的数据量。我使用时间戳来限制查询的响应。我不得不多次查询 Elasticsearch 才能读取整个索引。基本上,我手动进行了切片滚动。如您所见,这不是解决问题的最佳方法。那么,您对我如何解决它以自动方式读取整个数据有什么建议吗?

请注意,Elasticsearch 和 Spark 都在我的本地计算机(16 GB RAM 和 4 个核心)上运行。这是我的代码和依赖项:

带有滚动限制的代码(失败)

val sparkConf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("ElasticSearch to Parquet")
    .set("es.nodes", "localhost")
    .set("es.port", "9200")
    .set("es.index.auto.create", "false")
    .set("es.nodes.wan.only", "false")

val sparkSession = SparkSession
    .builder
    .config(sparkConf)
    .getOrCreate()

val df = sparkSession.sqlContext.read
    .format("org.elasticsearch.spark.sql")
    .option("scroll.limit", 1000)
    .load("my-index/index")

df.write.format("parquet").mode("append").save("data/data.parquet")

具有下推过滤的代码(通过根据需要多次重复查询并更改开始和结束时间戳来工作)

val sparkConf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("ElasticSearch to Parquet")
    .set("es.nodes", "localhost")
    .set("es.port", "9200")
    .set("es.index.auto.create", "false")
    .set("es.nodes.wan.only", "false")

val sparkSession = SparkSession
    .builder
    .config(sparkConf)
    .getOrCreate()

val df = sparkSession.sqlContext.read
    .format("org.elasticsearch.spark.sql")
    .load("my-index/index")

val filter = df.filter(df("timestamp").gt("dateStart").and(df("timestamp").lt("dateEnd")))

filter.write.format("parquet").mode("append").save("data/data.parquet")

Pom.xml

<dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.0.0</version>
        </dependency>
</dependencies>

【问题讨论】:

  • 据我了解,问题基本上不在于尺寸,而是您的 spark 无法连接到 ES!你是如何运行 spark 和 ES 的,ES 是否可以从所有节点访问?
  • Spark 成功连接 ES。事实上,我放的第二个例子很好用。但是,当我使用滚动限制时,起初它可以工作,但一段时间后抛出异常。我已经在本地机器(Windows 10)上安装了 ElasticSearch。我在本地模式下从 IntelliJ 运行 Spark 程序。
  • 从异常看来,问题在于 ES 变得无响应,而不是火花内存的问题。是scroll.limit 的正确属性,在doc 中好像叫scroll_size。附带说明一下,过滤器下推解决方案自然更适合弹性搜索,因为它是为索引查找而不是完全扫描而设计的,而且它也很容易扩展 - 只要您有足够的容量来处理 spark 和弹性搜索。
  • 你是对的。我已经在不同的机器上运行了 ES 和 Spark,现在滚动似乎可以工作,即使它很慢,但这是另一个问题。显然,我的电脑功能不够强大,无法同时处理这两种情况。我也更换了滚动。通过滚动限制。大小并将其价值增加到10,000。作为一个额外的细节,我注意到 spark 不是问题,因为它只使用几 KB 的内存,所以 ES 给出了缓慢的性能。

标签: scala apache-spark elasticsearch apache-spark-sql


【解决方案1】:

似乎是网络问题Node [127.0.0.1:9200] failed (Read timed out); no other nodes left - aborting...

我解释了 Spark(我的意思是 Spark + elastic4Hadoop lib)如何与 elasticsearch 一起工作:

  • elasticsearch 将数据拆分为 shards,可通过数据节点上的 HTTP 访问(如果启用了 HTTP)
  • Spark 将数据拆分为partitions of RDD

由于您将es.nodes.wan.only 设置为false,Spark 将首先GET /_cat/nodes 获取elasticsearch 集群节点IP,然后映射IP shard。

因此,当 Spark 获取数据时,每个 Spark 工作任务都可以与不同弹性搜索节点上的任何弹性搜索节点分片进行通信。这就是为什么搭配通常是一个非常好的想法(将 Spark worker 和 elasticsearch 放在同一台机器上,big data 的想法,尝试尽可能接近地执行计算)。

此外,这里最重要的调整是:“拥有许多分片,就像 Spark 工作人员拥有 CPU 一样”。 (但自从 es6 带有 sliced scroll 功能以来,这不再是真的了)。

【讨论】:

    猜你喜欢
    • 2018-05-14
    • 2014-05-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-08
    • 2014-08-10
    • 1970-01-01
    相关资源
    最近更新 更多