【问题标题】:while loop on a map spark dataframe地图火花数据帧上的while循环
【发布时间】:2019-05-02 21:59:14
【问题描述】:

我在使用 scala API 的 spark 中遇到了 ma​​pType 的问题 对于每个会话,我们都会发送一个地图,您可以在其中找到用户访问的类别与每个类别中的事件数量相关联

[ home & personal items > interior -> 1, vehicles > cars -> 1] 

并非所有用户访问的类别数量都相同,因此地图的大小会根据 user_id 发生变化

我需要计算按类别分组的会话数 为了做到这一点,我需要遍历地图,而它不是空的 我之前尝试过的事情

while (size(col("categoriesRaw")) !== 0) {
    df.select(
        explode(col("categoriesRaw"))
    )
    .select(
        col("key").alias("categ"),
        col("value").alias("number_of_events")
    )
}

但我遇到了一些错误,例如:

type mismatch;
 found   : org.apache.spark.sql.Column
 required: Booleansbt

【问题讨论】:

  • 你能分享示例数据框吗?
  • @Kaushal 类似:StructField("sessionId", StringType, true), StructField("categoriesRaw", MapType(StringType, IntegerType, true), true),
  • 你能分享一个示例数据字段吗?
  • @YayatiSule 类似于 [ 家居和个人物品 > 内饰 -> 1,车辆 > 汽车 -> 1] [ 车辆 > 汽车 -> 3]
  • 您的原始数据是 JSON 格式吗?像这里 [{"home_and_personal":{"interior":1},"vehicles":{"cars":1}}]

标签: scala apache-spark dataframe


【解决方案1】:

我不确定你想用 while 循环做什么。无论如何,您可以使用 REPL 检查您用作条件的表达式是 Column 而不是 Boolean,因此是异常。

> size(col("categoriesRaw")) !== 0
res1: org.apache.spark.sql.Column = (NOT (size(categoriesRaw) = 0))

基本上,这是一个需要由 SparkSQL 在 whereselect 或任何其他使用列的函数中计算的表达式。

尽管如此,您的 spark 代码已经快到了,您只需添加 groupBy 即可到达您想要的位置。让我们从创建数据开始。

import spark.implicits._
val users = Seq( "user 1" -> Map("home & personal items > interior" -> 1,
                                 "vehicles > cars" -> 1), 
                 "user 2" -> Map("vehicles > cars" -> 3)) 
val df = users.toDF("user", "categoriesRaw")

然后,您不需要 while 循环来遍历映射的所有值。 explode 正是为你做的:

val explodedDf = df.select( explode('categoriesRaw) )
explodedDf.show(false)

+--------------------------------+-----+
|key                             |value|
+--------------------------------+-----+
|home & personal items > interior|1    |        
|vehicles > cars                 |1    |
|vehicles > cars                 |3    |
+--------------------------------+-----+ 

最后,你可以使用 groupBy add 得到你想要的。

explodedDf
    .select('key as "categ", 'value as "number_of_events")
    .groupBy("categ")
    .agg(count('*), sum('number_of_events))
    .show(false)

+--------------------------------+--------+---------------------+
|categ                           |count(1)|sum(number_of_events)|
+--------------------------------+--------+---------------------+
|home & personal items > interior|1       |1                    |
|vehicles > cars                 |2       |4                    |
+--------------------------------+--------+---------------------+

注意:我不确定您是要计算会话(第一列)还是事件(第二列),所以我计算了两者。

【讨论】:

  • 好的,让我向你解释一下 myslef。我正在尝试执行while循环,因为当我执行explode函数然后选择(键)并对其进行赋值时,它只取第一个(键,值)而不是全部......也许每个Rawcategory的大小从a改变用户到另一个
  • 如果您能提供样本数据和预期的输出,这将大有帮助。这样我们就可以更好地了解您要宣传的内容。
  • 输入:StructField("sessionId", StringType, true), StructField("categoriesRaw", MapType(StringType, IntegerType, true), true),地图是这样的:user 1 [ home & personal项目 > 内饰 -> 1,车辆 > 汽车 -> 1] 用户 2:[ 车辆 > 汽车 -> 3]
  • 我想做一个循环,只要不是 empty(因为我们不知道每张地图的确切大小)和 爆炸该地图的所有关键值(因为如果我不做一个循环它只取第一个关键值)
  • 你能在你的问题中添加所有这些信息吗?它将帮助遇到类似问题的其他人更轻松地找到解决方案。想要帮助您的人可以将所有信息集中在一处。
猜你喜欢
  • 2016-05-06
  • 2020-12-17
  • 1970-01-01
  • 2020-08-09
  • 2018-02-15
  • 2019-04-12
  • 2016-05-01
  • 2021-04-22
  • 2018-10-27
相关资源
最近更新 更多