【问题标题】:Spring boot consumer without using @Kafkalistener不使用 @Kafkalistener 的 Spring Boot 消费者
【发布时间】:2018-12-21 17:31:23
【问题描述】:

我正在尝试在不使用 @Kafkalistener 的情况下编写 kafka 消费者,以下是我用于配置侦听器的代码行:

@Configuration
    @EnableKafka
    public class KafkaConfig {

      @Value("${kafka.bootstrap-servers}")
      private String bootstrapServers;

      @Bean
      public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        // allows a pool of processes to divide the work of consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "org");
        // automatically reset the offset to the earliest offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
      }

      @Bean
      public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
      }

      @Bean
      public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
          ContainerProperties containerProperties=new ContainerProperties("in.t");
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
      }

      @Bean
      public Consumer receiver() {
        return new Consumer();
      }
    }

这里如何配置主题和监听方法,我的消费者类可以有多个方法。

另外,想知道在将 @kafkalistener 与 kafka 流一起使用时是否存在任何潜在问题。

PS:我不想使用@KafkaListener。

【问题讨论】:

  • 为什么不使用 kafka 监听器? @用户
  • 因为它不适用于流

标签: spring-boot apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:
  1. @kafkalistener 不适用于 Kafka Streams。这是普通的Consumer。 Kafka Stream 可以通过StreamsBuilderFactoryBean 进行管理,特别是@Bean 用于KStream

  2. 如果您不想使用@kafkalistener,则需要按照手动创建KafkaListenerContainer 的方向进行操作。 KafkaListenerContainerFactory 可用于此目的,但仅限于 Spring Kafka 2.2 并且绝对不适用于 Spring Boot。

因此,除非手动创建ConcurrentMessageListenerContainer,否则您别无选择。并且已经在这里通过ContainerProperties 您可以注入目标messageListener。对于您自定义的Consumer POJO,您需要考虑将其包装到RecordMessagingMessageListenerAdapter 中。只有最后一个必须注入ConcurrentMessageListenerContainer

这就是@KafkaListener 的底层运作方式。

【讨论】:

    【解决方案2】:

    使用 Spring Boot 和 Spring Kafka,可以在没有 @KafkaListener 的情况下创建消费者并创建额外的显式 kafka bean(ConcurrentMessageListenerContainer 等......):

    1. application.yml - 创建连接配置。它们将被注入 KafkaProperties
    spring:
      kafka:
        bootstrap-servers: localhost:9092
        producer:
          key-serializer: "org.apache.kafka.common.serialization.StringSerializer"
          value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
        listener:
          type: BATCH
        consumer:
          clientId: "applicationClientId"
          groupId: "applicationGroup"
          keyDeserializer: "org.apache.kafka.common.serialization.StringDeserializer"
          valueDeserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
          maxPollRecords: 50
          fetchMinSize: 50
          autoOffsetReset: earliest
          properties:
            spring:
              json:
                trusted:
                  packages: "*"
    
    1. 基于 KafkaProperties,spring-boot 将创建必要的 bean。因此,只需要为数据处理创建有用的主题端点。
    @Configuration
    @EnableKafka
    @RequiredArgsConstructor //Lombok annotation
    public class KafkaConsumerConfig implements KafkaListenerConfigurer {
    
        private final BeanFactory beanFactory;
        private final SomethingApplicationService somethingApplicationService
    
        @SneakyThrows //Lombok annotation
        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            // The following may be in a loop with different services and topics
            val endpoint = new MethodKafkaListenerEndpoint<String, KafkaReceiptRequest>();
    
            endpoint.setBeanFactory(beanFactory);
            endpoint.setBean(somethingApplicationService);
            endpoint.setMethod(somethingApplicationService.getClass().getDeclaredMethod("processList", List.class));
            endpoint.setId("Unique id for this endpoint");
            endpoint.setTopics("topic1", ...);
            endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    
            registrar.registerEndpoint(endpoint);
        }
    
        @Bean
        public MessageHandlerMethodFactory messageHandlerMethodFactory() {
            return new DefaultMessageHandlerMethodFactory();
        }
    
    }
    
    1. 目标服务和数据对象
    @Service
    public class SomethingApplicationService {
    
        public void processList(List<Data> list) {
            //processing logic
        }
    
    }
    
    @Data //Lombok annotation
    public class Data {
        private String name;
        private String value;
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-10-28
      • 1970-01-01
      • 2016-12-09
      • 2023-04-11
      • 2022-08-16
      • 1970-01-01
      • 2018-05-22
      相关资源
      最近更新 更多