【问题标题】:How to create RDD inside map function如何在地图函数中创建RDD
【发布时间】:2026-01-13 13:25:01
【问题描述】:

我有键/值对的 RDD,对于每个键,我需要调用一些接受 RDD 的函数。所以我尝试了 RDD.Map 并在 map 内部使用 sc.parallelize(value) 方法创建了 RDD 并将这个 rdd 发送到我的函数,但是由于 Spark 不支持在 RDD 中创建 RDD,所以这是行不通的。

你能建议我解决这种情况吗?

我正在寻找下面线程中建议的解决方案,但我遇到的问题是我的钥匙没有固定,我可以拥有任意数量的钥匙。
How to create RDD from within Task?

谢谢

【问题讨论】:

  • 没有通用的解决方案。不能从地图调用 RDD。如果您提供一些代码与您的逻辑,可能会建议一个合适的更改。
  • 这听起来像XY problem。为什么你在自己的函数逻辑中依赖RDD
  • 真正想用 Spark 实现什么?您能否描述一下您的用例(而不是您如何尝试使用 Spark)?

标签: apache-spark


【解决方案1】:

听起来不太对劲。如果函数需要处理键值对,它应该接收键值对作为参数,而不是 RDD。

但如果你真的想将 RDD 作为参数发送,而不是在链操作内部,你可以在预处理后创建一个引用并将该引用发送给方法。

【讨论】:

    【解决方案2】:

    不,你不应该在 RDD 中创建 RDD。

    根据您的数据大小,可能有两种解决方案:

    1) 如果有很多键并且每个键没有太多的值。将接受 RDD 的函数转换为接受 Iterable 的函数。然后你可以做一些类似的事情

    // rdd: RDD[(keyType, valueType)]
    rdd.groupByKey()
      .map { case (key, values) =>
        func(values)
      }
    

    2) 如果键很少,每个键都有很多值。然后你不应该做一个组,因为它会收集一个执行程序的键的所有值,这可能会导致 OutOfMemory。相反,为每个键运行一个作业,例如

    rdd.keys.distinct().collect()
      .foreach { key =>
        func(rdd.filter(_._1 == key))         
      }
    

    【讨论】: