【问题标题】:Dataframe join not working in spark 2.4.5数据框连接在 Spark 2.4.5 中不起作用
【发布时间】:2020-06-09 15:16:02
【问题描述】:

我正在尝试在 pyspark 中加入两个数据框,如下所示:

df1 : 
+----------+----------+--------------------+-----+
|FIRST_NAME| LAST_NAME|        COMPANY_NAME|CCODE|
+----------+----------+--------------------+-----+
|  Rebbecca|     Didio|Brandt, Jonathan ...|   AU|
|    Stevie|     Hallo|Landrum Temporary...|   US|
|    Mariko|    Stayer| Inabinet, Macre Esq|   BR|
|   Gerardo|    Woodka|Morris Downing & ...|   US|
|     Mayra|      Bena|  Buelt, David L Esq|   CN|
|    Idella|  Scotland|Artesian Ice & Co...|   UK|
|   Sherill|      Klar|        Midway Hotel|   CA|
+----------+----------+--------------------+-----+

DF2:
+--------------------+-----------+
|             COUNTRY|COUNTRYCODE|
+--------------------+-----------+
|      United Kingdom|         UK|
|       United States|         US|
|United Arab Emirates|         AE|
|              Canada|         CA|
|              Brazil|         BR|
|               India|         IN|
+--------------------+-----------+

我正在尝试在 df1.CCODE == df2.COUNTRYCODE 上加入两个数据框,但它不起作用:

df1 = df1.alias('df1')
df2 = df2.alias('df2')
tgt_tbl_col='COUNTRYCODE'
src_tbl_col='CCODE'
join_type = 'INNER'
merge_df = df1.join(df2, df2.tgt_tbl_col == df1.src_tbl_col, how=join_type)

错误:

AttributeError: 'DataFrame' object has no attribute 'tgt_tbl_col'
/databricks/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
   1332         if name not in self.columns:
   1333             raise AttributeError(
-> 1334                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
   1335         jc = self._jdf.apply(name)
   1336         return Column(jc)

但是,当我使两个列名相同并运行以下命令时,同样的工作:

merge_df = df1.join(df2, on=[tgt_tbl_col], how=join_type)

需要这方面的建议。

版本:Apache Spark 2.4.5、Scala 2.11、python 3.8

【问题讨论】:

    标签: python pyspark apache-spark-sql


    【解决方案1】:

    请注意 tgt_tbl_col 不是 df2 中的列名,因此会引发错误。

    因此您可以执行以下操作:

    from pyspark.sql.functions import col
    merge_df = df1.join(df2, col(tgt_tbl_col) == col(src_tbl_col), how=join_type)
    

    也可以直接这样写:

    merge_df = df1.join(df2, df1.CCODE == df2.COUNTRYCODE , "inner")
    

    注意:如果你没有在上面的语句中指定“inner”。没关系,spark 默认会考虑内部连接。

    在您的进一步问题中,您在写作时所说的工作如下: merge_df = df1.join(df2, on=[tgt_tbl_col], how=join_type)

    在您的情况下,您将 tgt_tbl_col(i.e.'COUNTRYCODE') 作为字符串传递,仅当您将 df1 表中的 CCODE 列重命名为 COUNTRYCODE 时它才会起作用,否则它将引发错误。

    注意:如果两个表具有相同的要加入的列名,则可以直接将列名作为字符串传递,而不是同时提及两个表的列名作为条件。

    【讨论】:

      【解决方案2】:

      您的代码失败,因为它试图将变量名作为 df1 的属性而不是“CCODE”,因此 df1.tgt_tbl_col 给出错误。试试下面的代码,它使用 col 有效的函数。如果您可以将 df1 列 CCODE 重命名为 COUNTRYCODE ,那么 @Sampath 的答案就可以了

      注意:确保在给出条件时 col1 和 col2 应该按加入顺序,即 df1.join(df2 .... 所以 col1 必须是 df1 列,col2 必须是 df2 列

      from pyspark.sql import functions as sf 
      
      col1="CCODE"
      col2="COUNTRYCODE"
      
      condition = sf.col(col1) == sf.col(col2)
      join_type = "inner"
      
      df1.join(df2, condition, how=join_type).show()
      
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-05-14
        • 1970-01-01
        • 1970-01-01
        • 2019-06-02
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-09-29
        相关资源
        最近更新 更多