【Question Title】: spark dataframe aggregation of column based on condition in scala
【Posted】: 2020-02-03 09:44:15
【Question】:

I have CSV data in the format shown below.

I need to find the top 2 vendors whose turnover in 2017 exceeds 100.

Turnover = Sum(invoices whose status is Paid-in-Full) - Sum(invoices whose status is Exception or Rejected)
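
For example, with the sample data below, V1's 2017 turnover would be (56 + 12) - 23 = 45, and V2's would be 237 - 234 = 3 (the invoice of 123 is from 2019 and is excluded).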

I have loaded the data from the CSV in a Databricks Scala notebook as follows:

val invoices_data = spark.read.format(file_type)   // file_type is "csv" here
  .option("header", "true")
  .option("dateFormat", "M/d/yy")
  .option("inferSchema", "true")
  .load("invoice.csv")
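
(To check how the columns were inferred, the schema can be printed; depending on the Spark version, InvoiceDate may be inferred as a plain string rather than a date.)

invoices_data.printSchema()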

Then I tried grouping by vendor name:

val avg_invoice_by_vendor = invoices_data.groupBy("VendorName")

But now I am not sure how to proceed.

Here is the sample CSV data:

Id   InvoiceDate   Status         Invoice   VendorName
2    2/23/17       Exception      23        V1
3    11/23/17      Paid-in-Full   56        V1
1    12/20/17      Paid-in-Full   12        V1
5    8/4/19        Paid-in-Full   123       V2
6    2/6/17        Paid-in-Full   237       V2
9    3/9/17        Rejected       234       V2
7    4/23/17       Paid-in-Full   78        V3
8    5/23/17       Exception      345       V4

【Question Discussion】:

    Tags: scala dataframe apache-spark-sql azure-databricks


    【Solution 1】:

    You can use a UDF to give each invoice a sign depending on its status, and then aggregate with the sum function after grouping the DataFrame:

    import org.apache.spark.sql.functions._

    // Map a status to a signed invoice amount: positive for Paid-in-Full, negative otherwise
    def signInvoice: (String, Int) => Int = (status: String, invoice: Int) => {
      status match {
        case "Exception" | "Rejected" => -invoice
        case "Paid-in-Full" => invoice
        case _ => throw new IllegalStateException("wrong status")
      }
    }

    val signInvoiceUdf = spark.udf.register("signInvoice", signInvoice)

    val top2_vendorsDF = invoices_data
      .withColumn("InvoiceDate", to_date(col("InvoiceDate"), "M/d/yy")) // parse the M/d/yy strings; a plain cast to DateType would produce nulls
      .filter(year(col("InvoiceDate")) === lit(2017))
      .withColumn("Invoice", col("Invoice").cast("int"))
      .groupBy("VendorName")
      .agg(sum(signInvoiceUdf(col("Status"), col("Invoice"))).as("sum_invoice"))
      .filter(col("sum_invoice") > 100)
      .orderBy(col("sum_invoice").desc)
      .take(2) // take(2) returns an Array[Row]; use limit(2) to keep a DataFrame
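
    As a side note, the same signed sum can be written without a UDF by using the built-in when/otherwise functions. A minimal sketch, assuming the sample column names Status, Invoice and VendorName:

    import org.apache.spark.sql.functions._

    // +Invoice for Paid-in-Full, -Invoice for everything else (Exception / Rejected)
    val signedInvoice = when(col("Status") === "Paid-in-Full", col("Invoice"))
      .otherwise(-col("Invoice"))

    val top2 = invoices_data
      .withColumn("InvoiceDate", to_date(col("InvoiceDate"), "M/d/yy"))
      .filter(year(col("InvoiceDate")) === 2017)
      .groupBy("VendorName")
      .agg(sum(signedInvoice).as("turnover"))
      .filter(col("turnover") > 100)
      .orderBy(col("turnover").desc)
      .limit(2)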
    

    【Discussion】:

    【Solution 2】:

    I have solved the above problem using the pivot method:

    // Column names here (InvoiceVendorName, InvoiceStatusDesc, InvoiceTotal) come from the
    // full dataset and differ from the sample above (VendorName, Status, Invoice).
    invoices_data
      .filter(invoices_data("InvoiceStatusDesc") === "Paid-in-Full" ||
        invoices_data("InvoiceStatusDesc") === "Exception" ||
        invoices_data("InvoiceStatusDesc") === "Rejected")
      .filter(year(to_date(invoices_data("InvoiceDate"), "M/d/yy")) === 2017)
      .groupBy("InvoiceVendorName")
      .pivot("InvoiceStatusDesc")
      .sum("InvoiceTotal")
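
    The pivot produces one column per status value (Paid-in-Full, Exception, Rejected), so the turnover and the top-2 filter can be computed from its result. A sketch, assuming the pivoted DataFrame above is assigned to a val (here hypothetically named pivoted) and keeping the same column names:

    import org.apache.spark.sql.functions.col

    // `pivoted` stands for the result of the groupBy/pivot above (hypothetical name).
    // Vendors with no invoice in a given status end up with null after the pivot, hence na.fill(0).
    val top2Vendors = pivoted
      .na.fill(0)
      .withColumn("Turnover", col("Paid-in-Full") - (col("Exception") + col("Rejected")))
      .filter(col("Turnover") > 100)
      .orderBy(col("Turnover").desc)
      .limit(2)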
      

    【Discussion】:
