【问题标题】:Update Spark DataFrame based on values of another Spark Dataframe根据另一个 Spark Dataframe 的值更新 Spark DataFrame
【发布时间】:2019-07-12 13:45:12
【问题描述】:

我有两个数据框,df1df2 如下图所示:

df1.show()
+---+--------+-----+----+--------+
|c1 |   c2   |  c3 | c4 |   c5   |
+---+--------+-----+----+--------+
|  A|   abc  | 0.1 | 0.0|   0    |
|  B|   def  | 0.15| 0.5|   0    |
|  C|   ghi  | 0.2 | 0.2|   1    |
|  D|   jkl  | 1.1 | 0.1|   0    |
|  E|   mno  | 0.1 | 0.1|   0    |
+---+--------+-----+----+--------+


df2.show()
+---+--------+-----+----+--------+
|c1 |   c2   |  c3 | c4 |   c5   |
+---+--------+-----+----+--------+
|  A|   abc  | a   | b  |   ?    |
|  C|   ghi  | a   | c  |   ?    |
+---+--------+-----+----+--------+

如果df2 中引用了该行,我想更新df1 中的c5 列并将其设置为1。每条记录由c1c2 列标识。

以下是所需的输出;注意第一条记录的c5值更新为1

+---+--------+-----+----+--------+
|c1 |   c2   |  c3 | c4 |   c5   |
+---+--------+-----+----+--------+
|  A|   abc  | 0.1 | 0.0|   1    |
|  B|   def  | 0.15| 0.5|   0    |
|  C|   ghi  | 0.2 | 0.2|   1    |
|  D|   jkl  | 1.1 | 0.1|   0    |
|  E|   mno  | 0.1 | 0.1|   0    |
+---+--------+-----+----+--------+

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    将 df2 左连接到 df1 并为 c5 使用 case when .. 表达式。

    from pyspark.sql.functions import when,*
    joined_dfs = df1.join(df2,(df1.c1 == df2.c1) & (df1.c2 == df2.c2),'left').select('df1.*')
    joined_dfs.select(joined_dfs.c1,joined_dfs.c2,joined_dfs.c3,joined_dfs.c4) \
              .withColumn('c5',when((joined_dfs.c1.isNotNull()) & (joined_dfs.c2.isNotNull()),1).otherwise(0)) \ 
              .show()
    

    【讨论】:

    • 感谢您的回复。我收到一个错误:AnalysisException: "cannot resolve 'df1.*' given input columns '';"
    • 我不太明白你回答的第二部分。而且它似乎不起作用。
    猜你喜欢
    • 2018-03-16
    • 2018-01-26
    • 2017-02-14
    • 1970-01-01
    • 2016-05-25
    • 2021-09-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多