【问题标题】:How does Spark Structured Streaming determine an event has arrived late?Spark Structured Streaming 如何确定事件迟到?
【发布时间】:2018-02-26 14:27:46
【问题描述】:

我通读了 spark 结构化流式处理文档,我想知道 spark 结构化流式处理如何确定事件迟到?它是否将事件时间与处理时间进行比较?

以上图为例,粗体右箭头线“时间”是否代表处理时间?如果是这样

1) 这个处理时间从何而来?因为它的流式传输是否假设有人可能使用其中具有处理时间戳的上游源或火花添加了处理时间戳字段?例如,当从 Kafka 读取消息时,我们会执行类似

的操作
Dataset<Row> kafkadf = spark.readStream().forma("kafka").load()

这个数据框默认有时间戳列,我假设它是处理时间。正确的?如果是,Kafka 或 Spark 是否添加此时间戳?

2) 我可以看到消息中的粗体右箭头线和时间之间存在时间比较。这就是 spark 确定事件迟到的方式吗?

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    单个作业的处理时间(DStream 中的一个RDD)通常决定了处理时间。不是在实际处理该 RDD 时发生,而是在分配 RDD 作业以进行处理时。 为了清楚地理解上述语句的含义,创建一个批处理时间 = 60 seconds 的 spark 流应用程序,并确保批处理采用 2 minute。最终您会看到一个作业被分配为一次处理,但由于上一个作业尚未完成而未被拾取。

    下一步: 乱序数据有两种不同的处理方式。

    1. 创建一个High water mark

    在您获得图片的同一个 spark 用户指南页面中对此进行了说明。

    很容易理解我们在哪里有keyvalue 对,其中键是timestamp。设置.withWatermark("timestamp", "10 minutes") 本质上是说,如果我收到10 AM 的消息,那么我将允许比(Upto 9.50AM) 稍早的消息。任何比这更早的消息都会被丢弃。

    1. 处理乱序数据的另一种方法是使用mapGroupsWithStatemapWithState 函数。 在这里,您可以决定当您获得一堆键值时要做什么。在时间 X 之前放弃任何东西,或者比这更花哨。 (例如,如果是来自 A 的数据,则允许延迟 20 分钟,其余允许延迟 30 分钟等...)

    【讨论】:

    • 亲爱的投反对票的人,如果您在投反对票时说出问题所在,那就太好了 - 然后用户可以编辑和提高质量:)
    【解决方案2】:

    databricks 的这份文档解释得很清楚: https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html

    基本上它归结为水印(延迟数据阈值)和数据记录到达的顺序。处理时间根本没有影响。您在代表您的活动时间的列上设置了水印。如果在已经看到具有事件时间 T2 的记录 R2 之后,具有事件时间 T1 的记录 R1 到达,并且 T2 > T1 + 阈值,则将丢弃 R1。

    例如,假设 T1 = 09:00,T2 = 10:00,阈值 = 61 分钟。如果具有事件时间 T1 的记录 R1 在具有事件时间 T2 的记录 R2 之前到达,则 R1 将包含在计算中。如果 R1 在 R2 之后到达,那么 R1 仍然包含在计算中,因为 T2

    现在假设阈值 = 59 分钟。如果 R1 在 R2 之前到达,则 R1 包含在计算中。如果 R1 在 R2 之后到达,则 R1 被丢弃,因为我们已经看到了一条事件时间为 T2 且 T2 > T1 + Threshold 的记录。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-03-19
      • 2023-03-31
      • 1970-01-01
      • 2020-09-12
      • 2020-03-07
      • 2019-08-04
      • 2021-06-03
      相关资源
      最近更新 更多