【问题标题】:Spark dataframes: Extract a column based on the value of another columnSpark数据框:根据另一列的值提取一列
【发布时间】:2016-02-11 19:44:25
【问题描述】:

我有一个数据框,其中包含带有已连接价目表的交易:

+----------+----------+------+-------+-------+
|   paid   | currency | EUR  |  USD  |  GBP  |
+----------+----------+------+-------+-------+
|   49.5   |   EUR    | 99   |  79   |  69   |
+----------+----------+------+-------+-------+

客户支付了 49.5 欧元,如“货币”列所示。我现在想将支付的价格与价目表中的价格进行比较。

因此,我需要根据“货币”的值访问正确的列,如下所示:

df.withColumn("saved", df.col(df.col($"currency")) - df.col("paid"))

我希望会变成这样

df.withColumn("saved", df.col("EUR") - df.col("paid"))

但是,这失败了。我尝试了所有我能想象到的东西,包括 UDF,却一无所获。

我想有一些优雅的解决方案吗?有人可以帮忙吗?

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql


    【解决方案1】:

    我想不出用DataFrames 做这件事的方法,我怀疑是否有简单的方法,但如果你把那个表放到RDD

    // On top of my head, warn if wrong.
    // Would be more elegant with match .. case 
    def d(l: (Int, String, Int, Int, Int)): Int = {
      if(l._2 == "EUR")
        l._3 - l._1
      else if (l._2 == "USD")
        l._4 - l._1
      else 
        l._5 -l._1
    }
    val rdd = df.rdd
    val diff = rdd.map(r => (r, r(d)))
    

    很可能会引发类型错误,我希望您能解决这些问题。

    【讨论】:

    • 谢谢!还有几种货币,所以我想避免 if/else 或嵌套的 when().otherwise() 构造。
    • 说到这一点,我的问题似乎是我无法获得该列 $"currency" 的字面值,我想知道 when($"column", [then]) 是如何工作的。我想知道代码there 的部分是否对我有帮助?! lit(condition).expr
    【解决方案2】:

    假设列名与currency 列中的值匹配:

    import org.apache.spark.sql.functions.{lit, col, coalesce}
    import org.apache.spark.sql.Column 
    
    // Dummy data
    val df = sc.parallelize(Seq(
      (49.5, "EUR", 99, 79, 69), (100.0, "GBP", 80, 120, 50)
    )).toDF("paid", "currency", "EUR", "USD", "GBP")
    
    // A list of available currencies 
    val currencies: List[String] = List("EUR", "USD", "GBP")
    
    // Select listed value
    val listedPrice: Column = coalesce(
      currencies.map(c => when($"currency" === c, col(c)).otherwise(lit(null))): _*)
    
    df.select($"*", (listedPrice - $"paid").alias("difference")).show
    
    // +-----+--------+---+---+---+----------+
    // | paid|currency|EUR|USD|GBP|difference|
    // +-----+--------+---+---+---+----------+
    // | 49.5|     EUR| 99| 79| 69|      49.5|
    // |100.0|     GBP| 80|120| 50|     -50.0|
    // +-----+--------+---+---+---+----------+
    

    listedPrice 的 SQL 等效表达式如下:

    COALESCE(
      CASE WHEN (currency = 'EUR') THEN EUR ELSE null,
      CASE WHEN (currency = 'USD') THEN USD ELSE null,
      CASE WHEN (currency = 'GBP') THEN GBP ELSE null
    )
    

    替代使用foldLeft:

    import org.apache.spark.sql.functions.when
    
    val listedPriceViaFold = currencies.foldLeft(
      lit(null))((acc, c) => when($"currency" === c, col(c)).otherwise(acc))
    
    df.select($"*", (listedPriceViaFold - $"paid").alias("difference")).show
    
    // +-----+--------+---+---+---+----------+
    // | paid|currency|EUR|USD|GBP|difference|
    // +-----+--------+---+---+---+----------+
    // | 49.5|     EUR| 99| 79| 69|      49.5|
    // |100.0|     GBP| 80|120| 50|     -50.0|
    // +-----+--------+---+---+---+----------+
    

    listedPriceViaFold 转换为以下 SQL:

    CASE
      WHEN (currency = 'GBP') THEN GBP
      ELSE CASE
        WHEN (currency = 'USD') THEN USD
        ELSE CASE
          WHEN (currency = 'EUR') THEN EUR
          ELSE null
    

    不幸的是,我不知道任何可以像这样直接表达 SQL 的内置函数

    CASE currency
        WHEN 'EUR' THEN EUR
        WHEN 'USD' THEN USD
        WHEN 'GBP' THEN GBP
        ELSE null
    END
    

    但您可以在原始 SQL 中使用此构造。

    我的假设不正确,您可以简单地在列名和 currency 列中的值之间添加映射。

    编辑

    如果源支持谓词下推和有效的列修剪,另一个可能有效的选项是按货币和联合子集:

    currencies.map(
      // for each currency filter and add difference
      c => df.where($"currency" === c).withColumn("difference", $"paid" - col(c))
    ).reduce((df1, df2) => df1.unionAll(df2)) // Union
    

    相当于这样的SQL:

    SELECT *,  EUR - paid AS difference FROM df WHERE currency = 'EUR'
    UNION ALL
    SELECT *,  USD - paid AS difference FROM df WHERE currency = 'USD'
    UNION ALL
    SELECT *,  GBP - paid AS difference FROM df WHERE currency = 'GBP'
    

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-08-09
    • 2017-11-11
    • 1970-01-01
    • 1970-01-01
    • 2023-02-18
    • 2015-08-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多