【问题标题】:PySpark: Compare columns of one df with the rows of a second dfPySpark:将一个df的列与第二个df的行进行比较
【发布时间】:2019-11-27 03:32:09
【问题描述】:

我想比较两个 PySpark 数据帧。

我有具有数百列(Col1、Col2、...、Col800)的 Df1 和具有数百个相应行的 Df2。

Df2 描述了 Df1 中 800 列中每一列的限制值,如果值太低或太高,那么我想在 Final_Df 中实现结果,我在其中创建一个列 Problem 来检查是否任何列都超出限制。

我想过用 pivot 转置 Df2,但它需要一个聚合函数,所以我不确定它是否是一个相关的解决方案。

我也不知道如何加入两个 Df 进行比较,因为它们不共享任何公共列。

Df1:

| X         | Y         | Col1 | Col2 | Col3 |
+-----------+-----------+------+------+------+
| Value_X_1 | Value_Y_1 | 5000 | 250  | 500  |
+-----------+-----------+------+------+------+
| Value_X_2 | Value_Y_2 | 1000 | 30   | 300  |
+-----------+-----------+------+------+------+
| Value_X_3 | Value_Y_3 | 0    | 100  | 100  |
+-----------+-----------+------+------+------+

Df2:

+------+------+-----+
| name | max  | min |
+------+------+-----+
| Col1 | 2500 | 0   |
+------+------+-----+
| Col2 | 120  | 0   |
+------+------+-----+
| Col3 | 400  | 0   |
+------+------+-----+

Final_Df(比较后):

