【问题标题】:Adding a ROW_NUMBER column to a streaming dataframe将 ROW_NUMBER 列添加到流式数据帧
【发布时间】:2020-12-18 15:55:28
【问题描述】:

我对 Spark 和 SQL 还很陌生。我正在尝试向我的 df 添加一列(然后我将其保存到 Delta 表中),它为每条记录/行提供一个唯一的 id,并在每次更新特定记录时递增它。

我正在尝试执行以下操作:

SELECT etc,
CONCAT(somerows1) as id1,
ROW_NUMBER() OVER(PARTITION BY somerows1 ORDER BY (SELECT NULL)) AS versionid
FROM etc

somerows1 是几个列的连接,以形成唯一的记录。我对以特定形式排序的记录没有特别的兴趣,这就是我选择 ORDER BY (SELECT NULL) 的原因。

我收到以下错误:

Error in SQL statement: AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets; line 1 pos 0;

有人知道如何解决这个问题吗?

谢谢

【问题讨论】:

  • 正如this 建议的那样,问题可能是您需要在分区中指定一个基于时间的列。
  • @Let'stry 我尝试在分区中添加时间戳列,但仍然出现相同的错误

标签: sql apache-spark-sql sql-order-by spark-streaming row-number


【解决方案1】:

我已经通过在.writeStream 上使用foreachBatch 接收器解决了这个问题。这允许您创建一个函数,其中流数据帧被视为静态/批处理数据帧(该函数应用于每个微批处理)。

在 Scala 中,代码如下所示:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{row_number, lit}

val saveWithWindowFunction = (sourceDf: DataFrame, batchId: Long) => {

  val windowSpec = Window
    .partitionBy("somerows1") 
    .orderBy(lit(null))
  
  sourceDf
    .withColumn("versionid", row_number().over(windowSpec))

//... save the dataframe using: sourceDf.write.save()
}

使用.writeStream 调用您的函数:

  .writeStream
  .format("delta")
  .foreachBatch(saveWithWindowFunction)
  .start()

【讨论】:

    【解决方案2】:

    您正在寻找的是在滑动事件时间窗口上的聚合。查看文档和示例here

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-04-25
      • 2023-03-22
      • 2018-07-25
      • 2021-07-31
      • 1970-01-01
      • 2021-12-23
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多