【问题标题】:Spark + Elastic search write performance issueSpark + Elastic 搜索写入性能问题
【发布时间】:2018-03-30 12:17:43
【问题描述】:

使用 spark java 对 elasticsearch 的写入次数很少。

这里是配置

ES集群使用13.xlarge机器

 4 instances each have 4 processors.
 Set refresh interval to -1 and replications to '0' and other basic 
 configurations required for better writing.

火花:

2 节点 EMR 集群与

 2 Core instances
  - 8 vCPU, 16 GiB memory, EBS only storage
  - EBS Storage:1000 GiB

1 Master node
  - 1 vCPU, 3.8 GiB memory, 410 SSD GB storage

ES 索引在映射中定义了 16 个分片。

运行作业时有以下配置,

executor-memory - 8g
spark.executor.instances=2
spark.executor.cores=4

并使用

es.batch.size.bytes - 6MB
es.batch.size.entries - 10000
es.batch.write.refresh - false

使用此配置,我尝试加载 100 万个文档(每个文档的大小为 1300 字节),因此每个 ES 节点加载 500 个记录/文档。

在火花日志中看到每个任务

 -1116 bytes result sent to driver

火花代码

    JavaRDD<String> javaRDD = jsc.textFile("<S3 Path>");
    JavaEsSpark.saveJsonToEs(javaRDD,"<Index name>");

此外,当我查看 ES 集群中的网络内图时,它非常低,我看到 EMR 没有通过网络发送大量数据。有没有办法告诉 Spark 发送正确数量的数据以加快写入速度?

还有其他我想调整的配置吗? 因为我看到每个 es 实例每秒 500 个文档较低。有人可以指导一下这个设置缺少什么来提高我的 es 写入性能

提前致谢

【问题讨论】:

  • 在你的s3目录下,你是读取单个文件,还是读取多个文件?
  • 使用很多文件

标签: apache-spark elasticsearch elasticsearch-hadoop elasticsearch-spark


【解决方案1】:

您可能在这里遇到问题。 spark.executor.instances=2

您仅限于两个执行程序,根据您的集群配置,您可以有 4 个。我会将其更改为 4 或更大。我也可以尝试 executor-memory = 1500M,cores=1,instances=16。我喜欢在我的记忆中留下一点开销,这就是我从 2G 降到 1.5G 的原因(但你不能做 1.5G,所以我们必须做 1500M)。如果您通过执行程序进行连接,这将提高性能。

需要一些代码来进一步调试。我想知道您是否仅在您的驱动程序中连接到弹性搜索,而不是在您的工作节点中。这意味着您只能获得一个连接,而不是每个执行者的一个连接。

【讨论】:

  • 非常感谢,Dan,当您说将执行程序增加到 4 个时,您的意思是将 EMR 集群增加到 4 个实例而不是 2 个?我连接到 ES 的方式是通过下面的代码。 SparkConf conf = new SparkConf().setAppName("SparkES Application");
  • SparkConf conf = new SparkConf().setAppName("SparkES Application"); conf.set("es.nodes",""); conf.set("es.batch.size.bytes","6mb"); conf.set("es.batch.size.entries","10000"); conf.set("es.batch.concurrent.request","4"); conf.set("es.batch.write.refresh","false"); conf.set("spark.kryoserializer.buffer","24"); JavaSparkContext jsc = 新的 JavaSparkContext(conf); JavaRDD javaRDD = jsc.textFile("S3 PATH"); JavaEsSpark.saveJsonToEs(javaRDD,"索引名称");
  • 上面的最后两行在一个方法中,从 main() 调用,我发送一个参数在方法 loadSNindex(jsc) 中使用;
  • 另外,当我使用 8 个执行程序和 2 个内核运行时,我验证了其中一个 es 节点中的连接数,我看到 4 个已建立的端口 9200 连接。
  • @camelBeginner “当您说将执行程序增加到 4 个时,您的意思是将 EMR 集群增加到 4 个实例而不是 2 个?”,不,我的意思是将 'spark.executor.instances' 设置为 4 2. 与您使用的虚拟机数量无关。
猜你喜欢
  • 2018-08-23
  • 1970-01-01
  • 2016-07-12
  • 2011-07-22
  • 2022-11-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多