【问题标题】:How to start a Kafka-Streams Pipeline by Topology using Quarkus如何使用 Quarkus 通过拓扑启动 Kafka-Streams 管道
【发布时间】:2020-09-16 22:28:35
【问题描述】:

我正在关注Quarkus Kafka-Streams tutorial,但不太了解如何启动管道。

在本教程中,org.apache.kafka.streams.StreamsBuilder 用于构建描述管道的org.apache.kafka.streams.Topology。构建拓扑的方法用@Produces 注释。在这个cheat sheet 中描述了这足以运行 Kafka-Streams 管道。在本教程中,还公开了一个 http-endpoint。这在我目前正在实施的服务中不是必需的。同样在示例中,从不显式调用提供程序方法。当我在没有端点的情况下启动应用程序时,管道没有启动。

this tutorial 中,管道使用拓扑显式实例化。但是这里的属性必须手动设置,并且配置不是从quarkus.kafka-streams.<something> 属性中获取的。

问题是:如何使用第一个教程中的拓扑构建器来启动它所描述的管道?最佳情况是自动应用来自quarkus.kafka-streams.<something> 的配置。

使用:

  • Java OpenJDK 11.0.8
  • Quarkus Version: 1.8.0.Final

【问题讨论】:

    标签: java apache-kafka-streams quarkus


    【解决方案1】:

    找出问题所在:不保证完全正确。只是想分享解决我问题的方法。

    最重要的是使用正确的@Producesjavax.enterprise.inject.ProducesTopology 生成方法所必需的。 javax.ws.rs.Produces 可以额外用于定义输出的 MediaType,但不是强制性的:

    
        @javax.ws.rs.Produces( MediaType.TEXT_PLAIN )
        @Produces
        @AlternativePriority( 1 )
        public Topology buildTopology() {
            ...
        }
    

    Kafka Streams 的实例由框架在启动时自动构建。运行管道所需的一切如下:

    
    @ApplicationScoped
    public class YourApplication {
        private final KafkaStreams streams;
    
        public NasDistributorApplication( final KafkaStreams streams ) {
            this.streams = streams;
        }
    
        public void onStart( @Observes final StartupEvent startupEvent ) {
            streams.start();
        }
    
        public void onStop( @Observes final ShutdownEvent shutdownEvent ) {
            streams.close();
        }
    
    }
    
    

    可能需要通过@AlternativePriority( 1 ) 注释Topology 生成方法以告诉quarkus 使用此方法来构建Topology。我不知道框架的内部结构,但我怀疑使用了默认拓扑,@AlternativePriority 比默认的Topology 赋予自定义方法更高的优先级

    【讨论】:

      猜你喜欢
      • 2019-10-23
      • 2017-06-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-08-28
      • 2019-10-31
      • 1970-01-01
      相关资源
      最近更新 更多