【问题标题】:multiple consumers with latest and earliest offset options with spring-kafka使用 spring-kafka 具有最新和最早偏移选项的多个消费者
【发布时间】:2021-08-04 13:52:56
【问题描述】:

有一个消费者使用earliest 偏移选项和批处理,效果非常好。

现在需要添加另一个消费者,它应该在单处理模式下工作并始终寻找 latest 偏移量 - 这个似乎与下面的一些配置冲突,因为它并没有真正处理消息。

整个应用程序的属性:

spring.kafka.bootstrap-servers=${KAFKA_ADDRESS:localhost:9092}
spring.kafka.consumer.group-id=group
spring.kafka.listener.type=batch
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=100000

工作消费者的bean:

@Bean("batchKafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> batchKafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setBatchListener(true);
    return factory;
}

问题消费者的bean:

@Bean("singleKafkaListenerContainerFactoryManualCommit")
public ConcurrentKafkaListenerContainerFactory<?, ?> singleKafkaListenerContainerFactoryManualCommit(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setKafkaConsumerProperties(props);
    factory.setBatchListener(false);
    return factory;
}

两个消费者都启动并运行如下:

[ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group-2, groupId=group] Successfully joined group with generation 15
[ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Successfully joined group with generation 15

他们确实针对相同的主题,但假设有不同的消费者群体在发挥作用,这在这种情况下应该不会影响任何事情。

也许存在消费者等待轮询整个批次或其他选项干扰的棘手问题?

UPD 基于 cmets,这可以通过简单的单独消费者组添加来完成。

这就是我想为新的消费者组创建修改属性的方式:

props.put("group.id", "group-single");

还有另一个选择:

props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-single");

但仍然有日志表明没有新的消费者存在。

【问题讨论】:

  • 为什么您的两个消费者都在同一个组中?当然,其中只有一个会从单个分区中获取消费者记录。
  • 为什么我们假设它们在不同的组中?你的配置/代码没有显示这个
  • 感谢输入,已更新
  • 如果您仍然遇到问题,请在某个地方发布完整的 minimal reproducible example 以显示该行为。

标签: java apache-kafka spring-kafka


【解决方案1】:

事实证明这是通过添加这个而不是道具图来解决的:

factory.getContainerProperties().setClientId("consumer-group-other-1");
factory.getContainerProperties().setGroupId("consumer-group-other");

【讨论】:

    猜你喜欢
    • 2021-08-09
    • 1970-01-01
    • 2019-06-10
    • 1970-01-01
    • 2018-06-27
    • 2023-03-31
    • 2020-07-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多