【问题标题】:Spring Kafka Template - Connect to Kafka Topic on Spring Boot StartupSpring Kafka 模板 - 在 Spring Boot 启动时连接到 Kafka 主题
【发布时间】:2022-01-02 15:17:33
【问题描述】:

我已经实现了一个使用 Spring Kafka 的基本 Spring Boot 应用程序。我希望我的制作人在调用第一个 .send() 之前连接到 Kafka 主题,但我找不到这样做的方法。这可能吗?

日志显示KafkaTemplate仅在我在16:12:44触发.send方法后才连接到Kafka Topic:

2021-11-24 16:12:12.602  INFO 63930 --- [           main] c.e.k.KafkaProducerExampleApplication    : The following profiles are active: dev
2021-11-24 16:12:13.551  INFO 63930 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
2021-11-24 16:12:13.559  INFO 63930 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2021-11-24 16:12:13.559  INFO 63930 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.53]
2021-11-24 16:12:13.613  INFO 63930 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2021-11-24 16:12:13.613  INFO 63930 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 974 ms
2021-11-24 16:12:13.989  INFO 63930 --- [           main] pertySourcedRequestMappingHandlerMapping : Mapped URL path [/v2/api-docs] onto method [springfox.documentation.swagger2.web.Swagger2Controller#getDocumentation(String, HttpServletRequest)]
2021-11-24 16:12:14.190  INFO 63930 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2021-11-24 16:12:14.190  INFO 63930 --- [           main] d.s.w.p.DocumentationPluginsBootstrapper : Context refreshed
2021-11-24 16:12:14.207  INFO 63930 --- [           main] d.s.w.p.DocumentationPluginsBootstrapper : Found 1 custom documentation plugin(s)
2021-11-24 16:12:14.239  INFO 63930 --- [           main] s.d.s.w.s.ApiListingReferenceScanner     : Scanning for api listing references
2021-11-24 16:12:14.336  INFO 63930 --- [           main] c.e.k.KafkaProducerExampleApplication    : Started KafkaProducerExampleApplication in 7.055 seconds (JVM running for 7.341)
2021-11-24 16:12:44.550  INFO 63930 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-24 16:12:44.550  INFO 63930 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-11-24 16:12:44.551  INFO 63930 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2021-11-24 16:12:44.649  INFO 63930 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 

【问题讨论】:

    标签: java apache-kafka spring-kafka


    【解决方案1】:

    SmartLifecycle bean 为我们工作,谢谢。

    @Component
    class KafkaProducer (
        private val userChangeLogTemplate: KafkaTemplate<String, UserChangeLog>
        private val kafkaProperties: MizenKafkaProperties
    ) : NotificationProducer{
    
        @Bean
        fun connector(pf: ProducerFactory<String, Any>): SmartLifecycle {
            return object : SmartLifecycle {
                override fun stop() {}
                override fun start() {
                    pf.createProducer().close()
                }
    
                override fun isRunning(): Boolean {
                    return false
                }
            }
        }
    
        override fun sendUserChangeLog(message: UserChangeLog) {
            userChangeLogTemplate.send(kafkaProperties.userChangeLogTopic, message)
        }
    }
    

    【讨论】:

      【解决方案2】:

      关于 Linh Vu 的回答,最好不要在 bean 定义中创建连接 - 这在应用程序上下文的生命周期中为时过早。

      相反,添加一个实现SmartLifecycle的bean并在start()中创建连接;这样,上下文将在连接之前完全初始化。

      @Bean
      SmartLifecycle connector(ProducerFactory<Object ,Object> pf) {
          return new SmartLifecycle() {
              
              @Override
              public void stop() {
              }
              
              @Override
              public void start() {
                  pf.createProducer().close();
              }
              
              @Override
              public boolean isRunning() {
                  return false;
              }
              
          };
      }
      

      【讨论】:

      • 嗨,Gary,我可以知道它在应用程序上下文的生命周期中过早是什么意思吗?如果我在 Bean 定义中连接会出现什么样的问题?谢谢。
      • 在这种情况下可能是无害的,但这只是最佳实践;应用程序上下文初始化分为三个不同的阶段 - 收集 bean 定义、实例化所有 bean、启动任何 SmartLifecycle bean;如果您在第 2 阶段开始执行连接到外部服务器等活动,则可能并非所有 bean 都可用。
      【解决方案3】:

      使用non-transactional producer(未提供transactionIdPrefix),当您第一次调用KafkaTemplate.send 时,它将委托给ProducerFactory 以获取Producer单个实例。此时,由于之前没有Producer单个实例ProducerFactory 将为您创建这个(这就是您看到日志ProducerConfig : ProducerConfig values ... 的原因)。这个生产者实例现在被所有客户端使用/共享。


      所以如果你想预先创建上面的生产者实例,你可以直接在ProducerFactory上调用它,例如:

      @Bean
      public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
              KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
              kafkaProducerFactory.createProducer();
              return kafkaTemplate;
      ...
      

      【讨论】:

      • 在 bean 定义中连接到代理不是一个好主意 - 这在应用程序上下文的生命周期中为时过早 - 最好在您的一个 bean 中实现 SmartLifecycle 并建立start()中的连接。
      • 感谢@GaryRussell 先生指出这一点。你能举个例子吗?我不擅长 SpringCore,所以我并没有真正弄清楚。或者你能发布另一个答案吗?我会删除我的。
      • 查看我的答案以获取示例。
      猜你喜欢
      • 1970-01-01
      • 2019-08-25
      • 2019-08-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-08-13
      相关资源
      最近更新 更多