【问题标题】:Apache kafka offset committing in apache storm topologyApache Storm 拓扑中的 Apache kafka 偏移提交
【发布时间】:2019-06-25 02:11:11
【问题描述】:

我正在设计一个 apachestorm 拓扑(使用 streamparse),它由一个 spout(apache kafka spout)和 1 个并行度 > 1 的 bolt 构建,从 kafka spout 中批量读取消息并将消息保存在 mysql 表中

螺栓批量读取消息。如果批处理成功完成,我手动提交 apache kafka 偏移量。

当 mysql 上的 bolt insert 失败时,我不会在 kafka 中提交偏移量,但有些消息已经在 spout 发送到 bolt 的消息队列中。

应该删除已经在队列中的消息,因为我无法在不丢失先前失败消息的情况下推进 kafka 偏移量。

streamparse 中有没有办法在bolt 启动时清除或失败所有已经在队列中的消息?

【问题讨论】:

    标签: apache-kafka apache-storm


    【解决方案1】:

    我不了解流解析,但我得到的印象是您想将元组捆绑起来并将它们作为批处理写入。假设您已写入偏移量 10。现在您的 Bolt 接收到偏移量 11-15,并且批处理无法写入。偏移量 15-20 已排队,您现在不想处理它们,因为那样会乱序处理批次。

    这样理解对吗?

    首先,我会放弃手动提交偏移量。你应该让喷口处理它。假设您使用的是storm-kafka-client,您可以将其配置为仅在对应的元组和所有前面的元组都被确认后才提交偏移量。

    您可能应该做的是在螺栓中(或者更好的是,在您的数据库中)跟踪失败批次中的最高偏移量。然后,当你的bolt 无法写入偏移量11-15 时,你可以使用offset > 15 使bolt 失败每个元组。在某个时候,您将再次收到偏移量 11-15,并且可以重试写入批处理。由于您使用offset > 15 失败了所有消息,因此它们也将被重试,并且将在失败批次中的消息之后到达。

    此解决方案假定您不对 spout 和 writer bolt 之间的消息流进行重新排序,因此消息按照发出的顺序到达 bolt。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-06-02
      • 2018-11-05
      • 1970-01-01
      • 2016-11-18
      • 2023-04-02
      • 2019-08-04
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多