【问题标题】:Consume multiple topics in one listener in spring boot kafka在 spring boot kafka 的一个监听器中消费多个主题
【发布时间】:2019-02-22 10:36:34
【问题描述】:

谁能给我一个spring boot kafka的小例子,我们可以在一个监听器类中消费多个主题。

【问题讨论】:

    标签: spring-boot apache-kafka spring-kafka


    【解决方案1】:

    application.yml

    my:
        kafka:
            conf:
                groupId: myId
                topics: topic1,topicN
    

    你的听众:

    @KafkaListener(groupId = "${my.kafka.conf.groupId}", topics = "#{'${my.kafka.conf.topics}'.split(',')}")
    public void storeTopicsDataToMongo(
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(required = false, name= KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
            @Payload(required = false) String record)
    {
        log.trace(format("Received topic[%s] key[%s] payload[%s]", topic, key, record));
        //your code
    }
    

    或者您可以探索 @ConfigurationProperties 并自己创建 bean,例如:

    @Component
    @ConfigurationProperties(prefix = "my.kafka.conf")
    @Data //=> lombok
    public class ConsumerConfigurationProperties {
    
        private String groupId;
        private List<String> topics;
    }
    

    【讨论】:

    • 但是现在我得到了错误:Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.header.Headers
    • 这是您如何配置/构建/运行应用程序的问题,请确保在依赖项中使用相同版本的 kafka 库。另外,如果此答案对您有帮助,请投票或接受;)
    • @Paizo : 如果我有一个主题和一组消费者在听这个主题(比如 10 个),是否意味着将并行处理 10 条到达该主题的消息?跨度>
    • Kafka 并行性适用于具有不同组 id 的消费者,但在 spring kafka 中将有多个线程处理@KafkaListener 的执行。因此,如果您以编程方式为每个分区使用不同的组 ID 定义侦听器,可以提高性能
    【解决方案2】:

    对于消费者组的消费者,您可以使用以下内容:

    @KafkaListener(topics = "topic1,") 公共无效听(@Payload KafkaBinding 记录,MessageHeaders 标头)抛出 ExecutionException,InterruptedException { ………… ………….. }

    对于充当受让人的消费者,您可以使用以下内容:

    @KafkaListener(id = “foo”,topicPartitions = { @TopicPartition(topic = “myTopic”,partitions = { “1” })}) 公共无效听(@Payload KafkaBinding 记录,MessageHeaders 标头)抛出 ExecutionException,InterruptedException { ………… ………….. }

    【讨论】:

      猜你喜欢
      • 2018-08-31
      • 1970-01-01
      • 2019-01-18
      • 1970-01-01
      • 2019-09-26
      • 2018-11-08
      • 2019-08-25
      • 2017-11-20
      • 2020-11-06
      相关资源
      最近更新 更多