【Question Title】: Spring Cloud Stream Kafka Commit Failed since the group is rebalanced
【Posted】: 2025-12-02 07:20:04
【Question】:

I'm getting a CommitFailedException in some time-consuming Spring Cloud Stream applications. I know that to fix this I need to set max.poll.records and max.poll.interval.ms to match my expectations for how long it takes to process a batch. However, I'm not sure how to set them for the consumer in Spring Cloud Stream.

Exception:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:808) at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:691) at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1416) at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1377) at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1554) at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1418) at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:739) at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.lang.Thread.run(Thread.java:748)

Also, how can I make sure this situation never happens at all? Or alternatively, how can I inject some kind of rollback in case of this exception? The reason is that I'm doing some other external work, and once it's done I publish the output message accordingly. So if, after the work is done on the external system, the message cannot be published for any reason, I have to revert it (some kind of atomic transaction across the Kafka publish and the other external systems).

【Question Discussion】:

    Tags: apache-kafka spring-kafka spring-cloud-stream


    【Solution 1】:

    You can set arbitrary Kafka properties at the binder level (documentation here):

    spring.cloud.stream.kafka.binder.consumerProperties

    Key/value map of arbitrary Kafka client consumer properties. In addition to supporting known Kafka consumer properties, unknown consumer properties are allowed here as well. Properties here supersede any properties set in boot and in the configuration property above.

    Default: Empty map.

    e.g. spring.cloud.stream.kafka.binder.consumerProperties.max.poll.records=10
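
    For instance, both knobs from the exception message can be set together at the binder level. A minimal application.properties sketch (the values are placeholders; tune them to your actual batch processing time):

    ```properties
    # Binder-level: applies to all consumer bindings of this Kafka binder
    spring.cloud.stream.kafka.binder.consumerProperties.max.poll.records=10
    # Allow up to 10 minutes between poll() calls before the broker
    # considers the consumer dead and rebalances the group
    spring.cloud.stream.kafka.binder.consumerProperties.max.poll.interval.ms=600000
    ```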

    Or at the binding level (documentation here):

    spring.cloud.stream.kafka.bindings.<channelName>.consumer.configuration

    Map with a key/value pair containing generic Kafka consumer properties. In addition to Kafka consumer properties, other configuration properties can be passed here, for example some properties needed by the application, such as spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar.

    Default: Empty map.

    e.g. spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=10
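
    As a sketch, the same settings scoped to a single binding named input (the binding name is taken from the example above; substitute your own):

    ```properties
    # Binding-level: applies only to the "input" binding, overriding binder-level values
    spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=10
    spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.interval.ms=600000
    ```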

    You can get notified of commit failures by adding an OffsetCommitCallback to the listener container's ContainerProperties and setting syncCommits to false. To customize the container and its properties, add a ListenerContainerCustomizer bean to the application.

    EDIT

    Asynchronous commit callback...

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So57970152Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So57970152Application.class, args);
        }
    
        @Bean
        public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
            return (container, dest, group) -> {
                container.getContainerProperties().setAckMode(AckMode.RECORD);
                container.getContainerProperties().setSyncCommits(false);
                container.getContainerProperties().setCommitCallback((map, ex) -> {
                    if (ex == null) {
                        System.out.println("Successful commit for " + map);
                    }
                    else {
                        System.out.println("Commit failed for " + map + ": " + ex.getMessage());
                    }
                });
                container.getContainerProperties().setClientId("so57970152");
            };
        }
    
        @StreamListener(Sink.INPUT)
        public void listen(String in) {
            System.out.println(in);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
            return args -> {
                template.send("input", "foo".getBytes());
            };
        }
    
    }
    

    Manual commits (sync)...

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So57970152Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So57970152Application.class, args);
        }
    
        @Bean
        public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
            return (container, dest, group) -> {
                container.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
                container.getContainerProperties().setClientId("so57970152");
            };
        }
    
        @StreamListener(Sink.INPUT)
        public void listen(String in, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) {
            System.out.println(in);
            try {
                ack.acknowledge(); // MUST USE MANUAL_IMMEDIATE for this to work.
                System.out.println("Commit successful");
            }
            catch (Exception e) {
                System.out.println("Commit failed " + e.getMessage());
            }
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
            return args -> {
                template.send("input", "foo".getBytes());
            };
        }
    
    }
    

    【Discussion】:

    • Thanks. Could you elaborate on the approach you mentioned for having a bean get notified of commit failures?
    • You can also use manual commits to detect the exception. I added examples of both techniques to the answer.
    【Solution 2】:

    Set your heartbeat interval to less than 1/3 of the session timeout. If the broker cannot tell whether your consumer is still alive, it will initiate a partition rebalance among the remaining consumers. There is a dedicated heartbeat thread to tell the broker the consumer is alive even when the application takes longer to process. Change these in your consumer configuration:

    heartbeat.interval.ms
    
    session.timeout.ms
    

    If that doesn't work, try increasing the session timeout. You'll have to experiment with these values.
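
    As an illustrative sketch, the rule of thumb above expressed as binder-level properties (example values only; keep heartbeat.interval.ms at no more than about 1/3 of session.timeout.ms):

    ```properties
    # Broker waits this long without heartbeats before declaring the consumer dead
    spring.cloud.stream.kafka.binder.consumerProperties.session.timeout.ms=30000
    # Heartbeat at most 1/3 of the session timeout, so several
    # heartbeats can be missed before a rebalance is triggered
    spring.cloud.stream.kafka.binder.consumerProperties.heartbeat.interval.ms=10000
    ```

    Note that heartbeats only prevent rebalances caused by session timeouts; if processing a poll batch exceeds max.poll.interval.ms, the consumer still leaves the group, so for slow processing the max.poll.* settings in Solution 1 are the ones to tune.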

    【Discussion】: