【发布时间】:2013-12-26 09:39:39
【问题描述】:
我正在使用 kafka spout 来消费消息。但是如果我必须更改拓扑并上传,那么它将从旧消息恢复还是从新消息开始? Kafka spout 让我们指定从哪里消费的时间戳,但我怎么知道时间戳?
【问题讨论】:
我正在使用 kafka spout 来消费消息。但是如果我必须更改拓扑并上传,那么它将从旧消息恢复还是从新消息开始? Kafka spout 让我们指定从哪里消费的时间戳,但我怎么知道时间戳?
【问题讨论】:
基本上,事件的顺序是:
第一次启动拓扑,从以下属性开始读取:
forceFromStart = true
startOffsetTime = -2
上面的道具会强制它从话题的开头开始。请记住同时拥有这两个属性,因为forceFromStart 告诉storm 读取startOffsetTime 属性并使用设置的值来确定从哪里开始读取,并忽略zookeeper 偏移量。
从现在开始,您的拓扑将运行,zookeeper 将保持偏移量。如果你的工人死了,它将由主管启动,并开始从 zookeeper 中的偏移量读取。
现在,如果您想重新启动拓扑,并且想从关闭前停止的位置读取,请使用以下属性并重新启动拓扑:
forceFromStart = false
通过上述属性,您告诉storm 不要读取startOffsetTime 值,而是使用在您关闭拓扑之前已维护的zookeeper 偏移量。
从现在开始,每次重新启动拓扑时,它都会从原来的位置读取。
如果您想重新启动拓扑并从主题的头部/顶部读取,请使用以下属性并重新启动拓扑:
forceFromStart = true
startOffsetTime = -1
通过上述属性,您告诉storm忽略zookeeper偏移并从作为主题提示的最新偏移开始。
【讨论】:
如果您使用的是 KafkaSpout,请确保以下几点:
KafkaSpout 将偏移量存储到 Zookeeper 中。如果在 KafkaSpout 的 KafkaConfig 中将 forceFromStart 设置为 true(首次部署拓扑时可能会出现这种情况),则在重新部署期间要非常小心,它将忽略存储的 zookeeper 偏移量。确保将其设置为 false。
考虑编写拓扑,以便在执行拓扑的 main() 方法时从属性文件中读取 KafkaConfig.forceFromStart 值。这将允许您的管理员控制是否重播 Kafka 消息。
【讨论】:
spoutConfig.forceStartOffsetTime(-1);
它将选择围绕该时间戳写入的最新偏移量以开始消费。你可以 通过传入 -1 强制喷口始终从最新的偏移量开始,您可以强制 它通过传入-2从最早的偏移量开始。
【讨论】: