【问题标题】:Pause/Start Kafka Stream processors in Spring boot在 Spring Boot 中暂停/启动 Kafka 流处理器
【发布时间】:2021-03-12 07:11:52
【问题描述】:

我将为消息实现断路器模式。基本要求是,如果微服务无法将消息发布到发布主题,它应该停止接受来自其他 Kafka 主题的消息。当发布主题可用时,微服务应该开始接受来自其他 Kafka 主题的消息。

有没有办法在 Spring boot Kafka Streams 中实现这一点?

【问题讨论】:

    标签: java spring-boot apache-kafka spring-kafka circuit-breaker


    【解决方案1】:

    我可以通过使用BindingsEndpoint 来实现这一点。

    private final BindingsEndpoint binding;
    
    @Override
    public void stop() {
        List<?> objects = binding.queryStates();
        if (!objects.isEmpty()) {
            log.info("Stopping Kafka topics ");
            List<Binding> bindings = getBindings(objects);
            bindings.forEach(Binding::stop);
            log.info("Stopped Kafka topics ");
        }
    }
    
    @Override
    public void start() {
        List<?> objects = binding.queryStates();
        if (!objects.isEmpty()) {
            log.info("Starting Kafka topics ");
            List<Binding> bindings = getBindings(objects);
            bindings.forEach(Binding::start);
            log.info("Started Kafka topics ");
        }
    }
    
    protected List<Binding> getBindings(List<?> objects) {
        return objects.stream().filter(object -> object instanceof Binding)
                .map(object -> (Binding) object).collect(Collectors.toList());
    }
    

    【讨论】:

      猜你喜欢
      • 2018-10-03
      • 1970-01-01
      • 2021-09-23
      • 2019-02-11
      • 2020-11-11
      • 2021-07-02
      • 1970-01-01
      • 1970-01-01
      • 2020-02-20
      相关资源
      最近更新 更多