【发布时间】:2015-04-20 20:32:03
【问题描述】:
如果我在我的 Storm 拓扑中增加 Kafka Spout 的并行度,我怎样才能阻止它多次读取同一主题中的同一消息?
【问题讨论】:
标签: apache-storm apache-kafka kafka-consumer-api
如果我在我的 Storm 拓扑中增加 Kafka Spout 的并行度,我怎样才能阻止它多次读取同一主题中的同一消息?
【问题讨论】:
标签: apache-storm apache-kafka kafka-consumer-api
Storm 的 Kafka spout 将消费者偏移量持久保存到 Zookeeper,因此只要您不清除 Zookeeper 存储,它就不应多次读取同一消息。如果您看到一条消息被多次读取,也许检查偏移量是否被持久化到您的 zookeeper 实例?
我认为,默认情况下,Kafka spout 在本地运行时会启动自己的本地 Zookeeper 实例(与 Kafka 的 Zookeeper 不同),每次重启拓扑时,它的状态可能会重置。
【讨论】:
您应该检查消息是否得到正确确认。如果不是,则 spout 会将其视为失败并回复消息。
【讨论】:
MessageTimeoutSecs 中指定的元组需要更多时间。所以通过增加MessageTimeoutSecs 的值可以解决我的问题。
如果是从kafka流入storm,那么请分享更多信息。
如果数据流是从storm到kafka:
然后只需检查代码中的 TopologyBuilder。
它不应该是allGrouping,如果是则将其更改为shuffleGrouping
例子:
builder.setBolt("OUTPUTBOLT", new OutBoundBolt(boltConfig), 4)
.allGrouping("previous_bolt"); // this is wrong change it to
// shuffleGrouping
所有分组:流在所有螺栓的任务中复制。请谨慎使用此分组。
【讨论】:
您需要指定消费者组。一旦指定,Kafka 只会将下一条消息发送给您的任何 spout。所有的 spout 都应该属于同一个消费者组。
在创建消费者时,请指定以下属性
props.put("group.id", a_groupId);
【讨论】:
如果你的 kafka spout 是 Opeque 那么你需要 topology.max.spout.pending
如果您的需求满足,您可以使用 Transactional Spout 来处理此问题。
【讨论】: