【问题标题】:Spark table transformation (ERROR: 5063)Spark 表转换(错误:5063)
【发布时间】:2016-01-14 03:39:43
【问题描述】:

我有以下数据:

val RDDApp = sc.parallelize(List("A", "B", "C"))
val RDDUser = sc.parallelize(List(1, 2, 3))
val RDDInstalled =  sc.parallelize(List((1, "A"), (1, "B"), (2, "B"), (2,  "C"), (3, "A"))).groupByKey
val RDDCart = RDDUser.cartesian(RDDApp)

我想映射这些数据,以便我有一个带有 (userId, Boolean 如果给用户的字母) 的元组 RDD。我以为我找到了解决方案:

val results = RDDCart.map (entry =>
   (entry._1, RDDInstalled.lookup(entry._1).contains(entry._2))
)

如果我打电话给results.first,我会得到org.apache.spark.SparkException: SPARK-5063。我在 Mapping 函数中看到了 Action 的问题,但不知道如何解决它以获得相同的结果。

【问题讨论】:

    标签: scala apache-spark rdd


    【解决方案1】:

    只需joinmapValues

    RDDCart.join(RDDInstalled).mapValues{case (x, xs) => xs.toSeq.contains(x)}
    

    【讨论】:

    • 如果数据集非常大,是否有替代方法?
    • 我不确定我是否理解所有逻辑。特别是笛卡尔需要创建RDDCart 闻起来很腥。但是,如果两个数据集都无法放入内存,则 join 本身可能是唯一有效的 exact 解决方案。
    • 感谢您的建议。我有点猜测这是创建结果的最佳逻辑,但如果有其他解决方案 - 不使用笛卡尔函数的解决方案 - 我会很高兴了解它们。并不是数据集太大,而是对两个数据集执行join 需要很长时间。
    • 你知道,从技术上讲,RDDInstalled 完全定义了用户-应用关系的稀疏矩阵。这就是为什么我在其他任何事情上都看不到重点。如果RDDInstalled 足够小,可以在本地处理(可以说少于几 GB 左右,具体取决于您的配置),您可以收集和广播。另一种选择是在其上创建布隆过滤器并将其传递给地图。
    猜你喜欢
    • 1970-01-01
    • 2015-07-01
    • 2018-04-24
    • 2015-11-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多