【问题标题】:Dataframe self join condition check数据框自连接条件检查
【发布时间】:2020-06-15 14:31:05
【问题描述】:
df1 = spark.createDataFrame([(1,[4,2]),(4,[3,2])], [ "col2","col4"])

     +----+------+
     |col2|  col4|
     +----+------+
     |  1 |[4, 2]|
     |   4|[3, 2]|
     +----+------+



   df = spark.createDataFrame([("a",1,10), ("a",2,20), ("a",3,30), 
   ("b",4,40),("b",5,40),("b",1,40)], ["col1", "col2", "col3"])

   +----+----+----+
   |col1|col2|col3|
   +----+----+----+
   |   a|   1|  10|
   |   a|   2|  20|
   |   a|   3|  30|
   |   b|   4|  40|
   |   b|   5|  40|
   |   b|   1|  40|
    +----+----+----+

根据 col2 加入 df 和 df1,如果匹配,则检查 col4 isin col2 group by col1。 我期待输出,有人可以告诉我如何自行加入 pyspark(检查 col4 isin col2 group by col1)。

预期输出


  col1   col2   col3

   a      1     10

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql


    【解决方案1】:
          val df1 = Seq((1,List(4,2)),(4,List(3,2))).toDF("col2","col4")
      val df = Seq(("a",1,10), ("a",2,20), ("a",3,30),
        ("b",4,40),("b",5,40),("b",1,40)).toDF("col1", "col2", "col3")
    
    
      val res1DF = df1.join(df, df1.col("col2") === df.col("col2"), "inner")
        .select(
          df.col("col1"),
          df.col("col2"),
          df.col("col3")
        )
    
      res1DF.show(false)
      //  +----+----+----+
      //  |col1|col2|col3|
      //  +----+----+----+
      //  |a   |1   |10  |
      //  |b   |4   |40  |
      //  |b   |1   |40  |
      //  +----+----+----+
    
      val df11 = df1.withColumn("col41", explode(col("col4")))
    
      val res2DF = res1DF.join(df11, df11.col("col41") === res1DF.col("col2"), "inner")
        .select(
          res1DF.col("col1"),
          res1DF.col("col2"),
          res1DF.col("col3")
        )
      res2DF.show(false)
      //  +----+----+----+
      //  |col1|col2|col3|
      //  +----+----+----+
      //  |b   |4   |40  |
      //  +----+----+----+
    

    【讨论】:

      【解决方案2】:

      这里需要使用array_contains,它根据匹配条件返回True或False

      from pyspark.sql import functions as F
      
      df = df.join(df1, "col2", "left")
      
      df = df.withColumn("is_available", (F.expr('array_contains(col4, col2)')))
      df = df.filter(F.col("is_available") == True) # In case you need only matched cases
      df.show()
      +----+----+----+---------+------------+
      |col2|col1|col3|     col4|is_available|
      +----+----+----+---------+------------+
      |   1|   a|  10|[4, 2, 1]|        true|
      |   1|   b|  40|[4, 2, 1]|        true|
      +----+----+----+---------+------------+
      

      ------对您问题的观察---------

      根据给定的数据 - 一旦我们执行连接,它就不会给出预期的结果 -

      from pyspark.sql import functions as F
      
      df = df.join(df1, "col2", "left")
      df.show()
      +----+----+----+------+
      |col2|col1|col3|  col4|
      +----+----+----+------+
      |   5|   b|  40|  null|
      |   1|   a|  10|[4, 2]|
      |   1|   b|  40|[4, 2]|
      |   3|   a|  30|  null|
      |   2|   a|  20|  null|
      |   4|   b|  40|[3, 2]|
      +----+----+----+------+
      

      现在,如果您查看 col2 和 col4 中的值,您会发现 post join 中没有可用的 1 在 [4, 2] 中。因此,在创建时我已经额外通过了 1 个

      df1 = spark.createDataFrame([(1,[4,2, **1**]),(4,[3,2])], [ "col2","col4"])
      

      【讨论】:

      • rose1110 - 如果可以提供解决方案,请您批准答案。提前谢谢您。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-01
      • 1970-01-01
      • 2021-08-17
      • 1970-01-01
      相关资源
      最近更新 更多