【发布时间】:2017-11-23 11:50:37
【问题描述】:
我有一个包含近 3.2 亿个文档的 Elasticsearch 索引,其大小为 68 GB,分为 5 个分片。
我想要的是从 Spark 中读取整个索引以将其转换为 parquet 格式。但是,数据太大而无法放入内存,因此出现以下异常:
ERROR NetworkClient: Node [127.0.0.1:9200] failed (Read timed out); no other nodes left - aborting...
ERROR Utils: Aborting task
org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[127.0.0.1:9200]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:149)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:466)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:450)
at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:391)
at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:92)
at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:61)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:365)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
通过这种方式,我尝试将 scroll.limit 属性设置为 1000 以读取 1000 个文档块中的数据,但抛出了相同的异常。查看官方文档,我遇到了“切片滚动”,其中必须管理 scroll_id 才能处理下一批。如果我错了,请纠正我,但理论上,Spark 必须逐批循环数据,直到没有更多数据为止。但是,我找不到如何使用 Spark 实现这一点。
我通过手动过滤(下推)数据来解决此问题,从而减少了向 Elasticsearch 请求的数据量。我使用时间戳来限制查询的响应。我不得不多次查询 Elasticsearch 才能读取整个索引。基本上,我手动进行了切片滚动。如您所见,这不是解决问题的最佳方法。那么,您对我如何解决它以自动方式读取整个数据有什么建议吗?
请注意,Elasticsearch 和 Spark 都在我的本地计算机(16 GB RAM 和 4 个核心)上运行。这是我的代码和依赖项:
带有滚动限制的代码(失败)
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("ElasticSearch to Parquet")
.set("es.nodes", "localhost")
.set("es.port", "9200")
.set("es.index.auto.create", "false")
.set("es.nodes.wan.only", "false")
val sparkSession = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
val df = sparkSession.sqlContext.read
.format("org.elasticsearch.spark.sql")
.option("scroll.limit", 1000)
.load("my-index/index")
df.write.format("parquet").mode("append").save("data/data.parquet")
具有下推过滤的代码(通过根据需要多次重复查询并更改开始和结束时间戳来工作)
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("ElasticSearch to Parquet")
.set("es.nodes", "localhost")
.set("es.port", "9200")
.set("es.index.auto.create", "false")
.set("es.nodes.wan.only", "false")
val sparkSession = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
val df = sparkSession.sqlContext.read
.format("org.elasticsearch.spark.sql")
.load("my-index/index")
val filter = df.filter(df("timestamp").gt("dateStart").and(df("timestamp").lt("dateEnd")))
filter.write.format("parquet").mode("append").save("data/data.parquet")
Pom.xml
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>6.0.0</version>
</dependency>
</dependencies>
【问题讨论】:
-
据我了解,问题基本上不在于尺寸,而是您的 spark 无法连接到 ES!你是如何运行 spark 和 ES 的,ES 是否可以从所有节点访问?
-
Spark 成功连接 ES。事实上,我放的第二个例子很好用。但是,当我使用滚动限制时,起初它可以工作,但一段时间后抛出异常。我已经在本地机器(Windows 10)上安装了 ElasticSearch。我在本地模式下从 IntelliJ 运行 Spark 程序。
-
从异常看来,问题在于 ES 变得无响应,而不是火花内存的问题。是scroll.limit 的正确属性,在doc 中好像叫scroll_size。附带说明一下,过滤器下推解决方案自然更适合弹性搜索,因为它是为索引查找而不是完全扫描而设计的,而且它也很容易扩展 - 只要您有足够的容量来处理 spark 和弹性搜索。
-
你是对的。我已经在不同的机器上运行了 ES 和 Spark,现在滚动似乎可以工作,即使它很慢,但这是另一个问题。显然,我的电脑功能不够强大,无法同时处理这两种情况。我也更换了滚动。通过滚动限制。大小并将其价值增加到10,000。作为一个额外的细节,我注意到 spark 不是问题,因为它只使用几 KB 的内存,所以 ES 给出了缓慢的性能。
标签: scala apache-spark elasticsearch apache-spark-sql