【问题标题】:Pyspark: How to filter on list of two column value pairs?Pyspark:如何过滤两列值对的列表?
【发布时间】:2021-02-23 21:59:53
【问题描述】:

所以我有一个 PySpark 数据框,我想用两列的有效 的(长)列表进行过滤。

假设我们的数据框的名称是 df 和列 col1col2

col1   col2
1      A
2      B
3      1
null   2
A      null
2      null
1      null
B      C

我的有效配对列表为:flist=[(1,A), (null,2), (1,null)]

当我尝试使用 .isin() 函数(如下所示)时,它告诉我 .isin() 不适用于元组。

df.filter((df["col1"],df["col2"]).isin(flist))

通过连接两个字符串或为每对写下一个布尔表达式来解决此问题,但我有一长串有效对(很难变成布尔值)并且连接也不可靠,因为空值.使用 Python (df['col1'],df['col2']) in flist 也不起作用。

有没有 Pythonic/PySparkic 方式来做到这一点?

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql filtering


    【解决方案1】:

    基于@blackbishop 从过滤条件创建数据框并加入的方法,您可以使用Column.eqNullSafe 方法安全地比较空值:

    df = spark.createDataFrame(
        [('1', 'A', 1),
         ('2', 'B', 2),
         ('3', '1', 3),
         (None, '2', 4),
         ('A', None, 5),
         ('2', None, 6),
         ('1', None, 7),
         ('B', 'C', 8)], schema=['col1', 'col2', 'col3'])
    
    flist = [("1", "A"), (None, "2"), ("1", None)]
    filter_df = spark.createDataFrame(flist, ["col1", "col2"])
    
    (df.join(filter_df,
             df["col1"].eqNullSafe(filter_df["col1"]) &
             df["col2"].eqNullSafe(filter_df['col2']))
     .select(df['col1'], df['col2'], df['col3'])
     .show())
    

    给予:

    +----+----+----+
    |col1|col2|col3|
    +----+----+----+
    |   1|null|   7|
    |null|   2|   4|
    |   1|   A|   1|
    +----+----+----+
    

    请注意,连接仅充当过滤器,提供您的“过滤器”数据框包含唯一行。您可以在加入之前在该 Dataframe 上添加 distinct 以确保(例如,如果您的过滤条件很大)。

    【讨论】:

      【解决方案2】:

      您可以使用列表创建filder_df 并进行连接:

      flist = [("1", "A"), (None, "2"), ("1", None)]
      filter_df = spark.createDataFrame(flist, ["col1", "col2"])
      
      df1 = df.join(filter_df, ["col1", "col2"])
      
      df1.show()
      #+----+----+
      #|col1|col2|
      #+----+----+
      #|   1|   A|
      #+----+----+
      

      请注意,您不能比较空值。所以这里只返回元组("1", "A") 的行。要检查空值,您需要在列上使用isNull()

      df1 = df.alias("df").join(
          filter_df.alias("fdf"),
          ((F.col("df.col1") == F.col("fdf.col1")) |
           (col("df.col1").isNull() & F.col("fdf.col1").isNull())
           ) &
          ((F.col("df.col2") == F.col("fdf.col2")) |
           (col("df.col2").isNull() & F.col("fdf.col2").isNull())
           )
      ).select("df.*")
      
      df1.show()
      
      #+----+----+
      #|col1|col2|
      #+----+----+
      #|   1|   A|
      #|null|   2|
      #|   1|null|
      #+----+----+
      

      或者按照@Chris 的回答中的建议更好地使用eqNullSafe

      【讨论】:

        【解决方案3】:

        这是一种无需加入的方法,您可以在过滤器中链接一堆条件,以便将每一行与flist 中的值进行比较。它可以处理空值。

        from functools import reduce
        import pyspark.sql.functions as F
        
        flist = [(1, 'A'), (None, 2), (1, None)] 
        
        df2 = df.filter(
            reduce(
                lambda x, y: x | y, 
                [ 
                    ((F.col('col1') == col1) if col1 is not None else F.col('col1').isNull()) & 
                    ((F.col('col2') == col2) if col2 is not None else F.col('col2').isNull())
                    for (col1, col2) in flist
                ]
            )
        )
        
        df2.show()
        +----+----+
        |col1|col2|
        +----+----+
        |   1|   A|
        |null|   2|
        |   1|null|
        +----+----+
        

        【讨论】:

          猜你喜欢
          • 2018-03-24
          • 1970-01-01
          • 1970-01-01
          • 2020-07-08
          • 1970-01-01
          • 2017-04-25
          • 1970-01-01
          • 2022-11-02
          • 1970-01-01
          相关资源
          最近更新 更多