【Question Title】: Column Name inside column of dataframe in Spark with Scala
【Posted】: 2020-08-02 09:50:14
【Question】:

I am using Spark 2.4.3 with Scala.

My SalesPerson dataframe looks like the following. There are 54 salespersons in total; I am showing only 3 columns as an example.

Schema of SalesPerson table.
root
 |-- col: struct (nullable = false)
 |    |-- SalesPerson_1: string (nullable = true)
 |    |-- SalesPerson_2: string (nullable = true)
 |    |-- SalesPerson_3: string (nullable = true)

Data of the SalesPerson view:

    SalesPerson_1|SalesPerson_2|SalesPerson_3
    ---------------------------------------------
    [Customer_1793, Customer_202, Customer_2461]
    [Customer_2424, Customer_130, Customer_787]
    [Customer_1061, Customer_318, Customer_706]

My SalesPlace dataframe looks like this:

Schema of salesplace
 
 root
 |-- Place: string (nullable = true)
 |-- Customer: string (nullable = true)

Data of SalesPlace
Place |Customer
------+-------------
Online|Customer_1793
Retail|Customer_1793
Retail|Customer_130
Online|Customer_130
Online|Customer_2461
Retail|Customer_2461
Online|Customer_2461

I am trying to check which customers from the SalesPerson table appear in the SalesPlace table, with two additional columns: one showing which salesperson the customer belongs to, and one giving the number of times the customer occurs in the SalesPlace table.

Expected output:

CustomerBelongstoSalesperson|Customer     |occurance|
SalesPerson_1               |Customer_1793|2
SalesPerson_2               |Customer_130 |2 
SalesPerson_3               |Customer_2461|3
SalesPerson_2               |Customer_202 |0
SalesPerson_1               |Customer_2424|0
SalesPerson_1               |Customer_1061|0
SalesPerson_2               |Customer_318 |0
SalesPerson_3               |Customer_787 |0

Code:

Error:
The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 54 aliases but got Salesperson,Customer ;

This does not seem straightforward in Spark. I am not sure whether it is possible to bring a column name into a column as a value... Could someone please give me some ideas on how to do this? Thanks.
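For reference, this error typically means a generator (UDTF) produced more output columns than the aliases supplied in the AS clause. With stack(), the first argument is the number of rows the remaining arguments are split into, so it also determines the number of output columns. A minimal sketch of the idea, assuming a flat dataframe named salesPersonDf with one string column per salesperson (the name and layout are assumptions, not taken from the question):

    // stack(n, expr1, ..., exprk) splits its k arguments into n rows,
    // i.e. k/n output columns, and the AS clause must name exactly that many.
    // Passing all 54 columns with n = 1 yields 54 output columns, so Spark
    // expects 54 aliases, which matches the error message above.

    // Fails: one row of 54 columns, but only 2 aliases supplied
    // salesPersonDf.selectExpr(
    //   "stack(1, SalesPerson_1, SalesPerson_2, /* ... */ SalesPerson_54) as (Salesperson, Customer)")

    // Works: one ('column name', value) pair per output row,
    // n = number of salesperson columns, giving 2 output columns
    val pairs = salesPersonDf.columns
      .map(c => s"'$c', `$c`")
      .mkString(", ")
    val unpivoted = salesPersonDf.selectExpr(
      s"stack(${salesPersonDf.columns.length}, $pairs) as (Salesperson, Customer)")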

【Comments】:

    Tags: apache-spark apache-spark-sql


    【Solution 1】:

    Try this:

    Load the provided test data

    import org.apache.spark.sql.functions._
    import spark.implicits._ // needed for .toDS(); assumes a SparkSession named spark

    val data1 =
          """
            |salesperson1          |  salesperson2
            |Customer_17         |Customer_202
            |Customer_24         |Customer_130
          """.stripMargin
        val stringDS1 = data1.split(System.lineSeparator())
          .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
          .toSeq.toDS()
        val df1 = spark.read
          .option("sep", ",")
          .option("inferSchema", "true")
          .option("header", "true")
          .option("nullValue", "null")
          .csv(stringDS1)
        df1.show(false)
        df1.printSchema()
        /**
          * +------------+------------+
          * |salesperson1|salesperson2|
          * +------------+------------+
          * |Customer_17 |Customer_202|
          * |Customer_24 |Customer_130|
          * +------------+------------+
          *
          * root
          * |-- salesperson1: string (nullable = true)
          * |-- salesperson2: string (nullable = true)
          */
    
        val data2 =
          """
            |Place  |Customer
            |shop  |Customer_17
            |Home  |Customer_17
            |shop  |Customer_17
            |Home  |Customer_130
            |Shop  |Customer_202
          """.stripMargin
        val stringDS2 = data2.split(System.lineSeparator())
          .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
          .toSeq.toDS()
        val df2 = spark.read
          .option("sep", ",")
          .option("inferSchema", "true")
          .option("header", "true")
          .option("nullValue", "null")
          .csv(stringDS2)
        df2.show(false)
        df2.printSchema()
        /**
          * +-----+------------+
          * |Place|Customer    |
          * +-----+------------+
          * |shop |Customer_17 |
          * |Home |Customer_17 |
          * |shop |Customer_17 |
          * |Home |Customer_130|
          * |Shop |Customer_202|
          * +-----+------------+
          *
          * root
          * |-- Place: string (nullable = true)
          * |-- Customer: string (nullable = true)
          */
    

    Unpivot + left join

      val stringCol = df1.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
        val processedDF = df1.selectExpr(s"stack(${df1.columns.length}, $stringCol) as (Salesperson, Customer)")
        processedDF.show(false)
        /**
          * +------------+------------+
          * |Salesperson |Customer    |
          * +------------+------------+
          * |salesperson1|Customer_17 |
          * |salesperson2|Customer_202|
          * |salesperson1|Customer_24 |
          * |salesperson2|Customer_130|
          * +------------+------------+
          */
    
        processedDF.join(df2, Seq("Customer"), "left")
          .groupBy("Customer")
          .agg(count("Place").as("Occurance"), first("Salesperson").as("Salesperson"))
          .show(false)
    
        /**
          * +------------+---------+------------+
          * |Customer    |Occurance|Salesperson |
          * +------------+---------+------------+
          * |Customer_130|1        |salesperson2|
          * |Customer_17 |3        |salesperson1|
          * |Customer_202|1        |salesperson2|
          * |Customer_24 |0        |salesperson1|
          * +------------+---------+------------+
          */
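
    As a hedged follow-up (the column names CustomerBelongstoSalesperson and occurance are taken from the question's expected output, not produced by the answer above), the same left join can be grouped by both the salesperson label and the customer, so every customer keeps its label and customers with no match in df2 come out as 0, because count(col) ignores nulls:

        val result = processedDF.join(df2, Seq("Customer"), "left")
          .groupBy("Salesperson", "Customer")      // keep the salesperson label per customer
          .agg(count("Place").as("occurance"))     // count() skips nulls, so no match => 0
          .withColumnRenamed("Salesperson", "CustomerBelongstoSalesperson")
        result.show(false)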
    

    【Discussion】:

    • Thank you very much... but it is not counting the occurrences correctly. All my data is in a table view... please advise. The occurrence sometimes shows up as 0.
    • I think there is a problem with the join. Check the output of processedDF.join(df2, Seq("Customer"), "left").
    • I have not used split anywhere in the answer. Given SalesPerson as df1 and SalesPlace as df2, the result can be obtained with the "Unpivot + left join" part of my answer.
    • In the Salesperson column the value shows up as col.... col.... col.... instead of SalesPerson_1, SalesPerson_2, etc. Please share your thoughts (see the sketch after this list).
    • Are you using the same dataset with the same column names as given in the question? If yes, it should work. If anything has changed, please update the description.
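
    A minimal sketch for the struct schema shown in the question (assuming the SalesPerson dataframe is named salesPersonDf and has a single struct column named col, as in the question's schema): flatten the struct first so that .columns returns the real field names instead of just "col", then reuse the answer's unpivot.

        // Flatten the struct so df.columns yields SalesPerson_1 ... SalesPerson_54
        // instead of the single struct column "col".
        val flattened = salesPersonDf.select("col.*")
        val stackArgs = flattened.columns
          .map(c => s"'$c', cast(`$c` as string)")
          .mkString(", ")
        val unpivotedDF = flattened.selectExpr(
          s"stack(${flattened.columns.length}, $stackArgs) as (Salesperson, Customer)")
        unpivotedDF.show(false)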