【问题标题】:Dataflow reading from Kafka without data loss?从 Kafka 读取数据流而不会丢失数据?
【发布时间】:2021-05-25 07:10:46
【问题描述】:

我们目前是 Dataflow 批处理作业的大用户,如果可以可靠地完成,我们希望开始使用 Dataflow 流。

这是一个常见的场景:我们有一个非常大的 Kafka 主题,我们需要对其进行一些基本的 ETL 或聚合,以及一个非幂等上游队列。以下是我们的 Kafka 数据示例:

    ID |     msg | timestamp (mm,ss)
-----------------------
  1    |     A   |  01:00
  2    |     B   |  01:01
  3    |     D   |  06:00
  4    |     E   |  06:01
  4.3  |     F   |  06:01
 ....  | ......  | ...... (millions more)
  
  4.5  |    ZZ   |  19:58




糟糕,数据在某个时刻从整数变为小数,最终会导致某些元素失败,需要我们杀死管道,可能会修改下游服务,并可能对 Dataflow 管道进行少量代码更改。

在 Spark 结构化流式处理中,由于能够使用外部检查点,我们将能够重新启动流式处理作业并恢复处理前一个作业停止(成功处理)的队列,只进行一次处理。在 vanilla 或 spring boot Java 应用程序中,我们可以循环使用 Kafka 消费者,并且只有在将结果写入我们的“接收器”之后,才能提交偏移量。

我的总体问题是我们能否在 Dataflow 中实现类似的功能?我将列出我的一些假设和担忧:

  1. 似乎在KafkaIO 中,偏移提交 PCollection 和用户的之间没有关系,这是否意味着它们可以分开?
  2. 似乎在KafkaOffsetCommit 这需要 aw5 分钟的间隔 并发出最高的偏移量,但这是不是墙上时间,这是 kafka 记录时间.回到我们的示例数据,在我看来,整个队列的偏移量将尽可能快地提交(以五分钟为单位)! 这意味着如果我们只在前五分钟完成了记录 F 的处理,我们可能已经提交了几乎整个队列的 fests?

现在在我们的场景中,我们的管道在 F 附近开始失败,看来我们唯一的选择是从头开始还是丢失数据?我相信这可以通过大量自定义代码(自定义 DoFn 以确保 Kafka 消费者永远不会提交)和一些用于我们上游接收器的自定义代码来克服,这些代码最终会提交偏移量。有没有更好的方法来做到这一点,和/或我对如何在 Dataflow 中处理偏移管理的一些假设是错误的?

【问题讨论】:

    标签: google-cloud-platform apache-kafka google-cloud-dataflow apache-beam


    【解决方案1】:

    感谢您的详细问题!

    1. 在 Beam(因此是 Dataflow)中,“捆绑”的所有输出连同所有状态更新、检查点等一起提交,因此不同输出 PCollection 之间没有漂移。在这种特定情况下,偏移量是直接从要输出的元素中提取的,因此它们精确对应。在将偏移量提交回 Kafka 之前,输出和偏移量都会持久地提交给 Dataflow 的内部存储。
    2. 您是正确的,已处理元素的偏移量被分组为 5 分钟的事件时间窗口(Kafka 记录时间)并采用最大偏移量。虽然 5 分钟是任意持续时间,但偏移量对应于已成功从队列中拉出的元素。

    【讨论】:

    • 感谢您的回复。 1. 这对了解很有帮助,我承认我对捆绑包如何工作的理解非常有限。 2.我仍然不确定这将如何影响我的示例场景,在我看来,使用上述数据,给定某个聚合窗口/触发器,可以将直到 msg ZZ 的元素从队列中拉出,然后由 KafkaOffsetCommit 时间线提交,当仅输出元素 E 时,后续 msgs 无法输出到我的接收器?那么我需要重新处理整个队列,因为根据Kafka Offsets,ZZ被处理了?
    • 元素和它们的偏移量都将从队列中读取,并以原子方式提交到 Dataflow 的内部存储。然后将从 Dataflow 的内部存储中读取偏移量并提交给 Kafka。
    • @Vincent 关于你的“2”状态。你的意思是当“ZZ”消息已经被读取并且它的偏移量已经被提交但是它仍然没有被下游处理,并且你的管道在这个时候失败了,因为在处理“E”消息时出现了错误以前读过吗?
    • @AlexeyRomanenko 是的,这正是我要问的,有没有办法确保我的管道只提交已向下游发出的元素的偏移量?否则在我看来可能会丢失数据。
    • 如果我没记错的话,我认为 Beam 并没有提供开箱即用的功能,比如“只处理一次”。在这种情况下,一旦您的记录(或记录包)被处理,您可能需要自己提交偏移量。
    猜你喜欢
    • 1970-01-01
    • 2021-10-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-04-12
    • 2018-02-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多