【问题标题】:pyspark, get rows where first column value equals id and second column value is between two values, do this for each row in a dataframepyspark,获取第一列值等于 id 并且第二列值介于两个值之间的行,对数据框中的每一行执行此操作
【发布时间】:2026-01-28 10:20:06
【问题描述】:

所以我有一个像这样的 pyspark 数据框,我们称它为数据框 a:

    +-------------------+---------------+----------------+
    |                reg|           val1|           val2 |
    +-------------------+---------------+----------------+
    |             N110WA|     1590030660|   1590038340000|
    |             N876LF|     1590037200|   1590038880000|
    |             N135MH|     1590039060|   1590040080000|

还有一个像这样,我们称它为数据框 b:

    +-----+-------------+-----+-----+---------+----------+---+----+
    |  reg|      postime|  alt| galt|      lat|      long|spd| vsi|
    +-----+-------------+-----+-----+---------+----------+---+----+
    |XY679|1590070078549|   50|  130|18.567169|-69.986343|132|1152|
    |HI949|1590070091707|  375|  455|  18.5594|-69.987804|148|1344|
    |JX784|1590070110666|  825|  905|18.544968|-69.990414|170|1216|

有没有办法创建一个 numpy 数组或 pyspark 数据帧,其中对于数据帧 a 中的每一行,数据帧 b 中的所有行在 val 1 和 val 2 之间具有相同的 reg 和 posttime?

【问题讨论】:

  • 如果您正在寻找类似的东西,请检查一下并告诉我吗?

标签: pyspark apache-spark-sql amazon-emr


【解决方案1】:

您可以尝试以下解决方案 - 让我们知道是否可行或其他任何预期?

为了展示有效的解决方案,我对估算进行了一些修改--

在此输入

from pyspark.sql import functions as F
df_a = spark.createDataFrame([('N110WA',1590030660,1590038340000), ('N110WA',1590070078549,1590070078559)],[ "reg","val1","val2"])
df_b = spark.createDataFrame([('N110WA',1590070078549)],[ "reg","postime"])
df_a.show() 

df_a

+------+-------------+-------------+
|   reg|         val1|         val2|
+------+-------------+-------------+
|N110WA|   1590030660|1590038340000|
|N110WA|1590070078549|1590070078559|
+------+-------------+-------------+

df_b

+------+-------------+
|   reg|      postime|
+------+-------------+
|N110WA|1590070078549|
+------+-------------+

解决方案在这里

from pyspark.sql import types as T
from pyspark.sql import functions as F
#df_a = df_a.join(df_b,'reg','left')
df_a = df_a.withColumn('condition_col', F.when(((F.col('postime') >= F.col('val1')) & (F.col('postime') <= F.col('val2'))),'1').otherwise('0'))
df_a = df_a.filter(F.col('condition_col') == 1).drop('condition_col')
df_a.show()

最终输出

+------+-------------+-------------+-------------+
|   reg|         val1|         val2|      postime|
+------+-------------+-------------+-------------+
|N110WA|1590070078549|1590070078559|1590070078549|
+------+-------------+-------------+-------------+

【讨论】:

    【解决方案2】:

    是的,假设 df_adf_b 都是 pyspark 数据帧,您可以在 pyspark 中使用内连接:

    delta = val
    df = df_a.join(df_b, [
        df_a.res == df_b.res,
        df_a.posttime <= df_b.val1 + delta,
        df_a.posttime >= df_b.val2 - delta
    ], "inner")
    

    将过滤掉结果以仅包含指定的结果

    【讨论】:

    • delta的意义是什么?
    最近更新 更多