【问题标题】:Joining multiple Spark Dataframes based on conditions根据条件加入多个 Spark Dataframe
【发布时间】:2017-09-30 21:18:56
【问题描述】:

基于“SC”代码,我需要将 SRCTable 与 RefTable-1 或 RefTable-2 一起加入

条件: 如果 SC 为 "D" ,则 SRCTable 在 KEY = KEY1 上与 RefTable-1 连接,以获取值。 Else IF SC is "U" , SRCTable join 与 RefTable-2 on KEY = KEY2 & FK = KEY3 ,获取值。

这是输入火花数据帧。

SRCTable:
    -------------
    KEY |SC  |FK 
    -------------
    66  |D   | a
    67  |U   | b
    70  |D   | y
    71  |U   | q
    -------------
 RefTable-1:
    --------------
    KEY1 |Value  | 
    --------------
    66   |xyz1   | 
    67   |abc1   | 
    68   |fgr1   |
    69   |yte1   |
    70   |erx1   |
    71   |ter1   |
    --------------
 RefTable-2:
    --------------------
    KEY2 |KEY3  |Value  | 
    --------------------
    66   | a    |xyz2   | 
    67   | c    |abc2   | 
    67   | b    |fgr2   |
    69   | g    |yte2   |
    70   | y    |erx2   |
    71   | q    |ter2   |
    --------------------

预期输出:

    --------------------
    KEY |SC  |FK |Value |
    -------------------- 
    66  |D   | a |xyz1  |
    67  |U   | b |fgr2  |
    70  |D   | y |erx1  |
    71  |U   | q |ter2  |
    ---------------------

注意:输入表将有数百万条记录,因此需要优化解决方案

【问题讨论】:

  • 很难说没有更多细节,有多少百万行?所有的桌子都一样大吗?你试过什么?最后,如果您坚持使用数据框,则必须进行两次联接,或者一次联接和一次联合,以使另外两个 dfs 标准化。如果其中一张桌子比另一张小,也许你可以广播加入,但你提供的信息很难说
  • SRCTable 将有大约 100 万条记录,而 RefTable-1、RefTable-2 将有大约 1000 条记录。请帮忙解决
  • 那是超级小,如果你想坚持使用 spark 来做,那么srctable.join(broadcast(reftable1), join condition, 'left').join(broadcast(reftable2), join condition, 'left')。如果您希望某些记录不匹配,您可能希望使用 where 子句进行过滤。同样,您最好发布您的尝试并解释它的问题所在。

标签: scala apache-spark apache-spark-sql spark-dataframe


【解决方案1】:

这是一个代码,您可以仅使用 DataFrame 上的连接函数进行测试

val SRCTable = Seq((66, "D", "a"), (67, "U", "b"), (70, "D", "y"), (71, "U", "q")).toDF("KEY", "SC", "FK")
val RefTable1 = Seq((66, "xyz1"),(67, "abc1"),(68, "fgr1"),(69, "yte1"),(70, "erx1"),(71, "ter1")).toDF("KEY1", "Value")
val RefTable2 = Seq((66, "a", "xyz2"), (67, "c", "abc2"), (67, "b", "fgr2"), (69, "g", "yte2"), (70, "y", "erx2"), (71, "q", "ter2")).toDF("KEY2", "KEY3", "Value")

val join1 = SRCTable.where(SRCTable.col("SC").equalTo("D")).join(RefTable1, SRCTable.col("KEY") === RefTable1.col("KEY1")).select("KEY", "SC", "FK", "Value")
val join2 = SRCTable.where(SRCTable.col("SC").equalTo("U")).join(RefTable2, SRCTable.col("KEY") === RefTable2.col("KEY2") && SRCTable.col("FK") === RefTable2.col("KEY3") ).select("KEY", "SC", "FK", "Value")

join1.unionAll(join2).show 

如果你有任何性能问题,我建议你看看如何很好地划分你的数据,如果你的 DataFrame 很小,还看看 Broadcast 对象

【讨论】:

    猜你喜欢
    • 2017-06-17
    • 2020-10-28
    • 2018-05-14
    • 1970-01-01
    • 1970-01-01
    • 2017-02-14
    • 2014-08-12
    • 1970-01-01
    • 2023-03-16
    相关资源
    最近更新 更多