【Question Title】: Pyspark DataFrame column based on another DataFrame value
【Posted】: 2020-10-16 21:45:35
【Question】:

I have two DataFrames:

df1 = 
+---+----------+
| id|filter    |
+---+----------+
|  1|       YES|
|  2|        NO|
|  3|        NO|
+---+----------+

df2 = 
+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|                   1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15|
+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|XXXXXX              |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
|YYYYYY              |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+

What I want to do is create a new column in df1 that filters the fields of df2 based on the row values of df1. The expected output looks like this:

df3 =
+---+----------+----------------+
| id|filter    | value          |
+---+----------+----------------+
|  1|       YES|[XXXXXX, YYYYYY]|
|  2|        NO|              []|
|  3|        NO|              []|
+---+----------+----------------+

I know how to do this with Pandas, but not with PySpark.

I tried the following, but it doesn't seem to work:

df3 = df1.withColumn('value', f.when(df1['filter'] == 'YES', df2.select(f.col('id')).collect()).otherwise(f.lit([])))

Thanks a lot in advance.

【Comments】:

    Tags: python apache-spark pyspark


    【Solution 1】:

    Load the provided test data:

      import org.apache.spark.sql.functions._
      import spark.implicits._ // required for .toDS() on local collections

      val data1 =
          """
            | id|filter
            |  1|       YES
            |  2|        NO
            |  3|        NO
          """.stripMargin
        val stringDS1 = data1.split(System.lineSeparator())
          .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
          .toSeq.toDS()
        val df1 = spark.read
          .option("sep", ";")
          .option("inferSchema", "true")
          .option("header", "true")
          .option("nullValue", "null")
          .csv(stringDS1)
        df1.printSchema()
        df1.show(false)
        /**
          * root
          * |-- id: integer (nullable = true)
          * |-- filter: string (nullable = true)
          *
          * +---+------+
          * |id |filter|
          * +---+------+
          * |1  |YES   |
          * |2  |NO    |
          * |3  |NO    |
          * +---+------+
          */
    
        val data2 =
          """
            |                   1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15
            |XXXXXX              |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN
            |YYYYYY              |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN
          """.stripMargin
        val stringDS2 = data2.split(System.lineSeparator())
          .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
          .toSeq.toDS()
        val df2 = spark.read
          .option("sep", ";")
          .option("inferSchema", "true")
          .option("header", "true")
          .option("nullValue", "null")
          .csv(stringDS2)
        df2.printSchema()
        df2.show(false)
        /**
          * root
          * |-- 1: string (nullable = true)
          * |-- 2: double (nullable = true)
          * |-- 3: double (nullable = true)
          * |-- 4: double (nullable = true)
          * |-- 5: double (nullable = true)
          * |-- 6: double (nullable = true)
          * |-- 7: double (nullable = true)
          * |-- 8: double (nullable = true)
          * |-- 9: double (nullable = true)
          * |-- 10: double (nullable = true)
          * |-- 11: double (nullable = true)
          * |-- 12: double (nullable = true)
          * |-- 13: double (nullable = true)
          * |-- 14: double (nullable = true)
          * |-- 15: double (nullable = true)
          *
          * +------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
          * |1     |2  |3  |4  |5  |6  |7  |8  |9  |10 |11 |12 |13 |14 |15 |
          * +------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
          * |XXXXXX|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
          * |YYYYYY|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
          * +------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
          */
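
    For readers following along in Python, the same two test frames can be built directly with createDataFrame rather than by parsing strings. A minimal sketch; the literal values are copied from the question:

        # Sketch: build the question's test data in PySpark.
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()

        df1 = spark.createDataFrame(
            [(1, "YES"), (2, "NO"), (3, "NO")],
            ["id", "filter"],
        )

        # df2: column "1" holds the names; columns "2".."15" hold NaN doubles.
        nan = float("nan")
        df2 = spark.createDataFrame(
            [("XXXXXX",) + (nan,) * 14, ("YYYYYY",) + (nan,) * 14],
            [str(i) for i in range(1, 16)],
        )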
    

    Melt/unpivot df2, then join with df1:

    
        val stringCol = df2.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
    
        val processedDF = df2.selectExpr(s"stack(${df2.columns.length}, $stringCol) as (id, value)")
        processedDF.show(false)
    
        /**
          * +---+------+
          * |id |value |
          * +---+------+
          * |1  |XXXXXX|
          * |2  |NaN   |
          * |3  |NaN   |
          * |4  |NaN   |
          * |5  |NaN   |
          * |6  |NaN   |
          * |7  |NaN   |
          * |8  |NaN   |
          * |9  |NaN   |
          * |10 |NaN   |
          * |11 |NaN   |
          * |12 |NaN   |
          * |13 |NaN   |
          * |14 |NaN   |
          * |15 |NaN   |
          * |1  |YYYYYY|
          * |2  |NaN   |
          * |3  |NaN   |
          * |4  |NaN   |
          * |5  |NaN   |
          * +---+------+
          * only showing top 20 rows
          */
    
        df1.join(processedDF, "id")
          .groupBy("id", "filter")
          .agg(collect_list("value").as("value"))
          .selectExpr("id", "filter", "FILTER(value, x -> x != 'NaN') as value")
          .show(false)
    
        /**
          * +---+------+----------------+
          * |id |filter|value           |
          * +---+------+----------------+
          * |2  |NO    |[]              |
          * |1  |YES   |[XXXXXX, YYYYYY]|
          * |3  |NO    |[]              |
          * +---+------+----------------+
          */
    

    【Discussion】:

    • Thanks a lot! Do you know how to apply this in Python? (the melt/unpivot part)
    • Take help from this answer to convert it to Python - stackoverflow.com/a/62574110/4758823 (a sketch follows below)
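
    Following up on the comments above: the stack-and-join approach translates almost line-for-line into PySpark. A minimal sketch, assuming df1 and df2 as in the question and Spark 2.4+ (for the FILTER higher-order function):

        from pyspark.sql import functions as f

        # Unpivot df2: stack() emits one (column name, value) row per cell.
        string_cols = ", ".join(f"'{c}', cast(`{c}` as string)" for c in df2.columns)
        processed_df = df2.selectExpr(
            f"stack({len(df2.columns)}, {string_cols}) as (id, value)"
        )

        # Join on id, collect the values per id, then drop the 'NaN' placeholders.
        df3 = (
            df1.join(processed_df, "id")
            .groupBy("id", "filter")
            .agg(f.collect_list("value").alias("value"))
            .selectExpr("id", "filter", "FILTER(value, x -> x != 'NaN') as value")
        )
        df3.show(truncate=False)

    This should yield the same result as the Scala version above: [XXXXXX, YYYYYY] for id 1 and empty arrays for ids 2 and 3.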