【问题标题】:Spring Integration - kafka Outbound adapter not taking topic value exposed as spring beanSpring Integration - kafka Outbound 适配器未将主题值公开为 spring bean
【发布时间】:2017-07-27 03:17:10
【问题描述】:

我已成功集成具有固定主题名称的 kafka 出站通道适配器。现在,我想让主题名称可配置,因此希望通过应用程序属性公开它。

application.properties 包含以下条目之一:

kafkaTopic:testNewTopic

我的配置类如下所示:

@Configuration
@Component
public class KafkaConfig {

    @Value("${kafkaTopic}")
    private String kafkaTopicName;

    @Bean
    public String getTopic(){
    return kafkaTopicName;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//this.brokerAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // set more properties
        return new DefaultKafkaProducerFactory<>(props);
    }
}

在我的 si-config.xml 中,我使用了以下内容(例如:topic="getTopic"):

<int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapter" kafka-template="kafkaTemplate"
        auto-startup="true" sync="true" channel="inputToKafka" topic="getTopic">
    </int-kafka:outbound-channel-adapter>

但是,当通过 bean 公开时,配置无法获取主题名称。但是当我对主题名称的值进行硬编码时,它可以正常工作。

有人可以建议我在这里做错了什么吗?

kafka 出站通道中的topic 是否接受称为bean 的值?

我如何将它外部化,因为使用我的实用程序的每个应用程序都会提供不同的 kafka 主题名称

【问题讨论】:

    标签: spring apache-kafka spring-integration spring-annotations


    【解决方案1】:

    topic 属性用于字符串值。

    但它支持属性占位符解析:

    topic="${kafkaTopic}"
    

    以及上述 bean 的 SpEL 评估:

    topic="#{getTopic}"
    

    只是因为 XML 解析器配置允许这样做。

    但是您可能会注意到,您注入到&lt;int-kafka:outbound-channel-adapter&gt; 中的KafkaTemplate 具有defaultTopic 属性。因此,您无需担心 XML。

    还有一个可供您使用的选项是 Spring Integration Annotations 配置。您可以在其中为KafkaProducerMessageHandler @Bean 定义@ServiceActivator

    @ServiceActivator(inputChannel = "inputToKafka")
    @Bean
    KafkaProducerMessageHandler kafkaOutboundChannelAdapter() {
         kafkaOutboundChannelAdapter adapter = new kafkaOutboundChannelAdapter( kafkaTemplate());
         adapter.setSync(true);
         adapter.setTopicExpression(new LiteralExpression(this.kafkaTopicName));
         return adapter;
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-12-14
      • 2017-04-28
      • 1970-01-01
      • 2015-12-12
      • 1970-01-01
      • 1970-01-01
      • 2021-03-20
      • 1970-01-01
      相关资源
      最近更新 更多