【问题标题】:Getting Elasticsearch Data into HDFS Easily轻松将 Elasticsearch 数据导入 HDFS
【发布时间】:2018-04-16 17:03:45
【问题描述】:

我们已经在内部部署了一个 Elasticsearch 集群近 2 年了,并且希望对其中包含的日志数据以及其他不同的数据源进行一些更高级的分析等。

我们的重点是 Elasticsearch 中的系统日志。每天生成约 100gb 的系统日志数据 - 每天都是它自己的索引。我们也有一些应用程序日志,但是如果我可以为 syslog 解决这个问题,我可以轻松解决其他数据移动问题。

这引出了我的问题。对于我的分析,我们使用带有 Python API 的 Spark 2.1.1。我想拥有所有的 syslog 数据,例如,在 HDFS 中使用 2 周,这样我们可以做两件事:

  1. 避免 Spark/Hadoop 集群之间的通信延迟
  2. 加快机器学习工作的速度
  3. 接下来我想开始使用 Parquet 来处理我的数据,所以如果我从 ES 中提取数据,我以后可以用它做任何我想做的事情。

现在,我的问题 - 从 ES 中提取如此大量数据并将其放入 HDFS 的最佳方法是什么?我在 PySpark 中有一个执行一些基本查询的示例,但是当我尝试将整个索引(每天生成 100gb 的索引)拉入 RDD 时,我会出现内存不足错误。我已经联系了 Elasticsearch 支持,但有人告诉我这是我需要在 Hadoop/Spark 方面解决的问题,但他们不支持。

我们已经设置了“ES-Hadoop 连接器”,它确实为我提供了一些工作框架,尽管理解文档确实是一个挑战。 Hadoop 生态系统的几个组件(HIVE、Spark、Hadoop 等)都有连接器。我不确定那里是否有解决方案,或者是否有更好的事情要做。我是新手,所以请原谅任何有明显答案的问题。我正在寻找一些指导和一些具体建议(如果可能的话,指向带有设置和代码的特定示例的指针会很棒)。我的目标是:

  1. 在 HDFS 中获取大约 2 周的系统日志(我希望这是滚动的 2 周)
  2. 在 Elasticsearch 系统上创建最小负载
  3. 无论有什么方法,最好自动执行此操作,这样每天都会摄取一个新索引并删除最旧的索引。这不是一个硬性要求,而只是一个很好的要求。

感谢您提供任何帮助、建议或示例。

编辑/附加信息:

我想在这里添加一些代码来解释我想要做什么。这个过程需要很长时间才能完成,即使在几个小时后,也没有显示出任何进展,所以我想知道我是否做错了什么。

这是我启动 Py Spark 的方式:

pyspark --jars=/sysadmin/hadoop/elasticsearch-hadoop-5.6.3/dist/elasticsearch-hadoop-5.6.3.jar --master yarn --deploy-mode client --num-executors 10 --executor-cores 4 --executor-memory 8G --driver-memory 50G

然后,我做了几件事,设置 esconf,创建 RDD,然后尝试将其作为文本保存到 HDFS:

>>> esconf = {"es.net.http.auth.user":"XXXXX","es.net.http.auth.pass":"XXXXX","es.resource":"logstash-syslog-2017.10.11", "es.query":"?q=*","es.read.field.include":"message","es.nodes":"server0005","es.net.ssl":"true"}
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=esconf)
>>> rdd.saveAsTextFile("/user/spark/logstash-syslog-2017.10.11.txt") 

现在,RDD 回来了,如果我从 RDD 中执行 take(1),这需要一段时间,但我可以取回前 10 个结果。在这 10 条记录集上,我可以保存它,就像一个魅力。但是,在完整的 RDD 上,这只是永远。我不太确定我应该期待什么,但我无法想象在一个 10 节点的集群上,每个集群有 64gb 的 RAM 和 8 个内核,这需要几个小时。

