【问题标题】:Index large amount of data into elasticsearch将大量数据索引到elasticsearch
【发布时间】:2016-12-15 10:43:28
【问题描述】:

我在 HBase 中拥有超过 60 亿条社交媒体数据(包括内容/时间/作者和其他可能的字段),48 个服务器中的 4100 个区域,我现在需要将这些数据刷新到 Elasticsearch 中。

我很清楚 ES 的批量 API,在 Java 中使用 MapReduce 的批量仍然需要很多天(至少一周左右)。我可以改用 spark,但我认为它不会有太大帮助。

我想知道是否有任何其他技巧可以将这些大数据写入 ElasticSearch ?比如手动写入es索引文件并使用某种recover加载本地文件系统中的文件?

感谢任何可能的建议,谢谢。

==============

关于我的集群环境的一些细节:

spark 1.3.1 独立版(我可以在 yarn 上更改它以使用 Spark 1.6.2 或 1.6.3)

Hadoop 2.7.1 (HDP 2.4.2.258)

弹性搜索 2.3.3

【问题讨论】:

  • :我在 Spark + Hbase +Solr 以及 Solr + Hbase + mapreduce 索引方面也有经验。我不知道除了 spark 之外的任何其他技术,mapreduce 将大大提高性能。根据我的经验,你可以接受 spark 是最好的选择。

标签: elasticsearch apache-spark indexing mapreduce hbase


【解决方案1】:

AFAIK Spark 是索引以下 2 个选项的最佳选择。 以下是我提供的方法:

划分(输入扫描标准)并征服 60 亿社交媒体数据:

我建议创建多个具有不同搜索条件的 Spark/Mapreduce 作业(根据类别或其他将 60 亿社交媒体数据划分为 6 个部分)并并行触发它们。 例如基于数据捕获时间范围(scan.setTimeRange(t1, t2)) 或者一些模糊行逻辑(FuzzyRowFilter),肯定会加快速度。

流式处理方式:

您还可以考虑在通过 spark 或 mapreduce 插入数据时同时为它们创建索引。

例如在 SOLR 的情况下: clouder 具有 NRT hbase lily 索引器...即当基于 WAL(预写日志)条目同时填充 hbase 表时,它将创建 solr 索引。检查是否有类似 Elastic 搜索的内容。

即使 ES 也不存在,也不必费心,同时使用您可以自己创建的 Spark/Mapreduce 程序自行摄取数据。

选项 1:

我建议如果你对 spark 没问题,这是一个很好的解决方案 Spark 支持从 hadoop 2.1 原生集成 ES。 见

elasticsearch-hadoop provides native integration between Elasticsearch and Apache Spark, in the form of an RDD (Resilient Distributed Dataset) (or Pair RDD to be precise) that can read data from Elasticsearch. The RDD is offered in two flavors: one for Scala (which returns the data as Tuple2 with Scala collections) and one for Java (which returns the data as Tuple2 containing java.util collections).

选项 2:你知道比 spark 慢一点

Writing data to Elasticsearch With elasticsearch-hadoop, Map/Reduce jobs can write data to Elasticsearch making it searchable through indexes. elasticsearch-hadoop supports both (so-called) old and new Hadoop APIs.

【讨论】:

  • 谢谢! es-hadoop 可能是一种选择。我必须研究 es-hadoop 的源代码的要点是它如何计算输出格式拆分。我相信如果我能确保 mr-split/spark-partition 中的每个数据都映射到 ES 中的正确分片中,那么请求转发和批量线程池占用的成本可以降低。我担心 es-hadoop 仍在使用批量 api,如果是这样,性能实际上不会得到改善。
  • 是的,好吧!您的回答提供了几种可能的优化方法,我认为值得接受!非常感谢。
【解决方案2】:

我自己发现了一个提高批量索引性能的实用技巧。

我可以在我的客户端计算哈希路由,并确保每个批量请求都包含具有相同路由的所有索引请求。根据路由结果和带ip的分片信息,我直接将批量请求发送到对应的分片节点。这个技巧可以避免批量重路由成本,减少可能导致EsRejectedException的批量请求线程池占用。

例如,我在不同的机器上有 48 个节点。假设我向任意一个节点发送了一个包含 3000 个索引请求的批量请求,这些索引请求将根据路由重新路由到其他节点(通常是所有节点)。并且客户端线程必须等待整个过程完成,包括处理本地批量和等待其他节点的批量响应。但是,如果没有重新路由阶段,网络成本就没有了(除了转发到副本节点),客户端只需要等待更少的时间。同时,假设我只有1个replica,那么bulk线程的总占用量只有2个。 (客户端->主分片和主分片->副本分片)

路由哈希:

shard_num = murmur3_hash (_routing) % num_primary_shards

尝试查看:org.elasticsearch.cluster.routing.Murmur3HashFunction

客户端可以通过请求cat api获取分片和索引别名。

分片信息网址:cat shards

别名映射网址:cat aliases

一些注意事项:

  1. ES 可能会在不同版本中更改默认哈希函数,这意味着客户端代码可能不兼容版本。
  2. 这个技巧是基于哈希结果基本平衡的假设。
  3. 客户端应考虑容错,例如与相应分片节点的连接超时。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-25
    • 2016-03-31
    • 1970-01-01
    • 2019-04-29
    • 2015-04-27
    相关资源
    最近更新 更多