+-----------+-----------+------+------+------+---------+
| X         | Y         | Col1 | Col2 | Col3 | Problem |
+-----------+-----------+------+------+------+---------+
| Value_X_1 | Value_Y_1 | 5000 | 250  | 500  | Yes     |
+-----------+-----------+------+------+------+---------+
| Value_X_2 | Value_Y_2 | 1000 | 30   | 300  | No      |
+-----------+-----------+------+------+------+---------+
| Value_X_3 | Value_Y_3 | 0    | 100  | 100  | No      |
+-----------+-----------+------+------+------+---------+

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    如果df2不是大数据框,可以将其转为字典,然后使用列表推导和when函数查看状态,例如:

    from pyspark.sql import functions as F
    
    >>> df1.show()
    +---------+---------+----+----+----+
    |        X|        Y|Col1|Col2|Col3|
    +---------+---------+----+----+----+
    |Value_X_1|Value_Y_1|5000| 250| 500|
    |Value_X_2|Value_Y_2|1000|  30| 300|
    |Value_X_3|Value_Y_3|   0| 100| 100|
    +---------+---------+----+----+----+
    
    >>> df2.show()
    +----+----+---+
    |name| max|min|
    +----+----+---+
    |Col1|2500|  0|
    |Col2| 120|  0|
    |Col3| 400|  0|
    +----+----+---+
    
    # concerned columns
    cols = df1.columns[2:]
    >>> cols
    ['Col1', 'Col2', 'Col3']
    

    注意:我假设 df1 和 df2.min、df2.max 中上述列的数据类型已经设置为整数。

    从 df2 创建地图:

    map1 = { r.name:[r.min, r.max] for r in df2.collect() }
    
    >>> map1
    {u'Col1': [0, 2500], u'Col2': [0, 120], u'Col3': [0, 400]}
    

    基于两个 when() 函数添加新字段“问题”,使用列表推导遍历所有相关列

    • F.when(df1[c].between(min, max), 0).otherwise(1))
    • F.when(sum(...) > 0, 'Yes').otherwise('No')

    我们使用第一个when() 函数为每个相关列设置一个标志(0 或 1),然后对该标志求和。如果大于 0,则 Problem = 'Yes',否则为 'No':

    df_new = df1.withColumn('Problem', F.when(sum([ F.when(df1[c].between(map1[c][0], map1[c][1]), 0).otherwise(1) for c in cols ]) > 0, 'Yes').otherwise('No'))
    
    >>> df_new.show()
    +---------+---------+----+----+----+-------+
    |        X|        Y|Col1|Col2|Col3|Problem|
    +---------+---------+----+----+----+-------+
    |Value_X_1|Value_Y_1|5000| 250| 500|    Yes|
    |Value_X_2|Value_Y_2|1000|  30| 300|     No|
    |Value_X_3|Value_Y_3|   0| 100| 100|     No|
    +---------+---------+----+----+----+-------+
    

    【讨论】:

      【解决方案2】:

      使用 UDF 和字典我能够解决它。让我知道它是否有帮助。

      #  Create a map like, name -> max#min
      df = df.withColumn('name_max_min',F.create_map('name',F.concat( col('max'), lit("#"), col('min')) ))
      
      # HANDLE THE null 
      # you can try this ,not sure about this , but python has math.inf which 
      # supplies both infinities
      positiveInf = float("inf")
      negativeInf = float("-inf")
      
      df = df.fillna({ 'max':999999999, 'min':-999999999  })
      
      ### df is :
      +----+----+---+-------------------+
      |name| max|min|       name_max_min|
      +----+----+---+-------------------+
      |Col1|2500|  0|Map(Col1 -> 2500#0)|
      |Col2| 120|  0| Map(Col2 -> 120#0)|
      |Col3| 400|  0| Map(Col3 -> 400#0)|
      +----+----+---+-------------------+
      
      #  Create a dictionary out of it
      v = df.select('name_max_min').rdd.flatMap(lambda x: x).collect()
      
      keys = []
      values = []
      for p in v:
          for r, s in p.items():
              keys.append(str(r).strip())  
              values.append(str(s).strip().split('#'))
      
      max_dict = dict(zip(keys,values))
      #  max_dict = {'Col1': ['2500', '0'], 'Col2': ['120', '0'], 'Col3': ['400', '0']}
      
      #  Create a UDF which can help you to assess the conditions.
      def problem_udf(c1):
              #  GENERAL WAY 
              #  if the column names are diff
              #p =all([int(max_dict.get(r)[1]) <= int(c1[r]) <= int(max_dict.get(r)[0]) for r in c1.__fields__])
      
              p = all([ int(max_dict.get("Col" + str(r))[1]) <= int(c1["Col" + str(r)]) <= int(max_dict.get("Col" + str(r))[0])  for r in range(1, len(c1) + 1)])
              if p :
                  return("No")
              else:
                  return("Yes")
      
      
      callnewColsUdf= F.udf(problem_udf, StringType())
      
      
      col_names = ['Col'+str(i) for i in range(1,4)]
      # GENERAL WAY
      # col_names = df1.schema.names
      
      df1 = df1.withColumn('Problem', callnewColsUdf(F.struct(col_names)))
      
      
      ## Results in :
      +---------+---------+----+----+----+-------+
      |        X|        Y|Col1|Col2|Col3|Problem|
      +---------+---------+----+----+----+-------+
      |Value_X_1|Value_Y_1|5000| 250| 500|    Yes|
      |Value_X_2|Value_Y_2|1000|  30| 300|     No|
      |Value_X_3|Value_X_3|   0| 100| 100|     No|
      +---------+---------+----+----+----+-------+
      

      【讨论】:

      • 有800多列。一个通用的解决方案会有所帮助。
      • 感谢@Preetham 提供了一个非常聪明的解决方案,它按预期工作,但我仍在尝试对其进行调整,因此我不必在 question_udf 中编写 800 次检查。
      • 另外,对不起,我忘了指出 Df2 中的一些最大值或最小值为空,如果发生这种情况,则 Df1 列应仅针对非空值进行评估。目前,通过使用您的解决方案,name_max_min 列中的映射不会注册非空列,例如。 Df2:Col4,null,0。 name_max_min:[Col4->]。我曾考虑将 max 列中的空值替换为 999999999 并将 min 中的空值替换为 -999999999,但是有更强大的解决方案吗?
      • 已尽我所能完成更改!让我知道它是否有用。
      猜你喜欢
      • 2021-08-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多