【Question Title】: How to perform an inner join between two dataframes based on the difference between two columns of the two different dataframes
【Posted】: 2019-12-06 08:16:31
【Question】:

I want to merge two dataframes based on a time window and latitude/longitude coordinates.

I originally built the distances between the two dataframes with an outer product and a window function. However, this caused a huge data explosion and shut down my cluster every time I tried to run it (I can include this code if needed). In response, I decided to perform a sequence of inner joins to avoid that outer product. In short, I join on the absolute value of the difference being equal to one specific value at a time, dropping matched rows between passes (see the sketch after the join passes below), until the remaining unmatched coordinates can be merged with the naive window approach. I have looked around but have not found any Pyspark code on Stack that explicitly deals with this, so any help would be appreciated.

from pyspark.sql import functions as F

# starting with exact matches
conditions = [df1.lon1 == df2.lon2,
              df1.lat1 == df2.lat2,
              df2.start <= df1.time,
              df2.end >= df1.time]
current_df_hit = df1.join(df2, conditions, 'inner')

....
# then longitude off by one
conditions = [F.abs(df1.lon1 - df2.lon2) == 1,
              df1.lat1 == df2.lat2,
              df2.start <= df1.time,
              df2.end >= df1.time]
current_df_hit = df1.join(df2, conditions, 'inner')
...
# then latitude off by one
conditions = [df1.lon1 == df2.lon2,
              F.abs(df1.lat1 - df2.lat2) == 1,
              df2.start <= df1.time,
              df2.end >= df1.time]
current_df_hit = df1.join(df2, conditions, 'inner')
...
# then both off by one, etc.
conditions = [F.abs(df1.lon1 - df2.lon2) == 1,
              F.abs(df1.lat1 - df2.lat2) == 1,
              df2.start <= df1.time,
              df2.end >= df1.time]
current_df_hit = df1.join(df2, conditions, 'inner')
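
Between passes, the rows that already matched are dropped so the next join only sees the leftovers. A minimal sketch of that bookkeeping (column names as above; the left_anti join is my assumption about how to carry unmatched rows forward):

# sketch: keep only the still-unmatched df1 rows for the next pass;
# a left_anti join on the df1 key columns is one way to do this
matched = current_df_hit.select("lon1", "lat1", "time")
df1_remaining = df1.join(matched, ["lon1", "lat1", "time"], "left_anti")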

This does not produce the expected results. For example, running the following:

df0 = spark.createDataFrame([
    ('id1', 10.1, 12.1),
    ('id5', 13.1, 13.1)], ["id0", "lat0", "lon0"])
df1 = spark.createDataFrame([
    ('id1', 10.1, 12.3),
    ('id5', 13.1, 13.2)], ["id1", "lat1", "lon1"])
# this produces nothing 
df0.join(df1, F.abs(df1.lon1 - df0.lon0) == 0.1, 'inner').show()
# and so does this
df0.join(df1, F.round(df1.lon1 - df0.lon0, 1) == 0.1).show()

Neither join returns any rows. Please advise, and thanks in advance!
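
A plain-Python check illustrates the floating-point issue behind the empty result of the exact-equality join (illustration only, not Spark-specific):

# none of the lon differences is exactly the double 0.1, so an
# equality join against the literal 0.1 matches no rows
print(13.2 - 13.1)  # 0.09999999999999964
print(12.3 - 12.1)  # 0.20000000000000107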

【Discussion】:

    Tags: apache-spark pyspark inner-join equality


    【Solution 1】:

    So the reasoning behind this question was actually rather weak: it turns out you can apply basic operators in a withColumn operation (or a filter) after the join; see the sketch after the code below. However, in the context of my question, I found a better solution in the following code, which does not generate nearly as large an intermediate dataframe for my application:

    df0 = spark.createDataFrame([
        ('id1', "2015-04-27 00:00:00", "2015-04-28 00:00:00", 10.1, 12.3, 10, 12),
        ('id1', "2015-04-29 00:00:00", "2015-04-30 00:00:00", 10.1, 12.1, 10, 12),
        ('id5', "2015-04-28 00:00:00", "2015-04-29 00:00:00", 13.1, 13.4, 13, 13),
        ('id5', "2015-04-28 00:00:00", "2015-04-29 00:00:00", 13.1, 13.1, 13, 13)], ["id0", "start", "end", "lat0", "lon0", "rlat0", "rlon0",])
    df1 = spark.createDataFrame([
        ('id2', "2015-04-29 00:00:00", 10.1, 12.3, 10, 12),
        ('id2', "2015-04-28 00:00:00", 10.1, 12.5, 10, 12),
        ('id3', "2015-04-28 00:00:00", 13.1, 13.2, 13, 13)], ["id1", "date", "lat1", "lon1", "rlat1", "rlon1"])
    
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    # first join on the rounded coordinates and the time window
    joindf = df1.join(df0, [df0.rlat0 == df1.rlat1, df0.rlon0 == df1.rlon1,
                            df0.start <= df1.date, df0.end >= df1.date])
    joindf.show()
    # compute the L1 (Manhattan) distance between the fine coordinates
    joindf = joindf.withColumn("dist", F.abs(joindf.lat1 - joindf.lat0)+F.abs(joindf.lon1 - joindf.lon0))
    joindf.show()
    # rank candidate matches by distance within each rounded-coordinate bucket
    joindf = joindf.withColumn("rank", F.row_number().over(Window.partitionBy("rlat0", "rlon0", "rlat1", "rlon1").orderBy("dist")))
    joindf.show()
    # keep only the closest match in each bucket
    joindf.filter(F.col("rank") == 1).show()
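
    As noted above, basic operators also work after the join. A sketch of expressing the fine-grained condition as a post-join filter instead (the 0.15 tolerance is an arbitrary value for illustration):

    # sketch: post-join filtering with ordinary operators rather than
    # encoding the tolerance in the join condition; 0.15 is arbitrary
    close_only = joindf.filter((F.abs(joindf.lat1 - joindf.lat0) <= 0.15) &
                               (F.abs(joindf.lon1 - joindf.lon0) <= 0.15))
    close_only.show()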
    

    I generate the intermediate join by rounding rather than by taking subsets of the dataframes of interest.
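
    The rounded key columns (rlat0/rlon0, rlat1/rlon1) are entered by hand in the example above; deriving them from the raw coordinates would look roughly like this (a sketch; floor-based bucketing is one arbitrary choice):

    # sketch: derive the coarse join keys from the raw coordinates;
    # this would replace the hand-entered rlat1/rlon1 columns above
    df1_keyed = df1.withColumn("rlat1", F.floor("lat1")) \
                   .withColumn("rlon1", F.floor("lon1"))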

    【Discussion】:
