【发布时间】:2017-11-07 11:27:19
【问题描述】:
在几次加入后,我得到了一个包含以下记录的 RDD:
(Int, ((Int, Option[Iterable[Int]]), Option[Iterable[Int]]))
它是:
(id_of_client, ((id_of_order, products_in_order), all_products_client_ever_bought)
我需要将其转换为 (Int, Int, Boolean):
(id_of_order, all_products_client_ever_bought._1, was_this_product_in_this_order)
(id_of_order, all_products_client_ever_bought._2, was_this_product_in_this_order)
(id_of_order, all_products_client_ever_bought._3, was_this_product_in_this_order)
...
结果 RDD 中的记录应该与输入 RDD 的所有记录的 all_products_client_ever_bought 中的项目一样多。所以我正在映射我的输入 RDD,rdd.map(transform_df(_))
def transform_df(row: (Int, ((Int, Option[Iterable[Int]]), Option[Iterable[Int]]))) = {
//(order_id, user_product_id, if_order_contains_product)
val order_products = row._2._1._2.get.toList
val user_products = row._2._2.get
for (product_id <- user_products) {
(row._2._1._1, product_id, order_products.contains(product_id))
}
}
因此,我得到的 RDD 与输入的长度相同,但元组为空。如何转换 RDD?
【问题讨论】:
-
首先尝试给
transform_df一个明确的返回类型。您应该看到它可能不是您期望的那样。 -
是的,我看到它返回 Unit,但是这个函数更像是解释我想要做什么的草图。我想到我应该在几个步骤之前使用explode(stackoverflow.com/questions/32906613/flattening-rows-in-spark),然后他们进行连接,但是映射后是否有可能获得比输入更大的RDD?
-
这就是
flatMap的用途。
标签: scala apache-spark rdd