【发布时间】:2019-08-27 03:35:19
【问题描述】:
我有一个 Spring-Cloud-Streams 客户端读取由多个分区组成的 Kakfa 主题。客户端为它读取的每条 Kafka 消息调用一个 Web 服务。如果重试几次后 web 服务不可用,我想阻止消费者从 Kafka 读取。参考之前的 Stackoverflow 问题 (Spring cloud stream kafka pause/resume binders),我自动连接了 BindingsEndpoint 并调用 changeState() 方法来尝试停止消费者,但日志显示消费者继续阅读调用 changeState() 后来自 Kafka 的消息。
我正在使用 Spring Boot 版本 2.1.2.RELEASE 和 Spring Cloud 版本 Greenwich.RELEASE。 spring-cloud-stream-binder-kafka 的托管版本是 2.1.0.RELEASE。我已设置属性 autoCommitOffset=true 和 autoCommitOnError=false。
下面是我的代码的 sn-p。有什么我错过的吗? changeState() 的第一个输入参数应该是主题名称吗?
如果我希望消费者应用程序在 web 服务不可用时退出,我可以简单地执行 System.exit() 而不需要先停止消费者吗?
@Autowired
private BindingsEndpoint bindingsEndpoint;
...
...
@StreamListener(MyInterface.INPUT)
public void read(@Payload MyDTO dto,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
try {
logger.info("Processing message "+dto);
process(dto); // this is the method that calls the webservice
} catch (Exception e) {
if (e instanceof IllegalStateException || e instanceof ConnectException) {
bindingsEndpoint.changeState("my.topic.name",
BindingsEndpoint.State.STOPPED);
// Binding<?> b = bindingsEndpoint.queryState("my.topic.name"); ==> Using topic name returns a valid Binding object
}
e.printStackTrace();
throw (e);
}
}
【问题讨论】:
标签: spring-boot apache-kafka spring-cloud-stream