【发布时间】:2015-07-30 21:53:40
【问题描述】:
我正在使用storm-kafka-0.9.3从Kafka读取数据并在Storm中处理这些数据。下面是我正在使用的 Kafka Spout。但问题是当我杀死 Storm 集群时,它不会读取在它死亡期间发送的旧数据,它会从最新的偏移量开始读取。
BrokerHosts hosts = new ZkHosts(Constants.ZOOKEEPER_HOST);
SpoutConfig spoutConfig = new SpoutConfig(hosts, CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME
, "/" + CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME,UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//Never should make this true
spoutConfig.forceFromStart=false;
spoutConfig.startOffsetTime =-2;
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
return kafkaSpout;
【问题讨论】:
-
你能不能试着注释掉
spoutConfig.forceFromStart=false;行或设置spoutConfig.forceFromStart=true -
试过了,但同样的问题,实际看到假设我在 kafka 中有 100 条消息,Storm 正在处理它,现在假设在第 100 条消息之后,Storm 关闭并且我的 http 端点在 Kafka 中推送了 300 条消息,因为 Storm只处理了 100 条消息,我希望当 Storm 醒来时它应该从它离开的 101 条消息开始处理。
-
那么到底发生了什么?在您的帖子中,您提到它从最新的偏移量开始读取..这不是您要找的吗?
-
基本上,当 strom 回来时,它会从 401 而不是 101 开始读取。