【发布时间】:2019-06-25 02:11:11
【问题描述】:
我正在设计一个 apachestorm 拓扑(使用 streamparse),它由一个 spout(apache kafka spout)和 1 个并行度 > 1 的 bolt 构建,从 kafka spout 中批量读取消息并将消息保存在 mysql 表中
螺栓批量读取消息。如果批处理成功完成,我手动提交 apache kafka 偏移量。
当 mysql 上的 bolt insert 失败时,我不会在 kafka 中提交偏移量,但有些消息已经在 spout 发送到 bolt 的消息队列中。
应该删除已经在队列中的消息,因为我无法在不丢失先前失败消息的情况下推进 kafka 偏移量。
streamparse 中有没有办法在bolt 启动时清除或失败所有已经在队列中的消息?
【问题讨论】: