【问题标题】:Auto delete kafka consumer group after disconnect断开连接后自动删除kafka消费者组
【发布时间】:2020-03-26 02:52:14
【问题描述】:

在我的 Java 应用程序中,每隔几秒钟,我会为消费者分配一个特定的 TopicPartition,并尝试从特定主题 + 分区中读取特定消息。阅读消息后(使用 poll() )我立即断开消费者的连接。

因为上面的场景可以在多线程环境下运行消费者组名有前缀+随机哈希例如my_consumer_group_EWQSV(因为 kafka 不会将 same 特定分区分配给同一组中的两个消费者)。

问题是,我无法告诉 kafka 在断开连接后删除这些消费者(因为这些消费者只是暂时的),有什么办法吗? (不是手动,我的意思是通过使用配置或其他方式,我找不到像“auto-delete-after-consumer-disconnect”这样的配置)

谢谢:)

【问题讨论】:

  • 为什么会给您带来麻烦?
  • 好吧,1天后我将拥有数百个永远不会再次使用的临时消费者组
  • 创建 cron 作业清理超过 1 天的所有内容是否有问题?在操作系统级别。

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

在java中,为了在不创建消费者组的情况下手动消费来自kafka的特定消息:

以下内容就足够了:

kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        TopicPartition tp = new TopicPartition(topic, partition);
        try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, offset);
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
        } 

重要的部分是:

  • 将 enable.auto.commit 属性设置为 false(如上所示)。
  • NOT 设置 group.id (ConsumerConfig.GROUP_ID_CONFIG) 属性,因为不需要 id。
  • 使用自动分配分区的subscribe方法,而不是使用assign和seek方法手动读取消息,如上所示。

【讨论】:

    猜你喜欢
    • 2020-04-02
    • 1970-01-01
    • 2020-10-16
    • 2021-11-30
    • 2019-09-19
    • 2021-11-30
    • 1970-01-01
    • 1970-01-01
    • 2021-01-16
    相关资源
    最近更新 更多