【问题标题】:Stream writes having multiple identical keys to delta lake流写入具有多个相同键的 delta Lake
【发布时间】:2020-06-19 01:32:09
【问题描述】:

我正在通过 Spark 结构化流向 delta Lake 写入流。每个流式批处理都包含键值(还包含时间戳作为一列)。 delta Lake 不支持在源(蒸汽批处理)处使用多个相同的键进行更新,所以我想只用最新时间戳的记录来更新 delta Lake。我该怎么做?

这是我正在尝试的代码 sn-p:

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {

  println(s"Executing batch $batchId ...")
  microBatchOutputDF.show()

  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

提前致谢。

【问题讨论】:

    标签: apache-spark spark-streaming delta-lake


    【解决方案1】:

    您可以从“microBatchOutputDF”数据框中消除具有较旧时间戳的记录,并仅保留给定键的具有最新时间戳的记录。

    您可以使用 spark 的 'reduceByKey' 操作并实现自定义 reduce 功能,如下所示。

    def getLatestEvents(input: DataFrame) : RDD[Row] = {
    input.rdd.map(x => (x.getAs[String]("key"), x)).reduceByKey(reduceFun).map(_._2) }
    
    
    def reduceFun(x: Row, y: Row) : Row = {
    if (x.getAs[Timestamp]("timestamp").getTime > y.getAs[Timestamp]("timestamp").getTime) x else y }
    

    假定键是字符串类型和时间戳类型的时间戳。并为您的流式批处理“microBatchOutputDF”调用“getLatestEvents”。它会忽略较旧的时间戳事件并仅保留最新的事件。

    val latestRecordsDF = spark.createDataFrame(getLatestEvents(microBatchOutputDF), <schema of DF>)
    

    然后在 'latestRecordsDF' 之上调用 deltalake 合并操作

    【讨论】:

      【解决方案2】:

      在微批处理的流式传输中,您可能会针对给定键获得多个记录。为了用目标表更新它,您必须找出微批处理中键的最新记录。在您的情况下,您可以使用时间戳列的最大值和值列来查找最新记录并将其用于合并操作。

      您可以参考此link,了解有关查找给定键的最新记录的更多详细信息。

      【讨论】:

      • 将使用相同的方法。感谢您的回复。
      猜你喜欢
      • 1970-01-01
      • 2021-10-16
      • 2021-12-22
      • 1970-01-01
      • 2023-03-03
      • 2016-01-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多