【问题标题】:Start Spring Boot app with Spring Integration Kafka consumers paused使用 Spring Integration 启动 Spring Boot 应用程序 Kafka 消费者已暂停
【发布时间】:2018-10-03 20:40:35
【问题描述】:

我正在开发一个 Spring Boot 应用程序,该应用程序使用以 Kafka 主题为源的 Spring Integration 流。我们的集成流程开始使用包含带有 springframework.cloud.stream.annotation.Input 和 Output 注释的 SubscribableChannels 的接口。这些配置为通过 Cloud Config 和 spring.cloud.stream.kafka.bindings 从 Kafka 读取。

当应用首次启动时,它会立即开始读取 Kafka 主题。这是一个问题,因为应用需要初始化一些本地的、不可持久的数据库,然后才能开始正确处理传入的 Kafka 消息。

我们目前正在使用 @PostConstruct 在 Kafka 启动之前填充这些内存数据库,但这不是最理想的,因为应用程序无法使用 Eureka、Feign 等来可靠地找到具有最新数据的健康服务内存数据库。

由于多种原因,无法更改架构,以便共享或预填充内存数据库。只要知道当我称它为内存数据库时,我是在简化一些事情,它实际上是另一种服务。

启动 Spring Boot 应用程序的最佳方式是什么,以便从 Kafka 读取的集成流以暂停状态启动,并且可以在其他进程完成后取消暂停?

【问题讨论】:

    标签: java spring spring-boot apache-kafka spring-integration


    【解决方案1】:

    我假设您使用 KafkaMessageDrivenChannelAdapter 并根据您提到的 Spring Integration Java DSL - 确切地说是 Kafka.messageDrivenChannelAdapter()。可以使用idautoStartup(false) 进行配置。因此它不会立即开始使用 Kafka 主题。每当您准备好使用时,您可以start() 这个组件使用提到的 id 从应用程序上下文中以Lifecycle 的形式获取它。

    或者您可以发送适当的消息到Control Bus

    更新

    如果你处理 Spring Cloud Stream 和 Kafka Binder,你应该考虑注入一个 BindingsEndpoint bean 并执行它的 changeState(@Selector String name, State state) 作为你的绑定名称和 State.STOPPED。当您的内存数据库准备就绪时,您可以使用 State.STARTED 回调它:https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binding_visualization_and_control

    【讨论】:

    • 我不记得看过那些课程。我编辑了我的问题,提供了有关 Kafka 主题如何连接到集成流程的更多详细信息。
    • 哦!那是不同的故事。您必须与我们共享代码。以目前的状态,您在误导我们。肯定有一个IntegrationFlow 抽象。这就是我感到困惑的原因:docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/…
    • @TomatoCo 刚刚和 Artem 讨论过。 . .目前我们在 spring cloud stream 中没有这个功能,我假设你正在使用它。如果是这样,我建议也提出问题here,以便我们解决它。
    • 在我的回答中查看更新。
    猜你喜欢
    • 1970-01-01
    • 2021-08-15
    • 1970-01-01
    • 1970-01-01
    • 2019-06-22
    • 2016-12-09
    • 2017-08-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多