【发布时间】:2016-01-08 17:34:48
【问题描述】:
我试图弄清楚为什么每次我重新启动 Storm 拓扑时我的所有 Kafka 消息都会被重播。
我的理解是,一旦最后一个 Bolt 确认了元组,spout 应该在 Kafka 上提交消息,因此我不应该在重启后看到它重播。
我的代码是一个简单的 Kafka-spout 和一个 Bolt,它只打印每条消息然后确认它们。
private static KafkaSpout buildKafkaSpout(String topicName) {
ZkHosts zkHosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(zkHosts,
topicName,
"/" + topicName,
"mykafkaspout"); /*was:UUID.randomUUID().toString()*/
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
return new KafkaSpout(spoutConfig);
}
public static class PrintBolt extends BaseRichBolt {
OutputCollector _collector;
public static Logger LOG = LoggerFactory.getLogger(PrintBolt.class);
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
LOG.error("PrintBolt.0: {}",tuple.getString(0));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("nothing"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka", buildKafkaSpout("mytopic"), 1);
builder.setBolt("print1", new PrintBolt(),1).shuffleGrouping("kafka");
}
除了代码中的设置之外,我没有提供任何配置设置。
我错过了配置设置还是我做错了什么?
更新:
澄清一下,在我重新启动管道之前一切正常。以下行为是我可以从其他(非风暴)消费者那里得到的,以及我对 KafkaSpout 的期望
但是,我使用默认设置得到的实际行为如下。消息处理得很好,直到我停止管道,然后当我重新启动时,我会重播所有消息,包括那些我认为我已经确认的消息(A 和 B)
根据 Matthias 提到的 configuration options,我可以将 startOffsetTime 更改为 Latest,但这实际上是管道丢弃管道时产生的消息(消息“C”)的最新位置正在重新启动。
我有一个用 NodeJS 编写的消费(使用 npm kafka-node),它能够向 Kafka 确认消息,当我重新启动 NodeJs 消费者时,它完全符合我的预期(赶上消息“C”,这是在消费者下降并从那里继续)-那么我如何使用 KafkaSpout 获得相同的行为?
【问题讨论】:
-
你的意思是,只要拓扑在运行一切都很好?没有失败的元组?但是,当您终止拓扑并重新提交它时,它会再次处理旧的元组,而不是从最后一个被处理和确认的 Kafka 偏移量恢复? (顺便说一句:如果 Bolt 在您的情况下是一个接收器,则无需声明输出流。您可以将
declareOutputFields留空。) -
是的,这正是我的意思——它可以正常工作,没有失败的元组,但是在重新启动时,我看到了已经处理的元组的重播。根据您的链接玩弄 startOffsetTime 会改变行为,并将其设置为
LatestTime现在,spouit 会丢弃在 spout 未运行时发送的所有消息 - 这也不好 - 所以不是解决方案。 -
我明白了。但是,行为本身是正确的。对于您的用例,您需要确保偏移量存储在 Zookeeper 中,以便可以在重新部署时获取它...来自链接:“重要:重新部署拓扑时,请确保 SpoutConfig 的设置.zkRoot 和 SpoutConfig.id 没有被修改,否则 spout 将无法从 ZooKeeper 读取其先前的消费者状态信息(即偏移量)——这可能会导致意外行为和/或数据丢失,具体取决于您的使用案子。” -- 在你的情况下,Zookeeper 似乎有问题。
-
ZkHosts zkHosts = new ZkHosts("localhost:2181");您没有使用生产设置,它会保留消费者状态...您需要配置 kafka
标签: java apache-kafka apache-storm