【Question title】: Pass list of topics from application yml to KafkaListener
【Posted】: 2018-10-10 13:08:07
【Question】:

I have the following application.yml:

service:
  kafka:
    groupId: 345
    consumer: 
      topics: 
        -
          name: response    
    producer: 
      topics: 
        -
          name: request1 
          num-partitions: 5
          replication-factor: 1 
        -
          name: request2 
          num-partitions: 3
          replication-factor: 1  

How can I use SpEL to access the list of topic names so that I can pass it to the KafkaListener annotation?

@KafkaListener(topics = "#{'${service.kafka.consumer.topics.name}'}", containerFactory = "kafkaListenerContainerFactory")
public void receive(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // ...
}
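
One likely reason the placeholder in the attempt above does not resolve: Spring Boot flattens YAML lists into indexed property keys, so the topic name ends up under `service.kafka.consumer.topics[0].name` and no key called `service.kafka.consumer.topics.name` exists. The following is a minimal plain-Java sketch of that flattening, not Spring's actual loader; the class `YamlFlattenSketch` and its `flatten` methods are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class YamlFlattenSketch {

    // Recursively walks nested maps and lists and records each leaf value
    // under a dotted key, using [index] for list elements. This only mimics
    // the key shape Spring Boot exposes for YAML; it is not Spring code.
    private static void flatten(String prefix, Object node, Map<String, Object> out) {
        if (node instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
                String key = prefix.isEmpty() ? String.valueOf(e.getKey()) : prefix + "." + e.getKey();
                flatten(key, e.getValue(), out);
            }
        } else if (node instanceof List) {
            List<?> list = (List<?>) node;
            for (int i = 0; i < list.size(); i++) {
                flatten(prefix + "[" + i + "]", list.get(i), out);
            }
        } else {
            out.put(prefix, node);
        }
    }

    // Convenience entry point: flattens a whole document into a new map.
    public static Map<String, Object> flatten(Object root) {
        Map<String, Object> out = new LinkedHashMap<>();
        flatten("", root, out);
        return out;
    }

    public static void main(String[] args) {
        // Hand-built equivalent of the consumer part of the application.yml above.
        Map<String, Object> yaml = Map.of("service", Map.of("kafka", Map.of(
                "consumer", Map.of("topics", List.of(Map.of("name", "response"))))));

        // prints: service.kafka.consumer.topics[0].name = response
        flatten(yaml).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Because every list element gets its own indexed key, a plain `${...}` placeholder can address one element at a time but cannot gather all names into a list.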

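【Comments】:

  • See my answer here. At the time of writing I had not found a more elegant way.
  • @Paizo I can't use comma-separated topics because I need additional parameters associated with each topic.
  • Then I would use @ConfigurationProperties and define the listener with a @Bean instead of @KafkaListener. Unless you find a nice way to do it with a SpEL expression (I didn't), in which case I'd be happy to see how :)
  • With @ConfigurationProperties, you can use a projection over the topics collection; see my answer.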

Tags: spring-boot yaml spring-kafka


【Solution 1】:

Use configuration properties and a collection projection...

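import java.util.ArrayList;
import java.util.List;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties("service.kafka.producer")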
@Component
public class ConfigProps {

    List<Topic> topics = new ArrayList<>();

    public List<Topic> getTopics() {
        return this.topics;
    }

    public void setTopics(List<Topic> topics) {
        this.topics = topics;
    }

    @Override
    public String toString() {
        return "ConfigProps [topics=" + this.topics + "]";
    }

    public static class Topic {

        private String name;

        private int numPartitions;

        private short replicationFactor;

        public String getName() {
            return this.name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getNumPartitions() {
            return this.numPartitions;
        }

        public void setNumPartitions(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        public short getReplicationFactor() {
            return this.replicationFactor;
        }

        public void setReplicationFactor(short replicationFactor) {
            this.replicationFactor = replicationFactor;
        }

        @Override
        public String toString() {
            return "Topic [name=" + this.name + ", numPartitions=" + this.numPartitions + ", replicationFactor="
                    + this.replicationFactor + "]";
        }

    }

}

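import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaAdmin;

@SpringBootApplication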
public class So52741016Application {

    public static void main(String[] args) {
        SpringApplication.run(So52741016Application.class, args);
    }

    @KafkaListener(groupId = "${service.kafka.groupId}", topics = "#{configProps.topics.![name]}")
    public void listener(String in) {

    }

    @Bean
    public SmartLifecycle createTopics(KafkaAdmin admin, ConfigProps props) {
        return new SmartLifecycle() {

            @Override
            public int getPhase() {
                return Integer.MIN_VALUE;
            }

            @Override
            public void stop() {
            }

            @Override
            public void start() {
                try (AdminClient client = AdminClient.create(admin.getConfig())) {
                    CreateTopicsResult createTopics = client.createTopics(props.topics.stream()
                        .map(t -> new NewTopic(t.getName(), t.getNumPartitions(), t.getReplicationFactor()))
                        .collect(Collectors.toList()));
                    createTopics.all().get();
                }
                catch (Exception e) {
//                  e.printStackTrace();
                }
            }

            @Override
            public boolean isRunning() {
                return false;
            }

            @Override
            public void stop(Runnable callback) {
            }

            @Override
            public boolean isAutoStartup() {
                return true;
            }
        };
    }

}

2018-10-10 11:20:25.813  INFO 14979 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [request1-4, request2-0, request1-0, request2-1, request1-1, request2-2, request1-2, request1-3]

Of course, this covers only the producer topics, but you can handle all of them this way.
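
For readers unfamiliar with the `.![name]` syntax used in the listener: it is a SpEL collection projection, which evaluates `name` against each element of `topics` and collects the results into a new list. A rough plain-Java equivalent is sketched here; `ProjectionSketch` and its reduced `Topic` class are hypothetical stand-ins for `ConfigProps.Topic`, used only to show what the expression computes.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ProjectionSketch {

    // Hypothetical stand-in for ConfigProps.Topic, reduced to the name field.
    public static final class Topic {
        private final String name;

        public Topic(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }
    }

    // Plain-Java counterpart of the SpEL projection "topics.![name]":
    // read the name of every topic and collect the values into a list.
    public static List<String> names(List<Topic> topics) {
        return topics.stream()
                .map(Topic::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Topic> topics = List.of(new Topic("request1"), new Topic("request2"));
        System.out.println(names(topics)); // prints [request1, request2]
    }
}
```

The extra per-topic settings (`num-partitions`, `replication-factor`) stay on the `Topic` objects, so the same bound list can feed both the listener and the topic creation.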

【Comments】:

  • This works fine together with ConfigurationProperties: @KafkaListener(topics = "${spring.kafka.topic.creation}")