【发布时间】:2021-03-12 07:11:52
【问题描述】:
我将为消息实现断路器模式。基本要求是,如果微服务无法将消息发布到发布主题,它应该停止接受来自其他 Kafka 主题的消息。当发布主题可用时,微服务应该开始接受来自其他 Kafka 主题的消息。
有没有办法在 Spring boot Kafka Streams 中实现这一点?
【问题讨论】:
标签: java spring-boot apache-kafka spring-kafka circuit-breaker
我将为消息实现断路器模式。基本要求是,如果微服务无法将消息发布到发布主题,它应该停止接受来自其他 Kafka 主题的消息。当发布主题可用时,微服务应该开始接受来自其他 Kafka 主题的消息。
有没有办法在 Spring boot Kafka Streams 中实现这一点?
【问题讨论】:
标签: java spring-boot apache-kafka spring-kafka circuit-breaker
我可以通过使用BindingsEndpoint 来实现这一点。
private final BindingsEndpoint binding;
@Override
public void stop() {
List<?> objects = binding.queryStates();
if (!objects.isEmpty()) {
log.info("Stopping Kafka topics ");
List<Binding> bindings = getBindings(objects);
bindings.forEach(Binding::stop);
log.info("Stopped Kafka topics ");
}
}
@Override
public void start() {
List<?> objects = binding.queryStates();
if (!objects.isEmpty()) {
log.info("Starting Kafka topics ");
List<Binding> bindings = getBindings(objects);
bindings.forEach(Binding::start);
log.info("Started Kafka topics ");
}
}
protected List<Binding> getBindings(List<?> objects) {
return objects.stream().filter(object -> object instanceof Binding)
.map(object -> (Binding) object).collect(Collectors.toList());
}
【讨论】: