【问题标题】:how to repartition rdd based on data size如何根据数据大小重新分区rdd
【发布时间】:2018-11-11 22:06:06
【问题描述】:

我正在研究火花流项目,该项目从 Kafka 获取数据应用了一些规则并将数据保存在 Hive 中。我的问题是数据的摄取率不固定。 60 秒可能是 100 万条消息,也可能是 1 条。我想在 Dstream 上添加重新分区。因为 Dstream 只有 3 个分区,无法在一分钟内处理数百万条记录。重新分区在少于 20 条记录时给出问题。它在 Hive 中创建多个小文件。

dataStream.map(_._2).repartition(20)

我的问题是如何根据 rdd 大小进行 rdd 重新分区。这样它就可以处理一条消息或一百万条消息。

【问题讨论】:

  • 也许你应该重新考虑你的全球管道——例如让 Spark 将其文件转储到充当“着陆区”的 Hive 分区中,然后定期将数据合并并移动到另一个分区(将 LANDING 分区重命名为 TEMP,重新创建 LANDING,INSERT...SELECT从 TEMP 到目的地,删除 TEMP - 并确保在运行重组时没有选择该表)。

标签: apache-spark hadoop apache-kafka spark-streaming rdd


【解决方案1】:

你不能以任何有用的方式做到这一点。即使你决定使用transform

 stream.transform { rdd => {
   val n = rdd.count
   rdd.repartition(getNumParttitions(n))
 }}

这超出了操作的全部目的,因为您必须在重新分区之前扫描所有数据,并且初始分配仍然是瓶颈。

相反,我建议基于 spark.streaming.kafka.maxRatePerPartition(旧 API)或配置背压(spark.streaming.backpressure.enabledspark.streaming.backpressure.initialRate、较新 API)进行适当配置

【讨论】:

  • Dstream 包含有很多记录的文件。我使用了 spark.streaming.kafka.maxRatePerPartition=1 属性,但有时 spark 无法处理 900 个文件。批量大小为 300 秒,kafka 有 3 个分区。所以我想重新分区rdd
猜你喜欢
  • 1970-01-01
  • 2017-08-08
  • 1970-01-01
  • 2019-07-01
  • 1970-01-01
  • 2016-02-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多