【问题标题】:Spring Kafka Consumer - Getting topics from a complex objectSpring Kafka Consumer - 从复杂对象中获取主题
【发布时间】:2020-12-14 09:51:28
【问题描述】:

我正在开发一个 Spring Boot kafka 消费者应用程序。它将有不同的消费者处理不同的主题。消费者的所有信息都来自 application.yml 文件。

application:
  kafka:
    consumer-config:
      - name: consumer-a
        topics: topic1,topic2
        ......
      - name: consumer-b
        topics: topic3,topic4
        .....

我无法将主题列表从应用程序属性设置到 KafkaListener。

我尝试了以下方法:

@KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactory")


@KafkaListener(topics = "#{'${application.kafka.consumer-config.?[name == 'consumer-a'].topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")

在这两种情况下,我都会收到以下错误:

java.lang.IllegalArgumentException: 无法解析占位符

从应用程序属性中获取主题并将其设置在 KafkaListener 主题上的最佳方式是什么?

【问题讨论】:

    标签: java spring-boot apache-kafka spring-kafka


    【解决方案1】:

    你用的是什么版本?我刚刚测试了它,它工作正常......

    @SpringBootApplication
    public class So63583349Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So63583349Application.class, args);
        }
    
        @KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}", id = "so63583349")
        public void listen(String in) {
            System.out.println(in);
        }
    
    }
    

    2020-08-25 13:02:28.384 WARN 66237 --- [o63583349-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-so63583349-1, groupId=so63583349 ] 获取关联 ID 为 41 的元数据时出错:{topic1=UNKNOWN_TOPIC_OR_PARTITION, topic2=UNKNOWN_TOPIC_OR_PARTITION}

    对于第二个,您不能在属性占位符中使用 SpEL 选择。这是针对这种情况的一种解决方案:

    @SpringBootApplication
    public class So63583349Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So63583349Application.class, args);
        }
    
        @KafkaListener(topics = "#{@props.consumerConfig.?[name == 'consumer-a'].get(0).topics.split(',')}",
                id = "so63583349")
        public void listen(String in) {
            System.out.println(in);
        }
    
        @Bean
        Props props() {
            return new Props();
        }
    
    }
    
    @ConfigurationProperties(value = "application.kafka")
    class Props {
    
        List<Properties> consumerConfig;
    
        public List<Properties> getConsumerConfig() {
            return this.consumerConfig;
        }
    
        public void setConsumerConfig(List<Properties> consumerConfig) {
            this.consumerConfig = consumerConfig;
        }
    
    }
    

    【讨论】:

    • 谢谢加里。我正在使用 Spring Boot 2.3.2.RELEASE。
    • 第二种情况,不能在占位符内使用SpEL选择;我为这种情况添加了一种解决方案。
    • 谢谢你,Gary 你又救了我。使用您在此处发布的内容以及在另一个线程的另一个回复中:*.com/a/57064927/14165010 我以这种方式解决了它:@Bean public String[] InterestTopics() { return this.applicationProperties.getKafka().getConsumerConfig() .get(0) .getTopics().stream().toArray(String[]::new);并且:@KafkaListener(topics = "#{@interestedTopics}", containerFactory = "kafkaListenerContainerFactory")
    最近更新 更多