【问题讨论】:

  • 您要导出数据,还是要通过 Hadoop 作业对其进行处理?如果您在 Spark 中遇到内存问题,您显然需要获得更多执行器并增加它们的内存设置。否则,使用 ES-Hadoop 你可以创建一个 Hive 表,然后从 ES 定义的表中插入到 HDFS 表中
  • 另一种方法是将系统日志数据发送到logstash,将logstash输出到HDFS和Elasticsearch
  • @cricket_007 - 我想做的是获取这些索引并将它们转储到 HDFS 中,以便我可以在那里处理数据。由于 syslog 索引代表一整天(~100gb),因此将其放入 HDFS 时将其分解为更小的分区可能是明智的,因此处理速度更快,但我不知道这是否是一个好策略。现在,我仅限于 10 个数据节点,具有 64gb 的 RAM 和 8 个核心处理器。最初有人建议我使用 HIVE,但后来告诉我如果我使用的是 Spark,我应该直接查询 ES。您对此有什么建议或建议吗?
  • 我相信我已经给了你我的建议。 Logstash 可以将新数据发送到 HDFS。 Spark、Hive、Pig 并不重要。对于 Spark,您必须增加默认的执行程序内存。 Spark 不会使用 HDFS,除非您在那里写入数据,并且它不会在 Elasticsearch 上保持低负载
  • @cricket_007 - 我正在尝试做的一个例子是 William Benton 在 2016 年 Spark 峰会上展示的内容。他表示他将所有数据从 ES 转储到 HDFS 并使用 Parquet。我想做类似的事情。 Bill Benton Talk 他并没有真正了解他是如何做到这一点的。我确实联系过他,但没有收到回音。

标签: hadoop apache-spark elasticsearch hdfs


【解决方案1】:

我在 PySpark 中有一个执行一些基本查询的示例,但是当我尝试将整个索引(每天生成 100gb 的索引)拉入 RDD 时,出现内存不足错误

默认情况下,Spark 不会为您的作业分配太多内存,所以是的,在处理这么多数据时,您会收到 OOM 错误。

以下是您应该关注的关键属性及其默认值。

  • spark.dynamicAllocation.enabled - false
  • spark.executor.instances - 2
  • spark.executor.memory - 1g
  • spark.driver.cores - 1

如果您的 Spark 作业在 YARN 集群管理下运行,您还需要考虑您的 YARN 容器大小。在集群模式下运行时,Application Master 将是 Spark 驱动程序容器。根据我的经验,除非您的 Spark 代码调用 collect() 通过驱动程序发回数据,否则它本身不需要那么多内存。

我会先尝试增加 Executor 内存,然后再增加 executor 的数量。如果启用动态分配,那么您可以考虑不指定执行者数量,但它确实设置了一个下限开始。

ES-Hadoop 提供了许多连接器来提取数据,但这一切都归结为偏好。如果您了解 SQL,请使用 Hive。 Pig 比 Spark 运行起来更简单。 Spark 的内存非常重,在某些集群中可能无法正常工作。

您在 cmets 中提到了 NiFi,但这仍然是一个 Java 进程,并且容易出现 OOM 错误。除非您有 NiFi 集群,否则在写入 HDFS 之前,您将有一个进程通过磁盘上的 FlowFile 拉取 100 GB。

如果您需要整个索引的快照,Elasticsearch 为此类功能提供了HDFS support。不过,我不确定那是什么数据格式,或者 Hadoop 进程是否可以读取它。

【讨论】:

  • 是的,Nifi 可以像 Pig、Hive 或 Spark 一样放置 ORC 或 Parquet
  • 您需要查看 Spark UI 才能知道什么需要很长时间
  • 执行者有多少?我猜只有一个? 100Gb 从 Elasticsearch,通过一个 Java 进程,然后到 HDFS 不会很快。
  • 只有一个驱动。我无法想象只给驱动程序更多的内存会有多大帮助......我没有使用 Elasticsearch Spark 连接器,所以我可能是错的,但是每个执行程序都需要运行完全相同的 Elasticsearch 查询,提取所有索引数据... Nifi 默认不分发,但请随意尝试。这与使用 Elasticsearch Java SDK 编写自己的 Java 代码没有什么不同,只是提供了一个简单的 GUI
  • 没错。在您执行此类操作之前,Spark 是懒惰的。如果你使用的是 Kafka,Spark Streaming 就可以了。你的问题没有提到
猜你喜欢
  • 1970-01-01
  • 2014-07-13
  • 2012-02-18
  • 2015-06-23
  • 1970-01-01
  • 1970-01-01
  • 2022-01-20
  • 1970-01-01
  • 2012-01-28
相关资源
最近更新 更多