【问题标题】:Dynamic topic name configuration for spring kafka when using spring integration Kafka adapters?使用spring集成Kafka适配器时spring kafka的动态主题名称配置?
【发布时间】:2017-09-29 15:13:43
【问题描述】:

我有需要再次重用的 Spring 集成流程。

@Bean
public IntegrationFlow sendToKafkaFlowRequest(@Value("${kafka.document-consume-topic}") String topic,
                                              ProducerFactory<?, Message> producerFactory) {
    return IntegrationFlows.from("kafkaRequestChannel")
            .handle(Kafka
                    .outboundChannelAdapter(producerFactory)
                    .messageKey(m -> m
                            .getHeaders()
                            .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                    .topicExpression("headers[kafka_topic] ?: '" + topic + "'"))
            .get();
}


@Bean
public IntegrationFlow listeningFromKafkaFlow(@Value("${kafka.document-consume-topic}") String topic,
                                              ConsumerFactory<?, Message> consumerFactory) {
    return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory, ListenerMode.record, topic)
            .configureListenerContainer(c -> c.ackMode(AbstractMessageListenerContainer.AckMode.RECORD))
            .retryTemplate(new RetryTemplate())
            .filterInRetry(true))
            .channel("interMessageChannel")
            .get();
}

我想一次又一次地将这两个流程重复用于多个主题。但问题是主题是硬编码的。 问题是我们可以使用消息的标题将主题名称放在其中吗?这会是一个问题吗 ?

【问题讨论】:

    标签: spring-integration spring-kafka


    【解决方案1】:

    好的。看,Kafka.messageDrivenChannelAdapter() 可以接受多个主题,因此您只需要一个流程即可。

    是的,你总是可以在通过Kafka.outboundChannelAdapter() 发送到 Kafka 之前将kafka_topic 标头设置到消息中。再说一遍:你也不需要在这里复制IntegrationFlow。唯一发送到针对消息解决的主题就足够了。

    【讨论】:

    • listeningFromKafkaFlow() 怎么样,如何不复制它如何在不复制相同流的情况下使用消息?
    • 我告诉过你:Kafka.messageDrivenChannelAdapter() 可以接受主题的可变参数。您不能只将所有主题注入 listeningFromKafkaFlow bean 定义并将它们委托给 Kafka.messageDrivenChannelAdapter()
    猜你喜欢
    • 2021-08-02
    • 1970-01-01
    • 2016-07-28
    • 2019-01-11
    • 2022-12-10
    • 1970-01-01
    • 1970-01-01
    • 2018-12-26
    • 2023-03-31
    相关资源
    最近更新 更多