【问题标题】:Apache Spark: cache and partitionsApache Spark:缓存和分区
【发布时间】:2015-07-06 04:34:40
【问题描述】:

情况如下:我有一个不断增长的数据集合,我想在 Hadoop 集群中使用 RDD 对其进行处理。

这是一个简短的例子:

val elementA = (1, Seq(2, 3))
val elementB = (2, Seq(1, 3))
val elementC = (3, Seq(1, 2))

val testRDD = sc.parallelize(Seq(elementA, elementB, elementC)).
    map(x => (x._1, x._2)).setName("testRDD").cache()

val elementD = (4, Seq(1, 3))
val elementD1 = (1, Seq(4))
val elementD2 = (3, Seq(4))

val testAdd = sc.parallelize(Seq(elementD, elementD1, elementD2)).
    map(x => (x._1, x._2)).setName("testAdd")

val testResult = testRDD.cogroup(testAdd).mapValues(x => (x._1 ++ x._2).flatten)

结果将是这样的(元素的顺序可能不同):

(1, List(2, 3, 4))
(2, List(1, 3))
(3, List(1, 2, 4))
(4, List(1, 3))

这是我的目标:

  1. 我想.cache()我在集群内存中的 RDD。
  2. 我希望能够向现有 RDD 添加新元素。

这是我的发现:

  1. RDD 中的每个分区都单独且完全缓存(例如,我有一个包含 100 个元素和 4 个分区的集合,我调用了.cache().collect()cache().first(),在第一种情况下获得了 4 个缓存分区,在第二种情况下获得了 1 个)。
  2. testRDD.cogroup(testAdd) 的结果是新的 RDD,可以再次缓存,如果我们尝试使用var testRDD 并调用testRDD = testRDD.cogroup(testAdd),我们将丢失缓存数据的链接。
  3. 我知道,RDD 最适合批处理应用程序,我在这里有这个:每个新元素的 Seq() 将从另一个元素的属性中计算出来。

有什么方法可以修改当前的RDD而不从缓存中删除它的所有元素?

我想在临时存储达到一定限制后制作一种临时存储并将临时存储与当前存储合并...

【问题讨论】:

    标签: caching collections apache-spark rdd


    【解决方案1】:

    RDD 是不可变的,因此您不能向它们添加新元素。但是,您可以通过将原始 RDD 与新元素联合来创建新 RDD,类似于您对 testResult RDD 所做的操作。

    如果你想在新的 RDD 中使用相同的变量和更新,你可以使用 var 而不是 val 用于该 RDD。例如

    var testRDD = sc.parallelize(...) val testAdd = sc.parallelize(...) testRDD = testRDD.union(testAdd) testRDD.cache()

    这将创建一个连接两个原始 RDD 的沿袭。如果您在 testRDD 上调用 union 太多次,这可能会导致问题。要解决这个问题,您可以在 testRDD 被联合多次后调用检查点,比如每 10 次更新。您也可以考虑在检查点时在 testRDD 上调用 repartion。

    使用此技术添加到 testRDD 的所有元素都应保留在缓存中。

    【讨论】:

    • 感谢您的回答,但这是另一个问题 - 已经缓存的元素肯定会有一些变化(请参阅我的问题示例),如何使用 var 帮助我?我可以在var testRDD 上调用testRDD.cache(),它会在操作后自动缓存我的更新吗?很难相信这种魔法......
    • 我认为每次添加(联合)更多元素时都需要调用testRDD.cache(),这实际上会创建一个新的RDD。
    • 不,您不需要再次调用.cache(),它会创建新的MapPartitionRDD在我的情况下)。但似乎您可以为您的var testRDD 拨打一次.cache()
    • 好的,很高兴知道。就像我在回答中所说的那样,请注意我们的血统问题,并考虑每隔多次迭代调用 testRDD.checkpoint()testRDD.repartition(x)
    • 是的,这是真的。另一种方法是保留对旧 RDD 的引用并在联合后调用 unpersist,以便在联合时 RDD 在缓存中。例如val oldTestRDD = testRDD
    猜你喜欢
    • 2020-04-21
    • 2020-07-30
    • 2016-08-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-05-26
    • 1970-01-01
    相关资源
    最近更新 更多