【问题标题】:Kafka Consumer committing manually based on a condition.Kafka 消费者根据条件手动提交。
【发布时间】:2017-07-24 04:03:01
【问题描述】:

@kafkaListener 消费者在满足特定条件后提交。假设一个主题从生产者那里获取以下数据 偏移量 [0] 处的“消息 0” 偏移量[1]处的“消息 1”

它们在消费者处被接收并在 acknowledgement.acknowledge() 的帮助下提交

那么下面的消息就进入主题了

偏移[2]处的“消息2” 偏移量[3]处的“消息 3”

正在运行的消费者收到上述数据。这里条件失败并且上述偏移量未提交。

即使主题出现新数据,“消息 2”和“消息 3”也应该由同一消费者组中的任何消费者选择,因为它们没有提交。但这并没有发生,消费者收到了一条新消息。

当我重新启动消费者时,我会返回 Message2 和 Message3。这应该在消费者运行时发生。

代码如下——: KafkaConsumerConfig 文件

enter code here

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"1");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}

Listner Class
public class Listener {
    public CountDownLatch countDownLatch0 = new CountDownLatch(3);
    private Logger LOGGER = LoggerFactory.getLogger(Listener.class);
    static int count0 =0;


    @KafkaListener(topics = "abcdefghi", group = "group1", containerFactory = "kafkaListenerContainerFactory")
    public void listenPartition0(String data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                                 @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment acknowledgment) throws InterruptedException {
        count0 = count0 + 1;
        LOGGER.info("start consumer 0");

        LOGGER.info("received message via consumer 0='{}' with partition-offset='{}'", data, partitions + "-" + offsets);
        if (count0%2 ==0)
            acknowledgment.acknowledge();
        LOGGER.info("end of consumer 0");


    }

我怎样才能达到我想要的结果?

【问题讨论】:

  • 你等了多久?不良消费者可能需要一些时间才能退出组并进行重新平衡

标签: apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

没错。 offset 是一个很容易在消费者实例的内存中跟踪的数字。我们需要为相同分区的组中新到达的消费者提交偏移量。这就是当您重新启动应用程序或为组发生重新平衡时,它会按预期工作的原因。

要使其按您希望的方式工作,您应该考虑在您的侦听器中实现 ConsumerSeekAware 并调用 ConsumerSeekCallback.seek() 以获得您希望从下一个轮询周期开始消耗的偏移量。

http://docs.spring.io/spring-kafka/docs/2.0.0.M2/reference/html/_reference.html#seek:

public class Listener implements ConsumerSeekAware {

    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallBack.set(callback);
    }

    @KafkaListener()
    public void listen(...) {
        this.seekCallBack.get().seek(topic, partition, 0);
    }

}

【讨论】:

  • 如何使用 @kafkaListener 实现 ConsumerSeekAware?
  • 在我的回答中查看示例。
  • 这行得通,我还有一个疑问,对于我的用例,我想暂停 Kafka Consumer 并在满足特定条件时从特定偏移量恢复它。如何在 kafkaListener 中实现 consumer.pause()。我是否还可以对 kafka 侦听器进行编程,使其仅在满足特定条件时才汇集数据,而不是在条件失败时立即汇集数据?谢谢
  • 要暂停消费者,您应该在您的@KafkaListenerseek() 偏移上使用Consumer 参数,然后再暂停。
  • 您能否举例说明如何在@KafkaListener 上添加消费者参数。我试过@KafkaListener(topics = "abcdefghi", group = "group1", containerFactory = "kafkaListenerContainerFactory") public void listenPartition0(String data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions,@Header(KafkaHeaders.OFFSET) List offsets, Acknowledgment acknowledgment, Consumer consumer) 这会抛出一个错误,即 Failed to convert message payload '[llll]' to 'org.apache.kafka.clients.consumer.Consumer 感谢您的帮助
猜你喜欢
  • 1970-01-01
  • 2017-10-07
  • 1970-01-01
  • 2018-11-23
  • 1970-01-01
  • 2023-01-31
  • 2017-03-19
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多