【问题标题】:Spring Kafka - create streams dynamicallySpring Kafka - 动态创建流
【发布时间】:2018-12-21 03:05:19
【问题描述】:

我需要从配置文件动态创建 kafka 流,其中包含源主题名称和每个流的配置。应用程序需要有数十个 kafka 流,并且流在每个环境(例如 stage、prod)上都会有所不同。 spring-kafka 库可以做到这一点吗?

我们可以通过kafka-streams 轻松做到这一点:

@Bean
public List<KafkaStreams> kafkaStreams() {
    return streamRouteProperties.stream()
            .map(routeProperty -> createKafkaStream(routeProperty))
            .collect(toList());
}

private KafkaStreams createKafkaStream(KafkaConfigurationProperties kafkaProperties) {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<Object, String> stream = builder.stream(kafkaProperties.getTopicName());
    Topology topology = builder.build();
    StreamsConfig streamsConfig = new StreamsConfig(kafkaProperties.getSettings());
    return new KafkaStreams(topology, streamsConfig);
}

我们需要实现springSmartLifecycle接口,所以所有的流都会自动启动和关闭。

是否可以使用spring-kafka 做同样的事情? 如我所见,我们需要在代码中创建每个 kafka 流,而且我看不到如何使用 StreamsBuilderFactoryBean 创建 kafka 流列表的可能性。 对于每个必需的流,我需要执行以下操作:

@Bean
public KStream<?, ?> kStream(StreamsBuilder streamsBuilder) {
    Consumed<String, String> consumed = ..;
    KStream<String, String> kStream = streamsBuilder.stream(topicName, consumed);
    kStream.process(() -> eventProcessor);
    return kStream;   
}

@Bean
public FactoryBean<StreamsBuilder> streamsBuilder() {
    return new StreamsBuilderFactoryBean(streamsConfig);
}

但是如何使用 StreamsBuilderFactoryBean 动态创建 kafka 流列表?

【问题讨论】:

    标签: java spring apache-kafka apache-kafka-streams spring-kafka


    【解决方案1】:

    StreamsBuilderFactoryBean 只是为 Spring 应用程序上下文带来了一些固执己见、方便的 API,但这并不意味着您应该始终与它联系在一起。

    幸运的是,StreamsBuilderFactoryBean 与常规 Kafka Streams 相比并没有太大的价值。它的最大作用是对内部创建的生命周期控制KafkaStreams

    您可以随意使用原始的 Kafka Streams API,不要过度复杂化您的代码,试图根据 StreamsBuilderFactoryBean 来满足您的要求,这确实是为一组静态选项而设计的。

    【讨论】:

    • 谢谢你解释我应该往哪个方向走
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-17
    • 2015-05-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多