[Posted]: 2019-12-06 08:16:31
[Question]:
I want to merge two DataFrames on a time interval and on latitude/longitude coordinates.
I originally built the distances between the two DataFrames with a cross product and a window function. However, this caused a huge data explosion and shut down my cluster every time I tried to run it (I can include that code if needed). In response, I decided to perform a series of inner joins to avoid the cross product. In short, I join on the absolute value of the coordinate difference being equal to a particular value, widening it step by step until the remaining unmatched coordinates can be merged with the naive window approach. I looked around but did not find any PySpark code on Stack Overflow that explicitly handles this, so any advice would be appreciated.
# starting with exact matches
conditions = [df1.lon1 == df2.lon2,
              df1.lat1 == df2.lat2,
              df2.start <= df1.time,
              df2.end >= df1.time]
current_df_hit = df1.join(df2, conditions, 'inner')
....
# then 1 off in longitude
conditions = [F.abs(df1.lon1 - df2.lon2) == 1,
              df1.lat1 == df2.lat2,
              df2.start <= df1.time,
              df2.end >= df1.time]
current_df_hit = df1.join(df2, conditions, 'inner')
...
# then 1 off in latitude
conditions = [df1.lon1 == df2.lon2,
              F.abs(df1.lat1 - df2.lat2) == 1,
              df2.start <= df1.time,
              df2.end >= df1.time]
current_df_hit = df1.join(df2, conditions, 'inner')
...
# then both, etc.
conditions = [F.abs(df1.lon1 - df2.lon2) == 1,
              F.abs(df1.lat1 - df2.lat2) == 1,
              df2.start <= df1.time,
              df2.end >= df1.time]
current_df_hit = df1.join(df2, conditions, 'inner')
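For reference, the tiered-matching idea behind the joins above can be sketched in plain Python (the `match_tier` helper and the sample data are hypothetical, and integer coordinates are used deliberately so exact equality is well defined):

```python
# Sketch of the tiered matching strategy: match exact coordinates first,
# then widen to |difference| == 1 on one axis, then on both, and so on.
def match_tier(left, right, d_lon, d_lat):
    """Return (left, right) pairs whose absolute coordinate
    differences equal exactly the tier (d_lon, d_lat)."""
    return [
        (l, r)
        for l in left
        for r in right
        if abs(l[0] - r[0]) == d_lon and abs(l[1] - r[1]) == d_lat
    ]

left = [(10, 12), (13, 13)]
right = [(10, 12), (12, 13)]

exact = match_tier(left, right, 0, 0)    # matches (10, 12) to itself
one_off = match_tier(left, right, 1, 0)  # matches (13, 13) to (12, 13)
```

Each tier only sees the rows left unmatched by the previous tiers, which is what keeps the intermediate results small compared with a full cross product.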
This does not produce the expected results. For example, running the following:
df0 = spark.createDataFrame([
    ('id1', 10.1, 12.1),
    ('id5', 13.1, 13.1)], ["id0", "lat0", "lon0"])
df1 = spark.createDataFrame([
    ('id1', 10.1, 12.3),
    ('id5', 13.1, 13.2)], ["id1", "lat1", "lon1"])
# this produces nothing
df0.join(df1, F.abs(df1.lon1 - df0.lon0) == 0.1, 'inner').show()
# and so does this
df0.join(df1, F.round(df1.lon1 - df0.lon0, 1) == 0.1).show()
Neither join returns any rows. Please advise, and thanks in advance!
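The empty result from the `==` comparison is consistent with binary floating-point representation: a difference like `13.2 - 13.1` is not exactly `0.1`, so strict equality on the raw difference fails. A minimal pure-Python illustration of the effect (the same arithmetic applies to Spark doubles):

```python
import math

# Binary floats cannot represent most decimal fractions exactly,
# so the subtraction leaves a tiny representation error.
diff = 13.2 - 13.1
print(diff)        # e.g. 0.09999999999999964, not 0.1
print(diff == 0.1) # False: strict equality on raw floats fails

# Rounding the difference, or comparing with a tolerance,
# makes the comparison robust to that error.
print(round(diff, 1) == 0.1)    # True
print(math.isclose(diff, 0.1))  # True
```

A tolerance-based condition (or rounding both sides before comparing) is therefore the usual way to express "coordinates 0.1 apart" on floating-point columns.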
[Discussion]:
Tags: apache-spark pyspark inner-join equality