【Question Title】: Update column value from other columns based on multiple conditions in Spark Structured Streaming
【Posted】: 2018-12-13 19:09:15
【Question Description】:

I want to update the value in one column, using two other columns, based on multiple conditions. For example, the stream looks like:

    +---+---+----+---+
    | A | B | C  | D |
    +---+---+----+---+
    | a | T | 10 | 0 |
    | a | T | 100| 0 |
    | a | L | 0  | 0 |
    | a | L | 1  | 0 |
    +---+---+----+---+

What I have is a set of multiple conditions, for example:

    (B = "T" && C > 20) OR (B = "L" && C = 0)

The values "T", 20, "L", and 0 are dynamic. The AND/OR operators are also provided at runtime. Whenever the condition holds I want to set D = 1; otherwise it should stay D = 0. The number of conditions is also dynamic.

I tried the UPDATE command in spark-sql, i.e. UPDATE df SET D = '1' WHERE CONDITIONS, but it says updates are not yet supported. The resulting dataframe should be:

+---+---+----+---+
| A | B | C  | D |
+---+---+----+---+
| a | T | 10 | 0 |
| a | T | 100| 1 |
| a | L | 0  | 1 |
| a | L | 1  | 0 |
+---+---+----+---+

Is there any way to achieve this?

【Question Discussion】:

    Tags: scala apache-spark-sql spark-streaming multiple-conditions


    【Solution 1】:

    I hope you are using Python; I will post the same for Scala as well! Use a udf:

    PYTHON

    >>> df.show()
    +---+---+---+---+
    |  A|  B|  C|  D|
    +---+---+---+---+
    |  a|  T| 10|  0|
    |  a|  T|100|  0|
    |  a|  L|  0|  0|
    |  a|  L|  1|  0|
    +---+---+---+---+
    
    >>> from pyspark.sql.functions import udf
    >>> def get_column(B, C):
    ...     return int((B == "T" and C > 20) or (B == "L" and C == 0))
    ...
    >>> fun = udf(get_column)
    >>> res = df.withColumn("D", fun(df['B'], df['C']))
    >>> res.show()
    +---+---+---+---+
    |  A|  B|  C|  D|
    +---+---+---+---+
    |  a|  T| 10|  0|
    |  a|  T|100|  1|
    |  a|  L|  0|  1|
    |  a|  L|  1|  0|
    +---+---+---+---+
    

    SCALA

    scala> import org.apache.spark.sql.functions._
    import org.apache.spark.sql.functions._
    
    scala> df.show()
    +---+---+---+---+
    |  A|  B|  C|  D|
    +---+---+---+---+
    |  a|  T| 10|  0|
    |  a|  T|100|  0|
    |  a|  L|  0|  0|
    |  a|  L|  1|  0|
    +---+---+---+---+
    
    
    scala> def get_column(B : String, C : Int) : Int = {     
         |     if((B == "T" && C > 20) || (B == "L" && C == 0))
         |         1     
         |     else
         |         0
         | }
    get_column: (B: String, C: Int)Int
    
    scala> val fun = udf(get_column _)
    fun: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(StringType, IntegerType)))
    
    scala> val res = df.withColumn("D", fun(df("B"), df("C")))
    res: org.apache.spark.sql.DataFrame = [A: string, B: string ... 2 more fields]
    
    scala> res.show()
    +---+---+---+---+
    |  A|  B|  C|  D|
    +---+---+---+---+
    |  a|  T| 10|  0|
    |  a|  T|100|  1|
    |  a|  L|  0|  1|
    |  a|  L|  1|  0|
    +---+---+---+---+
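
    The UDF above hard-codes "T", 20, "L", and 0, while the question says those values arrive at runtime. One way to cover that (a hypothetical sketch; `make_get_column` is an illustrative name, not part of the answer above) is to build the comparison function from runtime-supplied values via a closure and register that with `udf`:

```python
# Hypothetical sketch: build the comparison logic from runtime-supplied
# values instead of hard-coding them inside the UDF body.
def make_get_column(b1, c_threshold, b2, c_equal):
    def get_column(B, C):
        # Same shape as the fixed UDF, but every operand is a parameter.
        return int((B == b1 and C > c_threshold) or
                   (B == b2 and C == c_equal))
    return get_column

# With Spark available, register it exactly like the fixed version:
# from pyspark.sql.functions import udf
# fun = udf(make_get_column("T", 20, "L", 0))
# res = df.withColumn("D", fun(df["B"], df["C"]))
```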
    

    You can also use when / otherwise, like this:

    PYTHON

    >>> df.show()
    +---+---+---+---+
    |  A|  B|  C|  D|
    +---+---+---+---+
    |  a|  T| 10|  0|
    |  a|  T|100|  0|
    |  a|  L|  0|  0|
    |  a|  L|  1|  0|
    +---+---+---+---+
    
    >>> from pyspark.sql.functions import when, col
    >>> new_column = when(
            (col("B") == "T") & (col("C") > 20), 1
        ).when((col("B") == "L") & (col("C") == 0), 1).otherwise(0)
    
    >>> res = df.withColumn("D", new_column)
    >>> res.show()
    +---+---+---+---+
    |  A|  B|  C|  D|
    +---+---+---+---+
    |  a|  T| 10|  0|
    |  a|  T|100|  1|
    |  a|  L|  0|  1|
    |  a|  L|  1|  0|
    +---+---+---+---+
    

    SCALA

    scala> df.show()
    +---+---+---+---+
    |  A|  B|  C|  D|
    +---+---+---+---+
    |  a|  T| 10|  0|
    |  a|  T|100|  0|
    |  a|  L|  0|  0|
    |  a|  L|  1|  0|
    +---+---+---+---+
    
    scala> val new_column = when(
         |     col("B") === "T" && col("C") > 20, 1
         | ).when(col("B") === "L" && col("C") === 0, 1 ).otherwise(0)
    
    new_column: org.apache.spark.sql.Column = CASE WHEN ((B = T) AND (C > 20)) THEN 1 WHEN ((B = L) AND (C = 0)) THEN 1 ELSE 0 END
    
    scala> df.withColumn("D", new_column).show()
    +---+---+---+---+
    |  A|  B|  C|  D|
    +---+---+---+---+
    |  a|  T| 10|  0|
    |  a|  T|100|  1|
    |  a|  L|  0|  1|
    |  a|  L|  1|  0|
    +---+---+---+---+
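
    Since the number of conditions and the AND/OR operators between them are only known at runtime, a hard-coded `when` chain does not cover the dynamic case directly. One possibility (a sketch, assuming the conditions arrive as SQL fragment strings, e.g. from user input) is to join them into a single predicate string and evaluate it with `expr`:

```python
# Hypothetical sketch: the condition fragments and the joining operator
# are assumed to be supplied at runtime (user input, config, etc.).
conditions = ["(B = 'T' AND C > 20)", "(B = 'L' AND C = 0)"]
operator = " OR "

# Join the fragments into one SQL boolean expression.
predicate = operator.join(conditions)

# With Spark available, evaluate the string with expr():
# from pyspark.sql.functions import expr
# res = df.withColumn("D", expr("CASE WHEN " + predicate + " THEN 1 ELSE 0 END"))
```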
    

    【Discussion】:
