【问题标题】:Including null values in an Apache Spark Join在 Apache Spark Join 中包含空值
【发布时间】:2017-06-03 09:11:34
【问题描述】:

我想在 Apache Spark 连接中包含空值。 Spark 默认不包含 null 行。

这是默认的 Spark 行为。

val numbersDf = Seq(
  ("123"),
  ("456"),
  (null),
  ("")
).toDF("numbers")

val lettersDf = Seq(
  ("123", "abc"),
  ("456", "def"),
  (null, "zzz"),
  ("", "hhh")
).toDF("numbers", "letters")

val joinedDf = numbersDf.join(lettersDf, Seq("numbers"))

这是joinedDf.show()的输出:

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
+-------+-------+

这是我想要的输出:

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
|   null|    zzz|
+-------+-------+

【问题讨论】:

    标签: sql scala apache-spark join apache-spark-sql


    【解决方案1】:

    Spark 提供了一个特殊的NULL 安全相等运算符:

    numbersDf
      .join(lettersDf, numbersDf("numbers") <=> lettersDf("numbers"))
      .drop(lettersDf("numbers"))
    
    +-------+-------+
    |numbers|letters|
    +-------+-------+
    |    123|    abc|
    |    456|    def|
    |   null|    zzz|
    |       |    hhh|
    +-------+-------+
    

    注意不要将它与 Spark 1.5 或更早版本一起使用。在 Spark 1.6 之前,它需要笛卡尔积(SPARK-11111 - 快速空安全连接)。

    Spark 2.3.0 或更高版本中,您可以在 PySpark 中使用Column.eqNullSafe

    numbers_df = sc.parallelize([
        ("123", ), ("456", ), (None, ), ("", )
    ]).toDF(["numbers"])
    
    letters_df = sc.parallelize([
        ("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")
    ]).toDF(["numbers", "letters"])
    
    numbers_df.join(letters_df, numbers_df.numbers.eqNullSafe(letters_df.numbers))
    
    +-------+-------+-------+
    |numbers|numbers|letters|
    +-------+-------+-------+
    |    456|    456|    def|
    |   null|   null|    zzz|
    |       |       |    hhh|
    |    123|    123|    abc|
    +-------+-------+-------+
    

    SparkR 中的 %&lt;=&gt;%

    numbers_df <- createDataFrame(data.frame(numbers = c("123", "456", NA, "")))
    letters_df <- createDataFrame(data.frame(
      numbers = c("123", "456", NA, ""),
      letters = c("abc", "def", "zzz", "hhh")
    ))
    
    head(join(numbers_df, letters_df, numbers_df$numbers %<=>% letters_df$numbers))
    
      numbers numbers letters
    1     456     456     def
    2    <NA>    <NA>     zzz
    3                     hhh
    4     123     123     abc
    

    使用 SQL (Spark 2.2.0+),您可以使用IS NOT DISTINCT FROM

    SELECT * FROM numbers JOIN letters 
    ON numbers.numbers IS NOT DISTINCT FROM letters.numbers
    

    这也可以与DataFrame API 一起使用:

    numbersDf.alias("numbers")
      .join(lettersDf.alias("letters"))
      .where("numbers.numbers IS NOT DISTINCT FROM letters.numbers")
    

    【讨论】:

    • 谢谢。 This is another good answer 使用 &lt;=&gt; 运算符。如果您正在执行多列连接,则可以使用 &amp;&amp; 运算符链接条件。
    • 根据我的经验(Amazon Glue 上的 Spark 2.2.1),SQL 语法与 Scala 相同:SELECT * FROM numbers JOIN letters ON numbers.numbers letters.numbers
    • 如果我将列列表传递给joinon 参数,有没有办法使用eqNullSafe?
    • @zero323 我有一个类似的问题,但我想用 Seq 来做。你能帮忙链接是-stackoverflow.com/questions/61128618/…
    【解决方案2】:
    val numbers2 = numbersDf.withColumnRenamed("numbers","num1") //rename columns so that we can disambiguate them in the join
    val letters2 = lettersDf.withColumnRenamed("numbers","num2")
    val joinedDf = numbers2.join(letters2, $"num1" === $"num2" || ($"num1".isNull &&  $"num2".isNull) ,"outer")
    joinedDf.select("num1","letters").withColumnRenamed("num1","numbers").show  //rename the columns back to the original names
    

    【讨论】:

      【解决方案3】:

      根据KL的思路,你可以使用foldLeft来生成连接列表达式:

      def nullSafeJoin(rightDF: DataFrame, columns: Seq[String], joinType: String)(leftDF: DataFrame): DataFrame = 
      {
      
        val colExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)
        val fullExpr = columns.tail.foldLeft(colExpr) { 
          (colExpr, p) => colExpr && leftDF(p) <=> rightDF(p) 
        }
      
        leftDF.join(rightDF, fullExpr, joinType)
      }
      

      那么,你可以像这样调用这个函数:

      aDF.transform(nullSafejoin(bDF, columns, joinType))
      

      【讨论】:

        【解决方案4】:

        补充其他答案,对于 PYSPARK ,您不会有 Column.eqNullSafe 也没有 IS NOT DISTINCT FROM。 p>

        您仍然可以使用 sql 表达式构建 运算符以将其包含在连接中,只要您为连接查询定义别名:

        from pyspark.sql.types import StringType
        import pyspark.sql.functions as F
        
        numbers_df = spark.createDataFrame (["123","456",None,""], StringType()).toDF("numbers")
        letters_df = spark.createDataFrame ([("123", "abc"),("456", "def"),(None, "zzz"),("", "hhh") ]).\
            toDF("numbers", "letters")
        
        joined_df = numbers_df.alias("numbers").join(letters_df.alias("letters"),
                                                     F.expr('numbers.numbers <=> letters.numbers')).\
            select('letters.*')
        joined_df.show()
        
        
        +-------+-------+
        |numbers|letters|
        +-------+-------+
        |    456|    def|
        |   null|    zzz|
        |       |    hhh|
        |    123|    abc|
        +-------+-------+
        

        【讨论】:

          【解决方案5】:

          基于 timothyzhang 的想法,可以通过删除重复列来进一步改进它:

          def dropDuplicateColumns(df: DataFrame, rightDf: DataFrame, cols: Seq[String]): DataFrame 
          = cols.foldLeft(df)((df, c) => df.drop(rightDf(c)))
          
          def joinTablesWithSafeNulls(rightDF: DataFrame, leftDF: DataFrame, columns: Seq[String], joinType: String): DataFrame = 
          {
          
          val colExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)
          
          val fullExpr = columns.tail.foldLeft(colExpr) {
            (colExpr, p) => colExpr && leftDF(p) <=> rightDF(p)
          }
          
          val finalDF = leftDF.join(rightDF, fullExpr, joinType)
          
          val filteredDF = dropDuplicateColumns(finalDF, rightDF, columns)
          
          filteredDF
          
          }
          

          【讨论】:

            【解决方案6】:

            尝试以下方法将空行包含到 JOIN 运算符的结果中:

            def nullSafeJoin(leftDF: DataFrame, rightDF: DataFrame, columns: Seq[String], joinType: String): DataFrame = {
            
                var columnsExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)
            
                columns.drop(1).foreach(column => {
                    columnsExpr = columnsExpr && (leftDF(column) <=> rightDF(column))
                })
            
                var joinedDF: DataFrame = leftDF.join(rightDF, columnsExpr, joinType)
            
                columns.foreach(column => {
                    joinedDF = joinedDF.drop(leftDF(column))
                })
            
                joinedDF
            }
            

            【讨论】:

            • 这个方法有一个问题,它会在最后丢弃leftDF列,这对于右连接是错误的。我建议使用 TODO 进行编辑,我认为它会按原样工作(我现在正在使用它)。但以防万一其他人复制它,他也应该验证这一点。
            • 编辑被拒绝...天知道为什么,下面的“代码”应该在最后一个 foreach 上修复它: columns.foreach(column => { if (joinType.contains("right" )) { 加入DF = 加入DF.drop(leftDF(column)) } 其他{ 加入DF = 加入DF.drop(rightDF(column)) } })
            • 非常正确——或者你可以调用并颠倒顺序......所以左右切换。
            猜你喜欢
            • 2019-05-29
            • 2021-11-14
            • 1970-01-01
            • 2021-12-27
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2020-07-12
            • 1970-01-01
            相关资源
            最近更新 更多