【Question Title】: pyspark left outer join with multiple columns
【Posted】: 2017-09-24 08:34:15
【Question】:

I am using PySpark 2.1.0.

I am trying to perform a left outer join of two dataframes. The two dataframes have the following schemas:

crimes
 |-- CRIME_ID: string (nullable = true)
 |-- YEAR_MTH: string (nullable = true)
 |-- CRIME_TYPE: string (nullable = true)
 |-- CURRENT_OUTCOME: string (nullable = true)

outcomes
 |-- CRIME_ID: string (nullable = true)
 |-- YEAR_MTH: string (nullable = true)
 |-- FINAL_OUTCOME: string (nullable = true)

I need to link crimes to outcomes with a left outer join, since many outcomes exist for a single crime. I also want to exclude the columns common to both dataframes from the result.

I have tried the following two approaches, but each produces various errors:

cr_outs = crimes.join(outcomes, crimes.CRIME_ID == outcomes.CRIME_ID, 'left_outer')\
 .select(['crimes.'+c for c in crimes.columns] + ['outcomes.FINAL_OUTCOME'])

 from pyspark.sql.functions as fn    
 cr_outs = crimes.alias('a').join(outcomes.alias('b'), fn.col('b.CRIME_ID') = fn.col('a.CRIME_ID') ,'left_outer')\
  .select([fn.col('a.'+ c) for c in a.columns] + b.FINAL_OUTCOME)

Can someone suggest an alternative approach? Thanks.

【Question Comments】:

  • Don't you also want to include the YEAR_MTH column in the join? (join ...)
  • The one-to-many relationship is based on CRIME_ID only.

Tags: join pyspark spark-dataframe


【Solution 1】:

You can temporarily rename the common columns to remove the ambiguity:

crimes = crimes\
    .withColumnRenamed('CRIME_ID', 'CRIME_ID_1')\
    .withColumnRenamed('YEAR_MTH', 'YEAR_MTH_1')


required_columns = [c for c in crimes.columns] + ['FINAL_OUTCOME']

cr_outs = crimes\
    .join(outcomes, crimes.CRIME_ID_1 == outcomes.CRIME_ID, 'left_outer')\
    .select(required_columns)

【Discussion】:

    【Solution 2】:

    This did the trick. It seems you do have to use an alias, similar to what was posted before, although it is slightly simpler in PySpark 2.1.0.

    from pyspark.sql.functions import col

    cr_outs = crimes.alias('a')\
      .join(outcomes, crimes.CRIME_ID == outcomes.CRIME_ID, 'left_outer')\
      .select(*[col('a.' + c) for c in crimes.columns]
              + [outcomes.FINAL_OUTCOME])

    cr_outs.show()
    cr_outs.printSchema()
    
    +--------------------+--------+--------------------+--------------------+---------+---------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+
    |            CRIME_ID|YEAR_MTH|         REPORTED_BY|        FALLS_WITHIN|LONGITUDE| LATITUDE|            LOCATION|LSOA_CODE|          LSOA_NAME|          CRIME_TYPE|     CURRENT_OUTCOME|       FINAL_OUTCOME|
    +--------------------+--------+--------------------+--------------------+---------+---------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+
    |426085c2ed33af598...| 2017-01|City of London Po...|City of London Po...|-0.086051| 51.51357|On or near Finch ...|E01032739|City of London 001F|         Other theft|Investigation com...|Investigation com...|
    |33a3ddb8160a854a4...| 2017-01|City of London Po...|City of London Po...|-0.077777|51.518047|On or near Sandy'...|E01032...
    ...
    root
     |-- CRIME_ID: string (nullable = true)
     |-- YEAR_MTH: string (nullable = true)
     |-- REPORTED_BY: string (nullable = true)
     |-- FALLS_WITHIN: string (nullable = true)
     |-- LONGITUDE: float (nullable = true)
     |-- LATITUDE: float (nullable = true)
     |-- LOCATION: string (nullable = true)
     |-- LSOA_CODE: string (nullable = true)
     |-- LSOA_NAME: string (nullable = true)
     |-- CRIME_TYPE: string (nullable = true)
     |-- CURRENT_OUTCOME: string (nullable = true)
     |-- FINAL_OUTCOME: string (nullable = true)
    

    As you can see, there are many more columns than in my original post, but there are no duplicated columns and no renaming was needed :-)
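    The same result can also be had without aliases by dropping the right-hand copies of the shared columns after the join; `DataFrame.drop` accepts a Column reference, which resolves the name ambiguity. A sketch with toy data (the SparkSession setup and sample rows are illustrative only):

    ```python
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    crimes = spark.createDataFrame(
        [("id1", "2017-01", "Other theft", "Under investigation")],
        ["CRIME_ID", "YEAR_MTH", "CRIME_TYPE", "CURRENT_OUTCOME"])
    outcomes = spark.createDataFrame(
        [("id1", "2017-01", "Suspect charged")],
        ["CRIME_ID", "YEAR_MTH", "FINAL_OUTCOME"])

    # Drop the outcomes-side copies of the shared columns by Column reference.
    cr_outs = (crimes
               .join(outcomes, crimes.CRIME_ID == outcomes.CRIME_ID, "left_outer")
               .drop(outcomes.CRIME_ID)
               .drop(outcomes.YEAR_MTH))
    ```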

    【Discussion】:

      【Solution 3】:

      You can use the following function to remove the duplicated columns after a join.

      def dropDupeDfCols(df):
          newcols = []
          dupcols = []

          for i in range(len(df.columns)):
              if df.columns[i] not in newcols:
                  newcols.append(df.columns[i])
              else:
                  dupcols.append(i)

          df = df.toDF(*[str(i) for i in range(len(df.columns))])
          for dupcol in dupcols:
              df = df.drop(str(dupcol))

          return df.toDF(*newcols)
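      The bookkeeping this helper relies on, keeping the first occurrence of each name and recording the positions of later duplicates, can be exercised on plain lists of column names (the function name `dedupe_positions` and the sample columns are just for illustration):

      ```python
      def dedupe_positions(cols):
          """Return (unique names in order, indices of duplicate columns)."""
          newcols, dupcols = [], []
          for i, name in enumerate(cols):
              if name not in newcols:
                  newcols.append(name)
              else:
                  dupcols.append(i)
          return newcols, dupcols

      # Columns as they would appear after a naive join of crimes and outcomes:
      joined = ['CRIME_ID', 'YEAR_MTH', 'CRIME_TYPE', 'CURRENT_OUTCOME',
                'CRIME_ID', 'YEAR_MTH', 'FINAL_OUTCOME']
      names, dups = dedupe_positions(joined)
      # The duplicates sit at positions 4 and 5, which the helper then drops.
      ```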
      

      【Discussion】:
