【问题标题】:Pyspark DataFrame - How to use variables to make join?Pyspark DataFrame - 如何使用变量进行连接?
【发布时间】:2017-01-27 18:27:30
【问题描述】:

我在 python 上使用 Spark 数据帧连接两个数据帧时遇到了一些麻烦。我有两个数据框,我必须更改列的名称以使它们对于每个数据框都是唯一的,所以稍后我可以知道哪一列是哪一列。我这样做是为了重命名列(firstDf 和 secondDf 是使用函数 createDataFrame 创建的 Spark DataFrame):

oldColumns = firstDf.schema.names
newColumns = list(map(lambda x: "{}.{}".format('firstDf', x), oldColumns))
firstDf = firstDf.toDF(*newColumns)

我对第二个 DataFrame 重复了这个。然后我尝试加入他们,使用以下代码:

from pyspark.sql.functions import *

firstColumn = 'firstDf.firstColumn'
secondColumn = 'secondDf.firstColumn'
joinedDF = firstDf.join(secondDf, col(firstColumn) == col(secondColumn), 'inner')

这样使用我得到以下错误:

AnalysisException "cannot resolve 'firstDf.firstColumn' given input columns: [firstDf.firstColumn, ...];"

这只是为了说明该列存在于输入列数组中。

如果我不重命名 DataFrames 列,我可以使用这段代码加入它们:

joinedDf = firstDf.join(secondDf, firstDf.firstColumn == secondDf.firstColumn, 'inner')

但这给了我一个列名不明确的 DataFrame。

关于如何解决这个问题的任何想法?

【问题讨论】:

    标签: python apache-spark dataframe pyspark apache-spark-sql


    【解决方案1】:

    一般来说,不要在名称中使用点。这些具有特殊含义(可用于确定表或访问struct 字段)并且需要一些额外的工作才能正确识别。

    对于 equi 连接,您只需要一个列名:

    from pyspark.sql.functions import col
    
    firstDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn"))
    secondDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn"))
    
    column = 'firstColumn'
    firstDf.join(secondDf, [column], 'inner')
    
    ## DataFrame[firstColumn: bigint, secondColumn: string, secondColumn: string]
    

    对于复杂的情况,使用表别名:

    firstColumn = 'firstDf.firstColumn'
    secondColumn = 'secondDf.firstColumn'
    
    firstDf.alias("firstDf").join(
        secondDf.alias("secondDf"),
        # After alias prefix resolves to table name
        col(firstColumn) == col(secondColumn),
       "inner"
    )
    
    ## DataFrame[firstColumn: bigint, secondColumn: string, firstColumn: bigint, secondColumn: string]
    

    您也可以直接使用父框架:

    column = 'firstColumn'
    
    firstDf.join(secondDf, firstDf[column] == secondDf[column])
    

    【讨论】:

    • 感谢您的回复,特别是关于不要在名称中使用点的提示。第一种方法有效,但我需要加入的 DataFrame 为两个加入的 DataFrame 的每一列具有唯一的列名。不过,按照建议使用表别名会给我在问题中显示的相同 AnalysisException 错误。
    • 它应该可以正常工作。我为一个完全可重现的示例添加了表定义。
    • 对不起,伙计,我刚刚意识到,改变点就可以了。再次感谢您的回复!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-06-27
    • 1970-01-01
    • 1970-01-01
    • 2020-11-09
    • 1970-01-01
    • 2020-03-31
    • 2019-09-21
    相关资源
    最近更新 更多