【问题标题】:Kafka Streams Fault Tolerance with Offset Management in ParellelKafka Streams 容错与并行偏移管理
【发布时间】:2021-03-05 22:44:31
【问题描述】:

说明

我有一个使用主题的 Kafka Stream 应用程序。 活动数量众多。

KafkaStream 会将这些事件作为终端操作使用,并将这些事件组合成一堆,比如 1000 个事件,然后将其写入 AWS S3。

我有线程在消费来自 Kafka 主题的事件后并行写入 s3。

由于某些业务应用程序逻辑和处理,不使用 kafka-connector-s3。

问题 ::

我希望应用程序具有容错性,不想丢失消息。

--> 崩溃场景

假设应用程序有 10 个线程都在运行并试图将事件放入 S3,并且在这种情况下发生崩溃,因为 KafkaStream 具有 (enable.auto.commit = false),我们无法手动提交偏移量并且所有线程都消费了来自 Kafka 主题的消息。 在这种情况下,KafkaStreams 已经在读取后提交了偏移量,但它无法将事件处理到 S3。

我需要一种机制,以便我可以确定在事件成功写入 S3 文件之前的最后偏移量是多少。 在崩溃场景中,我应该如何处理这个问题以及如何管理 Kafka Streams 中的 Kafka 偏移量,因为我使用了 10 个线程。如果有些未能写入 s3 而有些则通过了怎么办。如何确保偏移量的排序成功处理到 s3?

如果我不清楚我的问题陈述,请告诉我。

谢谢!

【问题讨论】:

    标签: amazon-s3 apache-kafka kafka-consumer-api apache-kafka-streams apache-kafka-connect


    【解决方案1】:

    我可以向您保证,enable.auto.commit 在 Kafka Streams 中设置为 false。位于 https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsConfig.html 状态的 Javadocs

    "enable.auto.commit" (false) - Streams 客户端将始终禁用/关闭自动提交

    你说得对,Kafka Streams 会或多或少地定期自动提交。但是,Kafka Streams 会等到记录处理完毕后再提交相应的偏移量。这意味着您至少会得到至少一次保证并且不会丢失消息。

    据我了解您的应用程序,在将记录发送到 S3 之前,您的终端处理器不会阻塞。这意味着,Kafka Streams 无法知道发送何时完成。 Kafka Streams 只是看到终端处理器完成了它的处理,然后——如果提交间隔已过——它会提交偏移量。

    你说

    由于某些业务应用程序逻辑和处理,不使用 kafka-connector-s3。

    你能不能把业务应用逻辑放到Kafka Streams应用中,将结果写入带有操作符to()的Kafka主题,然后使用kafka-connector-s3将该主题中的消息发送到S3? 我不是连接专家,但我想这可以确保消息不会丢失,并使您的实现更简单。

    【讨论】:

    • 嗨布鲁诺,谢谢。在我的用例中,我实际上想使用来自 kafka 的消息并制作一堆说 5000 条消息并将其写入 s3,因为写入 s3 是一项耗时的操作。现在,如果我开始从流中消费直到 5000 条消息,然后执行 S3 写入,偏移量将被提交,直到 5000 条消息并且如果 s3 写入失败。我正在考虑使用 Kafka Consumer,因为它支持手动提交偏移量。我相信 kafka 流最适合你想读写 kafka 主题时,但这里我只是在阅读和做终端操作。
    • 您对 kafka 流与 kafka 消费者有何看法。??
    • 如果你想对偏移提交有最大的控制,你应该使用 Kafka 消费者。请注意,在 Kafka Streams 中,如果您使用可以访问ProcessorContext 的操作,例如process()transform(),您也可以请求提交。使用commit.interval.ms,您可以控制 Kafka Streams 自动完成的提交间隔。
    • 关于使用多个线程写入 S3,我会考虑通过使用更多流线程和/或并行运行更多 Kafka Streams 客户端(或更多 Kafka 消费者)来扩展输入主题的分区数量)。如果您的输入分区的分区很少,您可以先将输入主题的消息写入具有更多分区的主题,然后从该主题中读取并按分区号进行扩展。
    • 您还没有回答我的问题,即是否可以先使用 Kafka Streams 来应用业务逻辑,将消息写入主题,然后使用 S3 连接器。如果它不是一个选项,我会感兴趣为什么它不是一个选项?使用 Kafka Streams 和 Connect 可以减轻您在发生故障时实现偏移管理等低级功能的负担。
    【解决方案2】:

    使用 kafka-stream ,您可以将来自源主题的 5000 条消息聚合到一个大消息中,并将大消息发送到另一个主题,例如 middle_topic。您需要来自 middle_topic 的另一个 proceccor 源并使用 s3-connector 接收到 s3。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-07-17
      • 1970-01-01
      • 1970-01-01
      • 2018-09-22
      • 2021-03-07
      • 2021-01-15
      • 2018-08-30
      • 2017-07-09
      相关资源
      最近更新 更多