【问题标题】:Joining tables with duplicate column names in spark在火花中加入具有重复列名的表
【发布时间】:2017-04-03 23:13:50
【问题描述】:

我正在尝试在 Spark 上加入多个 MySQL 表。其中一些表具有重复的列名(每个表都有一个特定于该表的 id 字段)。

如果我尝试运行:

val myDF = session.read.jdbc("t1 inner join t2 on t1.t2_id = t2.id, queryTable, prop)
myDF.show

我得到java.sql.SQLIntegrityConstraintViolationException: Column 'id' in field list is ambiguous,因为两个表都有一个 id 字段(具有不同的含义)

我试过了:

val t1DF = spark.read.jdbc(dbstring, "t1", "id").alias("a")
val t2DF = spark.read.jdbc(dbstring, "t2", "id").alias("b")
val joinedDF = t1DF.join(t2DF, Seq("a.t2_id", "b.id"))
  .selectExpr("ent.id as entity_id", "lnk.pagerank")

我收到了错误org.apache.spark.sql.AnalysisException: using columns ['t1.t2_id,'t2.id] can not be resolved given input columns: [..] 看起来分析器不知道如何处理别名。

似乎唯一可行的选项是使用子查询:

spark.read.jdbc(dbstring, "(select t1.id as t1_id, t1.t2_id from 
t1 inner join t2 on t1.t2_id = t2.id) t", "t2_id")

虽然在这种情况下,子查询需要在我执行任何过滤器之前完成运行,这会使事情变得慢得令人无法接受,并且任何查询分区都无用。

Spark 似乎确实有一些内部方法可以消除 id 的 id#528id#570 之间的歧义,但我想不出在 select 语句中引用它们的任何方式。

【问题讨论】:

    标签: mysql scala apache-spark apache-spark-sql


    【解决方案1】:

    我遇到了同样的问题。我发现解决这个问题的唯一方法是在列名上添加一个后缀。它看起来像这样:

    val t1DF = spark.read.jdbc(dbstring, "t1", "id").select(col("id").alias("id_t1"))
    val t2DF = spark.read.jdbc(dbstring, "t2", "id").select(col("id").alias("id_t2"))
    
    val joinedDF = t1DF.join(t2DF, t1DF("id_t1") === t2DF("id_t2"))
    

    【讨论】:

      猜你喜欢
      • 2019-03-03
      • 1970-01-01
      • 1970-01-01
      • 2019-01-02
      • 2017-11-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多