【问题标题】:How to partition a single RDD into multiple RDD in spark [duplicate]如何在火花中将单个RDD划分为多个RDD [重复]
【发布时间】:2015-04-16 11:42:43
【问题描述】:

我有一个 RDD,其中每个条目都属于一个类。我想将单个 RDD 分成几个 RDD,这样一个类的所有条目都会进入一个 RDD。假设我在输入 RDD 中有 100 个这样的类,我希望每个类都成为自己的 RDD。我可以使用每个类的过滤器来做到这一点(如下所示),但它会启动几个工作。有没有更好的方法在一项工作中做到这一点?

def method(val input:RDD[LabeledPoint], val classes:List[Double]):List[RDD] = 
      classes.map{lbl=>input.filter(_.label==lbl)}

它类似于另一个问题,但我有超过 2 个课程(大约 10 个)

【问题讨论】:

  • "我希望每个类都成为自己的 RDD。"为什么?之后你将如何处理它们?
  • 嗯...创建 Spark RDD 模型时并未考虑到此类操作。但是,如果你想要这样的东西......你总是可以使用最明显的方法(就像你一样)。现在......关于能够在“单一工作”中做到这一点(单个RDD上的大多数操作确实涉及多个工作,所以我不确定你所说的“单一工作”是什么意思,但假设你的意思是O( n) 不依赖于类数量的操作)...根据当前 RDD 的哲学,我认为它“不应该”是可能的。
  • @Paul 另一种方法(StatisticsSummary)需要一个 RDD 作为输入。我想获取每个班级的汇总统计数据
  • 我不认为这真的是一个骗局,因为你想分成两个以上的 RDD。我赞成 b/c 我真的很喜欢你的解决方案!

标签: scala apache-spark


【解决方案1】:

我遇到了同样的问题,不幸的是根据我找到的不同资源没有其他方法。

问题是您需要从 RDD 开始在结果中创建实际列表,如果您查看 here,答案也表明这是不可能的。

你做的应该没问题,如果你想优化,那么如果可以的话就去缓存数据。

【讨论】:

  • 有什么办法可以改变 spark 代码来支持它。 RDD 是一组分区。并且可以将一个分区拆分为 List[List[]]。如何划分分区创建List[RDD]
  • 对 RRD 的操作返回其他 RDD。这就是 API 的定义方式。我不会反对这一点。您可能可以更改某些内容,但我认为它会破坏其他所有内容,并且会花费您很多时间,即使它有效,我不确定它是否会被接受为拉取请求。缓存数据集是你能做的最好的事情,我会说你应该做什么。您是否有理由避免这样做?
  • 接受缓存似乎有助于降低运行时间成本。谢谢
【解决方案2】:

AFAIK 这是不可能的,但你可能有一个概念问题。

鉴于您的 cmets,您可能想要使用 aggregateByKey()。无需创建一百个 RDD,只需按类键入一个并构建一个自定义聚合方法来聚合您的统计信息。 Spark 将按类分发您的实例,以便您可以独立操作它们。如果逻辑根据类而改变,你总是可以使用 if/else、开关、多态等等……

【讨论】:

  • 我想为每个类创建一个 RDD,而不是 aggregateByKey,因为这会将类的值聚合到单个分区。假设我只有 5 个类,就会有很多数据移动。我也需要这个作为另一种方法(Statistics.colStats)需要一个 RDD。所以再次聚合和创建 RDD 会很昂贵。
  • 那我猜你搞砸了,你必须通过过滤旧的 RDD 来创建每个新的 RDD :-S 但请注意 agregateByKey 首先在多个分区上单独聚合,然后才聚合中间结果(类似于 hadoop 组合器)。这就是为什么在 groupByKey() 之上推荐它的原因。根据您的版本,您可能还会查看 combineByKey() 和 reduceByKey()。也许你可以用这种方式重写你的统计数据......或者不......检查一下!
猜你喜欢
  • 1970-01-01
  • 2016-04-23
  • 2018-04-26
  • 2023-03-13
  • 2017-01-03
  • 1970-01-01
  • 1970-01-01
  • 2016-01-03
相关资源
最近更新 更多