【Question Title】: Count of values in a row in a Spark DataFrame using Scala
【Posted at】: 2020-11-06 15:14:21
【Question Description】:

I have a DataFrame containing sales of different items across different sales outlets. The DataFrame shown below includes only a few items for a few outlets. The benchmark is 100 units sold per item per day. Each item that sold more than 100 units is flagged "Yes", and each item below 100 units is flagged "No".

val df1 = Seq(
("Mumbai", 90, 109, 101, 78, ............., "No", "Yes", "Yes", "No", .....),
("Singapore", 149, 129, 201, 107, ............., "Yes", "Yes", "Yes", "Yes", .....),
("Hawaii", 127, 101, 98, 109, ............., "Yes", "Yes", "No", "Yes", .....),
("New York", 146, 130, 173, 117, ............., "Yes", "Yes", "Yes", "Yes", .....),
("Los Angeles", 94, 99, 95, 113, ............., "No", "No", "No", "Yes", .....),
("Dubai", 201, 229, 265, 317, ............., "Yes", "Yes", "Yes", "Yes", .....),
("Bangalore", 56, 89, 61, 77, ............., "No", "No", "No", "No", .....))
.toDF("Outlet","Boys_Toys","Girls_Toys","Men_Shoes","Ladies_shoes", ............., "BT>100", "GT>100", "MS>100", "LS>100", .....)

Now I want to add a column "Count_of_Yes" where, for each outlet (each row), the value is the total number of "Yes" values in that row. How do I iterate over each row to get the count of Yes?

My expected DataFrame would be:

val output_df = Seq(
("Mumbai", 90, 109, 101, 78, ............., "No", "Yes", "Yes", "No", ....., 2),
("Singapore", 149, 129, 201, 107, ............., "Yes", "Yes", "Yes", "Yes", ....., 4),
("Hawaii", 127, 101, 98, 109, ............., "Yes", "Yes", "No", "Yes", ....., 3),
("New York", 146, 130, 173, 117, ............., "Yes", "Yes", "Yes", "Yes", ....., 4),
("Los Angeles", 94, 99, 95, 113, ............., "No", "No", "No", "Yes", ....., 1),
("Dubai", 201, 229, 265, 317, ............., "Yes", "Yes", "Yes", "Yes", ....., 4),
("Bangalore", 56, 89, 61, 77, ............., "No", "No", "No", "No", ....., 0))
.toDF("Outlet","Boys_Toys","Girls_Toys","Men_Shoes","Ladies_shoes", ............., "BT>100", "GT>100", "MS>100", "LS>100", ....., "Count_of_Yes")

【Question Comments】:

    Tags: scala apache-spark apache-spark-sql


    【Solution 1】:

    You can convert the list of selected columns into an Array of 1s (for "Yes") and 0s (for "No"), then sum the array elements with the SQL higher-order function aggregate via selectExpr, as shown below:

    val df = Seq(
      (1, 120, 80, 150, "Y", "N", "Y"),
      (2, 50, 90, 110, "N", "N", "Y"),
      (3, 70, 160, 90, "N", "Y", "N")
    ).toDF("id", "qty_a", "qty_b", "qty_c", "over100_a", "over100_b", "over100_c")
    
    val cols = df.columns.filter(_.startsWith("over100_"))
    
    df.
      withColumn("arr", array(cols.map(c => when(col(c) === "Y", 1).otherwise(0)): _*)).
      selectExpr("*", "aggregate(arr, 0, (acc, x) -> acc + x) as yes_count").
      show
    // +---+-----+-----+-----+---------+---------+---------+---------+---------+
    // | id|qty_a|qty_b|qty_c|over100_a|over100_b|over100_c|      arr|yes_count|
    // +---+-----+-----+-----+---------+---------+---------+---------+---------+
    // |  1|  120|   80|  150|        Y|        N|        Y|[1, 0, 1]|        2|
    // |  2|   50|   90|  110|        N|        N|        Y|[0, 0, 1]|        1|
    // |  3|   70|  160|   90|        N|        Y|        N|[0, 1, 0]|        1|
    // +---+-----+-----+-----+---------+---------+---------+---------+---------+
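    The intermediate `arr` column can also be avoided by reducing the 0/1 indicator expressions directly, i.e. passing `cols.map(c => when(col(c) === "Y", 1).otherwise(0)).reduce(_ + _)` to `withColumn` (a sketch, not from the original answer). The plain-Scala function below mirrors the same per-row arithmetic:

    ```scala
    // Mirrors the column-wise reduce: map each flag to a 0/1 indicator,
    // then fold the indicators together with +.
    // In Spark this would be (sketch):
    //   df.withColumn("yes_count",
    //     cols.map(c => when(col(c) === "Y", 1).otherwise(0)).reduce(_ + _))
    def yesCount(flags: Seq[String]): Int =
      flags.map(f => if (f == "Y") 1 else 0).reduce(_ + _)

    println(yesCount(Seq("Y", "N", "Y")))  // 2
    ```

    Reducing with `+` keeps everything as a single column expression, so no array column ever materializes in the plan.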
    

    Alternatively, sum the Array elements by exploding the array and aggregating with groupBy/agg (note that this variant keeps only the grouping key and the count, so the other columns would have to be joined back):

    df.
      withColumn("arr", array(cols.map(c => when(col(c) === "Y", 1).otherwise(0)): _*)).
      withColumn("flattened", explode($"arr")).
      groupBy("id").agg(sum($"flattened").as("yes_count"))
    

    【Comments】:

    • Leo C, thanks for the great answer. It solved the problem.
    【Solution 2】:

    "How do I iterate over each row to get the count of Yes?" You can transform each record with a map transformation. So in your case, df.map() should contain the code that counts the number of YES values and emits a new record with that extra column appended.

    Pseudocode below -

    df.map(count number of YES and append that at the end of the record)
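    The pseudocode above could be made concrete roughly as follows (a sketch, not tested against the asker's full schema; `countOfYes` is an assumed helper name). The counting logic itself is plain Scala, applied to each row's values via `Row.toSeq` inside `map`:

    ```scala
    // Counts the "Yes" entries among a row's values.
    // In Spark this would run inside df.map, e.g. (sketch, needs spark.implicits._):
    //   df.map(row => (row.getString(0), countOfYes(row.toSeq)))
    //     .toDF("Outlet", "Count_of_Yes")
    def countOfYes(values: Seq[Any]): Int = values.count(_ == "Yes")

    println(countOfYes(Seq("Mumbai", 90, "No", "Yes", "Yes", "No")))  // 2
    ```

    Since the integer columns can never equal the string "Yes", counting over all of a row's values is safe here; with a stricter schema you would select only the flag columns first.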
    

    【Comments】:

    • Thanks Amit. Would you mind sharing code to achieve this? I haven't fully grasped the concept. Looking forward to your response. Thanks.