【问题标题】:how to update a row based on another row with same id如何根据具有相同 id 的另一行更新一行
【发布时间】:2018-12-24 07:16:07
【问题描述】:

使用 Spark 数据框,我想根据具有相同 id 的其他行更新行值。

例如, 我有以下记录,

id,value
1,10
1,null
1,null
2,20
2,null
2,null

我想得到如下结果

id,value
1,10
1,10
1,10
2,20
2,20
2,20

总而言之,某些行中的 value 列为 null,如果有另一行具有相同 id 且具有有效值的行,我想更新它们。

在sql中,我可以简单地用inner-join写一个更新语句,但是我在Spark-sql中没有找到同样的方法。

更新 combineCols a 内连接 combineCols b 在 a.id = b.id 设置 a.value = b.value (这就是我在sql中的做法)

【问题讨论】:

  • 如果一个ID有多个值怎么办?例如,如果 id 1 的值不是 (10, null, null),而是 (10,30, null),那么应该发生什么?
  • 只能为一个值或为空

标签: apache-spark-sql


【解决方案1】:

让我们使用 SQL 方法来解决这个问题 -

myValues = [(1,10),(1,None),(1,None),(2,20),(2,None),(2,None)]
df = sqlContext.createDataFrame(myValues,['id','value'])

df.registerTempTable('table_view')
df1=sqlContext.sql(
    'select id, sum(value) over (partition by id) as value from table_view'
)
df1.show()
+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  1|   10|
|  1|   10|
|  2|   20|
|  2|   20|
|  2|   20|
+---+-----+

警告: 这些代码假定任何特定的id 都只有一个non-null 值。当我们groupby 值时,我们必须使用aggregation 函数,而我使用了sum。如果任何id 有2 个non-null 值,则将求和。如果id 可以有多个non-null 值,那么最好使用min/max,这样我们就可以得到其中一个值而不是sum

df1=sqlContext.sql(
    'select id, max(value) over (partition by id) as value from table_view'
)

【讨论】:

  • 谢谢,但实际上我在操作一个中间表,它有70多列和2000万行,用整个表的SQL来做效率高吗?
  • 哦,是的,为什么不呢。 SQL 在内置优化中使用它自己的。我一定会使用它。但是,您也可以自己进行时间测试。请注意,我的代码假定对于任何特定的id,您只有一个non-null 值,否则它将总结它们。对值进行分组时,需要提供聚合函数,我提供了sum.
  • 如果两个答案中的任何一个对您有所帮助,您可以随时投票并接受您喜欢的答案。
【解决方案2】:

您可以使用 window 来执行此操作(在 pyspark 中):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# create dataframe
df = sc.parallelize([
    [1,10],
    [1,None],
    [1,None],
    [2,20],
    [2,None],
    [2,None],
]).toDF(('id', 'value'))

window = Window.partitionBy('id').orderBy(F.desc('value'))
df \
    .withColumn('value', F.first('value').over(window)) \
    .show()

结果:

+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  1|   10|
|  1|   10|
|  2|   20|
|  2|   20|
|  2|   20|
+---+-----+

您可以在 scala 中使用相同的功能。

【讨论】:

  • 谢谢:) 我还通过使用 groupby val df2 = df1.filter(!isnan($"value")).groupBy("id").agg(mean("value" ).as("update_value")) val result = df1.join(df2,Seq("id"),"inner").selectExpr("id","update_value") 有没有比使用均值函数更好的方法在这种情况下?
  • 如果同一个 id 有多个值,例如 1,10 和 1,20,那么所有具有 1 id 的行的值都是 15。我从你的解释中了解到你不想要它。如果同一个 id 的所有值都为 null,您希望发生什么?
  • 如果所有值都为null,则保持为null,如果不为null,则只有一个可能的值
  • 如果所有值都为 null,则它在您的解决方案中变为 0。对于所有这些情况,您都可以使用第一个函数。
猜你喜欢
  • 2021-05-08
  • 1970-01-01
  • 1970-01-01
  • 2020-04-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-09-04
相关资源
最近更新 更多