【发布时间】:2015-07-06 04:34:40
【问题描述】:
情况如下:我有一个不断增长的数据集合,我想在 Hadoop 集群中使用 RDD 对其进行处理。
这是一个简短的例子:
val elementA = (1, Seq(2, 3))
val elementB = (2, Seq(1, 3))
val elementC = (3, Seq(1, 2))
val testRDD = sc.parallelize(Seq(elementA, elementB, elementC)).
map(x => (x._1, x._2)).setName("testRDD").cache()
val elementD = (4, Seq(1, 3))
val elementD1 = (1, Seq(4))
val elementD2 = (3, Seq(4))
val testAdd = sc.parallelize(Seq(elementD, elementD1, elementD2)).
map(x => (x._1, x._2)).setName("testAdd")
val testResult = testRDD.cogroup(testAdd).mapValues(x => (x._1 ++ x._2).flatten)
结果将是这样的(元素的顺序可能不同):
(1, List(2, 3, 4))
(2, List(1, 3))
(3, List(1, 2, 4))
(4, List(1, 3))
这是我的目标:
- 我想
.cache()我在集群内存中的 RDD。 - 我希望能够向现有 RDD 添加新元素。
这是我的发现:
- RDD 中的每个分区都单独且完全缓存(例如,我有一个包含 100 个元素和 4 个分区的集合,我调用了
.cache().collect()和cache().first(),在第一种情况下获得了 4 个缓存分区,在第二种情况下获得了 1 个)。 -
testRDD.cogroup(testAdd)的结果是新的 RDD,可以再次缓存,如果我们尝试使用var testRDD并调用testRDD = testRDD.cogroup(testAdd),我们将丢失缓存数据的链接。 - 我知道,RDD 最适合批处理应用程序,我在这里有这个:每个新元素的
Seq()将从另一个元素的属性中计算出来。
有什么方法可以修改当前的RDD而不从缓存中删除它的所有元素?
我想在临时存储达到一定限制后制作一种临时存储并将临时存储与当前存储合并...
【问题讨论】:
标签: caching collections apache-spark rdd