【问题标题】:How to return several records in resulting RDD mapping one record in input RDD如何在结果RDD中返回多条记录映射输入RDD中的一条记录
【发布时间】:2017-11-07 11:27:19
【问题描述】:

在几次加入后,我得到了一个包含以下记录的 RDD:

(Int, ((Int, Option[Iterable[Int]]), Option[Iterable[Int]]))

它是:

(id_of_client, ((id_of_order, products_in_order), all_products_client_ever_bought)

我需要将其转换为 (Int, Int, Boolean):

(id_of_order, all_products_client_ever_bought._1, was_this_product_in_this_order)
(id_of_order, all_products_client_ever_bought._2, was_this_product_in_this_order)
(id_of_order, all_products_client_ever_bought._3, was_this_product_in_this_order)
...

结果 RDD 中的记录应该与输入 RDD 的所有记录的 all_products_client_ever_bought 中的项目一样多。所以我正在映射我的输入 RDD,rdd.map(transform_df(_))

def transform_df(row: (Int, ((Int, Option[Iterable[Int]]), Option[Iterable[Int]]))) = {
    //(order_id, user_product_id, if_order_contains_product)
    val order_products = row._2._1._2.get.toList
    val user_products = row._2._2.get
    for (product_id <- user_products) {
        (row._2._1._1, product_id, order_products.contains(product_id))
    }
}

因此,我得到的 RDD 与输入的长度相同,但元组为空。如何转换 RDD?

【问题讨论】:

  • 首先尝试给transform_df一个明确的返回类型。您应该看到它可能不是您期望的那样。
  • 是的,我看到它返回 Unit,但是这个函数更像是解释我想要做什么的草图。我想到我应该在几个步骤之前使用explode(stackoverflow.com/questions/32906613/flattening-rows-in-spark),然后他们进行连接,但是映射后是否有可能获得比输入更大的RDD?
  • 这就是flatMap 的用途。

标签: scala apache-spark rdd


【解决方案1】:

您是对的,您需要“爆炸”您的数据集,即将每条记录映射到超过 1 条记录。使用 RDD API,以及在大多数函数式编程语言中,您需要使用 flapMap 函数(explode 用于数据帧)。

有关如何使用平面地图的更多详细信息,请参阅map-map-and-flatmap-in-scala。基本上,对于 A 类型的每条记录,您映射一个 Seq[B] 类型的序列,然后您会得到一个 RDD[B] 类型的 RDD,其中所有内容都被展平。

Spark 中另一个非常方便的方法是 flatMapValue,它适用于 pairRDD(键值 RDD),并且只会将值展平。

在您的示例中,您可以首先将您的 RDD 映射到仅包含您需要且操作更方便的东西。

rdd.map{ case (id_of_client, ((id_of_order, products_in_order), all_products) 
           => id_of_order -> (products_in_order.get.toSet, all_products.get) }

注意顺便说一句,使用模式匹配而不是 ._1._2._2 表示法是使您的代码更具可读性的好习惯。我还将订单的产品转换为 Set,因为之后我们需要对其提出请求。

那么你只需要使用 flatMapValues 就可以得到你想要的。

.flatMapValues{ case (products_in_order, all_products) =>
        all_products.map(p => p -> product_in_order.contains(p)) }
.map { case (a,(b,c)) => (a,b,c) }

最后一行只是将结果转换为您想要的结果。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-06-02
    • 2021-01-04
    • 2022-01-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-10-01
    • 1970-01-01
    相关资源
    最近更新 更多