【发布时间】:2021-08-04 13:52:56
【问题描述】:
有一个消费者使用earliest 偏移选项和批处理,效果非常好。
现在需要添加另一个消费者,它应该在单处理模式下工作并始终寻找 latest 偏移量 - 这个似乎与下面的一些配置冲突,因为它并没有真正处理消息。
整个应用程序的属性:
spring.kafka.bootstrap-servers=${KAFKA_ADDRESS:localhost:9092}
spring.kafka.consumer.group-id=group
spring.kafka.listener.type=batch
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=100000
工作消费者的bean:
@Bean("batchKafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> batchKafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setBatchListener(true);
return factory;
}
问题消费者的bean:
@Bean("singleKafkaListenerContainerFactoryManualCommit")
public ConcurrentKafkaListenerContainerFactory<?, ?> singleKafkaListenerContainerFactoryManualCommit(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setKafkaConsumerProperties(props);
factory.setBatchListener(false);
return factory;
}
两个消费者都启动并运行如下:
[ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group-2, groupId=group] Successfully joined group with generation 15
[ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group-1, groupId=group] Successfully joined group with generation 15
他们确实针对相同的主题,但假设有不同的消费者群体在发挥作用,这在这种情况下应该不会影响任何事情。
也许存在消费者等待轮询整个批次或其他选项干扰的棘手问题?
UPD 基于 cmets,这可以通过简单的单独消费者组添加来完成。
这就是我想为新的消费者组创建修改属性的方式:
props.put("group.id", "group-single");
还有另一个选择:
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-single");
但仍然有日志表明没有新的消费者存在。
【问题讨论】:
-
为什么您的两个消费者都在同一个组中?当然,其中只有一个会从单个分区中获取消费者记录。
-
为什么我们假设它们在不同的组中?你的配置/代码没有显示这个
-
感谢输入,已更新
-
如果您仍然遇到问题,请在某个地方发布完整的 minimal reproducible example 以显示该行为。
标签: java apache-kafka spring-kafka