【问题标题】:How to perform a graceful application shutdown when using Spring Cloud Stream with Kafka?将 Spring Cloud Stream 与 Kafka 一起使用时如何执行优雅的应用程序关闭?
【发布时间】:2019-11-14 22:34:30
【问题描述】:

我有一个使用 Spring Cloud Stream (v1.3.0) 和 Kafka (v1.1.6) 的 spring boot (v.1.57) 应用程序。我希望能够优雅地关闭它,即在关闭时,所有流侦听器(即使用@StreamListener 注释)应该:

  1. 停止轮询新消息
  2. 完成他们的工作
  3. 将偏移量提交给 Kafka

我注意到 ContainerProperties 中有一个名为“shutdownTimeout”的属性(默认设置为 10000 毫秒),因此我尝试通过反射扩展 ConcurrentKafkaListenerContainerFactoryConfigurer 类(因为它具有 @ConditionalOnMissingBean 注释)将其修改为 30000像这样:

@Slf4j
@Component
public class BehalfConcurrentKafkaListenerContainerFactoryConfigurer extends ConcurrentKafkaListenerContainerFactoryConfigurer {

    @Autowired
    private KafkaProperties kproperties;

    @Override
    public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
                          ConsumerFactory<Object, Object> consumerFactory) {
        PropertyAccessor myAccessor = PropertyAccessorFactory.forDirectFieldAccess(this);
        myAccessor.setPropertyValue("properties", kproperties);

        ContainerProperties containerProperties = listenerContainerFactory
                .getContainerProperties();
        super.configure(listenerContainerFactory, consumerFactory);
        containerProperties.setShutdownTimeout(30000);
    }
}

但这并不成功。还尝试将它 (shutdownTimeout: 30000) 放在 spring cloud stream binder 设置下的 application.yml 中,但同样没有帮助。

有什么方法可以控制关​​机过程并实现我的目标?

【问题讨论】:

    标签: java spring-boot apache-kafka spring-cloud-stream


    【解决方案1】:

    编辑

    不再需要进行这种反射破解;只需将 ListenerContainerCustomizer @Bean 添加到应用程序上下文即可。见here

    EDIT_END

    不再支持spring-kafka 1.1.x;您应该使用 1.3.9 和启动 1.5.x。

    当前的 Boot 1.5.x 版本是 1.5.21。

    您应该立即升级。

    但是,所有这些项目都有更新的版本。

    Spring Cloud Stream 不使用该工厂或引导属性来创建其容器;它没有公开在容器上配置该属性的机制。

    Spring Cloud Stream 2.1 添加了ListenerContainerCustomizer,它允许您通过在其上设置任何属性来自定义绑定容器。

    我建议你升级到 Boot 2.1.6 和 Spring Cloud Stream Germantown (2.2.0)。

    编辑

    这有点小技巧,但它应该可以工作,直到您可以升级到更新的流版本...

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So56883620Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So56883620Application.class, args).close();
        }
    
        private final CountDownLatch latch = new CountDownLatch(1);
    
        @StreamListener(Sink.INPUT)
        public void listen(String in) throws InterruptedException {
            this.latch.countDown();
            System.out.println(in);
            Thread.sleep(6_000);
            System.out.println("exiting");
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
            return args -> {
                IntStream.range(0,2).forEach(i -> template.send("mytopic", ("foo" + i).getBytes()));
                // wait for listener to start
                this.latch.await(10, TimeUnit.SECONDS);
                System.out.println("Shutting down");
            };
        }
    
        @Bean
        public SmartLifecycle bindingFixer(BindingService bindingService) {
            return new SmartLifecycle() {
    
                @Override
                public int getPhase() {
                    return Integer.MAX_VALUE;
                }
    
                @Override
                public void stop() {
                    // no op
                }
    
                @Override
                public void start() {
                    @SuppressWarnings("unchecked")
                    Map<String, Binding<?>> consumers = (Map<String, Binding<?>>) new DirectFieldAccessor(bindingService)
                            .getPropertyValue("consumerBindings");
                    @SuppressWarnings("unchecked")
                    Binding<?> inputBinding = ((List<Binding<?>>) consumers.get("input")).get(0);
                    ((AbstractMessageListenerContainer<?, ?>) new DirectFieldAccessor(inputBinding)
                            .getPropertyValue("lifecycle.messageListenerContainer"))
                                    .getContainerProperties().setShutdownTimeout(30_000L);
                }
    
                @Override
                public boolean isRunning() {
                    return false;
                }
    
                @Override
                public void stop(Runnable callback) {
                    callback.run();
                }
    
                @Override
                public boolean isAutoStartup() {
                    return true;
                }
            };
        }
    
    }
    

    【讨论】:

    • 为了清楚起见,我使用的是 Kafka v1.1.6,我没有使用 spring-kafka(但使用 spring-cloud-stream,我猜想以某种方式使用它)。跨度>
    • 我们计划在不久的将来(几个月后)升级到 spring-boot 2(无论是最新版本),但不幸的是我们现在不能这样做。如果 spring-cloud-stream 不允许我们配置该属性,是否还有其他方式来执行正常关闭?也许使用其他机制或自定义解决方案?
    • 无论如何,您都需要覆盖流引入的版本。使用 start.spring.io 创建的新 1.5 启动应用程序会自动执行此操作。我今天休息。我会试着看看在 nrxt 周是否有工作。
    • 抱歉耽搁了;我找到了一种解决方法——不过,这有点像 hack。查看编辑。
    • 谢谢你,Gary - 我会用我的代码测试它,然后会报告
    猜你喜欢
    • 2019-02-23
    • 1970-01-01
    • 2021-09-03
    • 1970-01-01
    • 2016-06-09
    • 2018-04-28
    • 2021-10-28
    • 1970-01-01
    相关资源
    最近更新 更多