【发布时间】:2019-11-14 22:34:30
【问题描述】:
我有一个使用 Spring Cloud Stream (v1.3.0) 和 Kafka (v1.1.6) 的 spring boot (v.1.57) 应用程序。我希望能够优雅地关闭它,即在关闭时,所有流侦听器(即使用@StreamListener 注释)应该:
- 停止轮询新消息
- 完成他们的工作
- 将偏移量提交给 Kafka
我注意到 ContainerProperties 中有一个名为“shutdownTimeout”的属性(默认设置为 10000 毫秒),因此我尝试通过反射扩展 ConcurrentKafkaListenerContainerFactoryConfigurer 类(因为它具有 @ConditionalOnMissingBean 注释)将其修改为 30000像这样:
@Slf4j
@Component
public class BehalfConcurrentKafkaListenerContainerFactoryConfigurer extends ConcurrentKafkaListenerContainerFactoryConfigurer {
@Autowired
private KafkaProperties kproperties;
@Override
public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
ConsumerFactory<Object, Object> consumerFactory) {
PropertyAccessor myAccessor = PropertyAccessorFactory.forDirectFieldAccess(this);
myAccessor.setPropertyValue("properties", kproperties);
ContainerProperties containerProperties = listenerContainerFactory
.getContainerProperties();
super.configure(listenerContainerFactory, consumerFactory);
containerProperties.setShutdownTimeout(30000);
}
}
但这并不成功。还尝试将它 (shutdownTimeout: 30000) 放在 spring cloud stream binder 设置下的 application.yml 中,但同样没有帮助。
有什么方法可以控制关机过程并实现我的目标?
【问题讨论】:
标签: java spring-boot apache-kafka spring-cloud-stream