【问题标题】:Spark Structured Streaming watermark errorSpark结构化流水印错误
【发布时间】:2019-04-06 13:37:09
【问题描述】:

跟进this question

我有如下格式的json流数据

|  A    | B                                        |
|-------|------------------------------------------|
|  ABC  |  [{C:1, D:1}, {C:2, D:4}]                | 
|  XYZ  |  [{C:3, D :6}, {C:9, D:11}, {C:5, D:12}] |

我需要把它转换成下面的格式

|   A   |  C  |  D   |
|-------|-----|------|
|  ABC  |  1  |  1   |
|  ABC  |  2  |  4   |
|  XYZ  |  3  |  6   |
|  XYZ  |  9  |  11  |
|  XYZ  |  5  |  12  | 

要实现这一点,请按照上一个问题的建议执行转换。

val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")

val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum") 

val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")

val df4 = df3.withColumn("dummy", concat( $"SeqNum", lit("||"), $"A"))

val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C")) 

val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")

现在我尝试将结果保存到 HDFS 中的 csv 文件

df6.withWatermark("event_time", "0 seconds")
  .writeStream
  .trigger(Trigger.ProcessingTime("0 seconds"))
  .queryName("query_db")
  .format("parquet")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("path", "/path/to/output")
  //      .outputMode("complete")
  .start()

现在我收到以下错误。

线程“main”中的异常 org.apache.spark.sql.AnalysisException:当流式 DataFrames/DataSets 上没有水印的流式聚合时,不支持附加输出模式;; EventTimeWatermark event_time#223:时间戳、间隔

我的疑问是我没有执行任何聚合,这将要求它在该行的处理时间之外存储聚合值。为什么我会收到此错误?我可以将水印保持为 0 秒吗?

对此的任何帮助将不胜感激。

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-streaming


    【解决方案1】:

    据我了解,只有在事件时间执行窗口操作时才需要加水印。 Spark 使用水印处理迟到的数据,出于同样的目的,Spark 需要保存旧的聚合。

    以下链接通过示例很好地解释了这一点: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking

    我在您的转换中没有看到任何窗口操作,如果是这种情况,那么我认为您可以尝试运行不带水印的流查询。

    【讨论】:

    • 但是我得到同样的错误,如果尝试在没有水印的情况下执行
    • 你有什么解决办法吗?我面临同样的问题..我无法写入没有水印的HDFS位置..我在写入hdfs时删除了水印列..它成功了,但我想对没有水印的数据进行分组..你能提供任何方法跨度>
    【解决方案2】:

    在对 Spark 流结构进行分组时,您必须在数据帧中已有水印,并在分组时将其考虑在内,方法是在您的聚合中包含一个水印窗口

        df.groupBy(col("dummy"), window(col("event_time"), "1 day")).
    

    【讨论】:

      猜你喜欢
      • 2021-05-03
      • 1970-01-01
      • 2021-05-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-26
      • 1970-01-01
      相关资源
      最近更新 更多