【问题标题】:Passing Acknowledgment to a spring KafkaListener consumer method将确认传递给 Spring KafkaListener 消费者方法
【发布时间】:2021-11-25 01:59:51
【问题描述】:

我正在尝试关闭 kafka 中的自动提交,而是手动进行。为此,在我的application.properties 中我设置了spring.kafka.properties.enable.auto.commit=false

我目前还有一个带有以下标头的方法:

@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.PPA_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
                    @Headers MessageHeaders headers)

我的理解是,为了手动提交,我需要访问 Acknowledgement 对象,该对象将作为参数传递给我的 receive() 方法。我的问题:如果我将标题更改为

@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.APP_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
                    @Headers MessageHeaders headers,
                    Acknowledgment acknowledgment)

Acknowledgment 会自动传入,还是我需要进行其他更改?

【问题讨论】:

    标签: java spring-kafka


    【解决方案1】:

    是的,这样Acknowledgment 实例将被传递到您的侦听器方法中。成功处理收到的消息后,您应该调用acknowledgement.acknowledge();(仅当您想手动确认时才需要)

    我也会切换到MANUAL ackmode 并关闭自动提交(你已经做过的),例如通过提供自定义 Spring Boot 配置类 - 也可以通过 application.properties 进行配置:

    @Configuration
    class KafkaConfiguration {
    
            @Bean
            ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
    
                final Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
                consumerProperties.put(ENABLE_AUTO_COMMIT_CONFIG, false);
    
                ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
                factory.getContainerProperties().setAckMode(MANUAL);
    
                return factory;
            }
        }
    }
    

    如果您不想手动确认,那么不同的确认模式可能更方便且更适合:

    https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html

    AckMode.RECORD 很舒服,因为如果您的侦听器的方法实现成功完成(不抛出异常),传入您的侦听器方法的 Kafka 记录将自动得到确认。

    【讨论】:

    • 我遇到了 AckMode.RECORD 和 isAckOnError false 的问题,如果关闭应用程序,偏移量不会重新传递。有什么问题?
    【解决方案2】:

    简短回答,您不需要传递Acknowledgment,它是接收消息的一部分。

    根据documentation

    当使用手动AckMode时,您也可以向监听器提供Acknowledgment。以下示例还展示了如何使用不同的容器工厂。

    @KafkaListener(id = "cat", topics = "myTopic",
              containerFactory = "kafkaManualAckListenerContainerFactory")
    public void listen(String data, Acknowledgment ack) {
        ...
        ack.acknowledge();
    }
    

    来自@KafkaListener 文档:

    带注释的方法可以具有类似于 MessageMapping 提供的灵活签名,即

    • ConsumerRecord 访问原始 Kafka 消息
    • 手动确认的确认
    • @Payload-annotated 方法参数,包括对验证的支持
    • @Header-annotated 方法参数用于提取由 KafkaHeaders 定义的特定标头值
    • @Headers-annotated 参数也必须分配给 Map 以访问所有标题。
    • MessageHeaders 参数用于访问所有标头。
    • MessageHeaderAccessor 方便访问所有方法参数。

    【讨论】:

      【解决方案3】:

      如果我们想通过方法注入传入 Spring 管理的 bean 怎么办?我试图通过方法注入将服务类传递给 kafka 侦听器 -

      private fun defaultListener(payload: ByteArray, @Headers messageHeaders: MessageHeaders, ack: Acknowledgment, callbackService: CallbackService) {
       // Do something
      }
      

      我得到以下异常 -

      org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.test.kafka-demo.service.CallbackService]
      

      如果我将依赖服务类设为 Autowired,则可以正常工作。

      【讨论】:

        猜你喜欢
        • 2019-02-21
        • 2018-12-21
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-10-28
        • 1970-01-01
        • 2019-12-11
        相关资源
        最近更新 更多