【问题标题】:How multiple consumer can listen to multiple topic in spring boot Kafka?spring boot Kafka中如何多个消费者可以收听多个主题?
【发布时间】:2019-01-18 13:17:09
【问题描述】:

当有多个消费者时,我无法收听 kafka 主题(我的案例 2 主题)。 在下面的示例中,我有 2 个消费者工厂,它们将接收 2 条不同的 JSON 消息(一个是用户类型,另一个是事件类型)。两条消息都发布到不同的主题。在这里,当我尝试从 topic1 访问事件消息时,我无法访问,但我可以访问用户主题消息。

例如:

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {      
@Autowired
private Environment environment;

@Bean
public ConsumerFactory<String,User> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
    config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
            new JsonDeserializer<>(User.class));

}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String , Event> consumerFactoryEvent(){
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
    config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("event.consumer.group"));
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
            new JsonDeserializer<>(Event.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactoryEvent() {
    ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactoryEvent());
    return factory;
}
}

我的主要应用如下:

@KafkaListener(topics = "${event.topic}")
public void processEvent(Event event) {
..do something..
..post the message to User topic
}
@KafkaListener(topics = "${user.topic}")
public void processUser(User user) {
..do something..
}

我需要先监听事件主题并对消息进行一些按摩,然后将其发送到用户主题,我还有另一种方法可以监听用户主题并对该消息执行某些操作.. 我试图将不同的选项传递给@KafkaListener,例如

@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent")

但它不起作用..我不确定出了什么问题..任何建议都有帮助!

【问题讨论】:

  • 尝试为两个侦听器定义两个不同的 bean 名称并将这些名称添加到 '@kafkaListener' 注释中

标签: java spring-boot apache-kafka


【解决方案1】:

如果你没有在bean中指定名称,那么方法名称将是bean名称,在@KafkaListener中添加带有groupid的bean名称

@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent", groupId="")

@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactory", groupId="")

@Bean 中指定名称并将该名称添加到@kafkaListener

@Bean(name="kafkaListenerContainerFactoryEvent")
public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactoryEvent() {
ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryEvent());
return factory;
}

【讨论】:

  • 完美,正是我想要的
【解决方案2】:

在任何文档中都不容易找到。

这里我以消费消息为例

topic=topic1 和 bootstrapserver=url1(JSON 序列化器和反序列化器)

topic=topic2 with bootstrapserver=url2(Avro 序列化器和反序列化器)

第一步:-

@Bean
public ConsumerFactory<String, String> consumerFactory1() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost1:9092"); //This is dummy
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConsumerFactory consumerFactory2() {
    Map props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost2:9092"); //This is dummy
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("schema.registry.url", "https://abc.schemaregistery.example.com"); //Again this is dummy or can be avro serilaised class
    return new DefaultKafkaConsumerFactory<>(props);
}


  @Bean(name = "kafkaListenerContainerFactory1")
public ConcurrentKafkaListenerContainerFactory
kafkaListenerContainerFactory1() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory1());
    return factory;
}

 @Bean(name = "kafkaListenerContainerFactory2")
public ConcurrentKafkaListenerContainerFactory
kafkaListenerContainerFactory2() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory2());
    return factory;
}

第二步:-

  • @SpringBootApplication(排除 = KafkaAutoConfiguration.class) => 不要从 spring.kafka @ConfigurationProperties 定义的 yml 或属性文件中读取值

第三步:-

 @KafkaListener(
            topics = "topic1",
            containerFactory = "kafkaListenerContainerFactory1" ,
            groupId = "com.groupid1")
    public void receive(ConsumerRecord consumerRecord) throws InterruptedException {


        LOGGER.info("consuming from topic1 {}" , consumerRecord.value());
        Thread.sleep(1000000); //For testing

    }

 @KafkaListener(
            topics = "topic2",
            containerFactory = "kafkaListenerContainerFactory2" ,
            groupId = "com.groupid2")
    public void receive(ConsumerRecord consumerRecord) throws InterruptedException {


        LOGGER.info("consuming from topic2 {}" , consumerRecord.value());
        Thread.sleep(1000000); //For testing

    }

【讨论】:

    【解决方案3】:

    很明显,回答为时已晚。但它可能会帮助其他人。


    您不需要创建多个 ConsumerFactory Bean。您可以在不通知配置中的类(UserEvent)的情况下执行此操作,即 new JsonDeserializer&lt;&gt;(Event.class) 并添加受信任的包。

    @Bean
    public ConsumerFactory<String,User> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
        config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
    
        // TODO: Remove "*" and add specific package name
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // <<-- New config added
    
        return new DefaultKafkaConsumerFactory<>(config);
    
    }
    

    接收记录时:

    @KafkaListener(topics="${event.topic}")
    void receiveUserRecord(User record){ ... } # For User POJO
    
    @KafkaListener(topics="${event.topic}")
    void receiveEventRecord(Event record){ ... } # For Event POJO
    

    【讨论】:

      猜你喜欢
      • 2018-08-31
      • 2019-02-22
      • 2017-01-26
      • 1970-01-01
      • 1970-01-01
      • 2019-04-07
      • 1970-01-01
      • 2018-12-31
      • 2019-01-10
      相关资源
      最近更新 更多