【问题标题】:Spark - How to keep max limit on number of values grouped in JavaPairRDDSpark - 如何保持 JavaPairRDD 中分组值的最大数量限制
【发布时间】:2018-06-30 14:58:36
【问题描述】:

我有一个这样的 RDD:

JavaPairRDD<String, String> 

有很多条目,并且一些键重复了很多次。当我应用groupByKeycombineByKey 时,它会生成另一个

JavaPairRDD<String, Iterable<String>

这就是问题所在,对于某些键集,值的数量非常庞大(因为特定键是倾斜的)。这会导致进一步的下游消费出现问题,甚至会产生内存问题。

我的问题是如何限制每个键聚合的值的数量。我想按键分组,但是值列表不应超过限制 X 数。任何溢出的值都应该添加到新行,有没有办法做到这一点?

【问题讨论】:

  • 听起来你可以在 rdd 上使用map 并删除列表太长的值。
  • 我不想失去价值。我只想将其保留为具有相同键和其余值(溢出值)的新条目。
  • 这样的话flatMap应该更合适。
  • 如果你愿意,我可以使用 Scala(你可以转换为 Java)给你一个答案。
  • 我不确定如何拆分值并创建两个相同键的条目。如果你放弃 scala,我会得到一些想法。谢谢。

标签: java apache-spark bigdata rdd


【解决方案1】:

这可以使用flatMap 解决。我不确定如何用 Java 编写它,但是,希望您可以转换 Scala 代码。带有示例输入的代码:

val rdd = spark.sparkContext.parallelize(Seq((1, Iterable(1,2,3,4,5)), (2, Iterable(6,7,8)), (3, Iterable(1,3,6,8,4,2,7,8,3))))

val maxLength = 3
val res = rdd.flatMap{ case(id, vals) =>
  vals.grouped(maxLength).map(v => (id, v))
}

这个想法是将列表拆分为一个列表列表,其中每个内部列表都有一个最大长度。因为这里使用了flatMap,所以列表列表将被展平为一个简单的列表,这是您想要的结果。使用最大长度 3 并打印 res 给出:

(1,List(1, 2, 3))
(1,List(4, 5))
(2,List(6, 7, 8))
(3,List(1, 3, 6))
(3,List(8, 4, 2))
(3,List(7, 8, 3))

【讨论】:

  • 谢谢,这很有用!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-07-10
  • 1970-01-01
  • 2019-07-23
  • 2017-09-21
  • 2017-12-27
  • 2020-02-20
相关资源
最近更新 更多