【发布时间】:2014-09-22 15:33:43
【问题描述】:
我正在使用 Spark 1.0.1 处理大量数据。每行包含一个 ID 号,其中一些具有重复的 ID。我想将具有相同 ID 号的所有行保存在同一位置,但我无法有效地执行此操作。我创建了一个(ID 号,数据行)对的 RDD[(String, String)]:
val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)}
一种可行但不高效的方法是收集 ID 号,过滤每个 ID 的 RDD,并将具有相同 ID 的值的 RDD 保存为文本文件。
val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
val dataRows = mapRdd.filter(_._1 == id).values
dataRows.saveAsTextFile(id)
})
我还尝试了 groupByKey 或 reduceByKey,这样 RDD 中的每个元组都包含一个唯一的 ID 号作为键,以及由该 ID 号的新行分隔的一串组合数据行。我只想使用 foreach 遍历 RDD 一次来保存数据,但它不能将值作为 RDD 给出
groupedRdd.foreach({ tup =>
val data = sc.parallelize(List(tup._2)) //nested RDD does not work
data.saveAsTextFile(tup._1)
})
本质上,我想通过 ID 号将一个 RDD 拆分为多个 RDD,并将该 ID 号的值保存到它们自己的位置。
【问题讨论】:
-
按ID分组后保存文件有什么问题,它们不一定在单独的文件中,但它们不会在文件之间拆分,您可以控制您的分区数create 应该对应于创建的文件数
-
@aaronman 这不起作用,因为我需要拆分原始数据源并根据 ID 号将数据存储在不同的位置。最终会根据id号按需请求数据,是一个非常大的数据集。
-
如果你按照我建议的方式保存它,RDD肯定可以重新读取数据并通过用户ID获取数据,这是一个可以接受的解决方案
-
几天前我不得不执行同样的操作并遇到了和你一样的问题。据我所知,没有办法对 RDD 进行分组,然后保留该分组的值而不将它们放入内存中给驱动程序。你考虑过邮件列表吗?如果您发现了什么,请更新此问题,以便我们获取详细信息。
-
@jhappoldt 这绝对不是我想我会回答这个问题的情况
标签: apache-spark filter rdd