【问题标题】:Kafka listener isn't listening to Kafka卡夫卡听众不听卡夫卡
【发布时间】:2020-07-15 00:03:32
【问题描述】:

我正在使用 Java 11 和 kafka-client 2.0.0。

我正在使用以下代码来生成消费者:

    public Consumer createConsumer(Properties properties,String regex) {
        log.info("Creating consumer and listener..");
        Consumer consumer = new KafkaConsumer<>(properties);
        ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                log.info("The following partitions were revoked from consumer : {}", Arrays.toString(partitions.toArray()));
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                log.info("The following partitions were assigned to consumer : {}", Arrays.toString(partitions.toArray()));
            }
        };
        consumer.subscribe(Pattern.compile(regex), listener);
        log.info("consumer subscribed");
        return consumer;
    }
}

我的投票循环在代码中的不同位置:

public <K, V> void startWorking(Consumer<K, V> consumer) {
        try {
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(600);
                if (records.count() > 0) {
                    log.info("Polled {} records", records.count());

                } else {
                    log.info("polled 0 records.. going to sleep..");
                    Thread.sleep(200);
                }
            }
        } catch (WakeupException | InterruptedException e) {
            log.error("Consumer is shutting down", e);
        } finally {
            consumer.close();
        }
    }

当我运行代码并使用此功能时,会创建消费者并且日志包含以下消息:

    Creating consumer and listener..
    consumer subscribed
polled 0 records.. going to sleep..
polled 0 records.. going to sleep..
polled 0 records.. going to sleep..

日志不包含有关分区分配/撤销的任何信息。

此外,我还可以在日志中看到消费者使用的属性(已设置 group.id):

2020-07-09 14:31:07.959 DEBUG 7342 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [server1:9092]
        check.crcs = true
        client.id =
        group.id=mygroupid
        key.deserializer=..
        value.deserializer=..

所以我尝试使用具有相同配置的 kafka-console-consumer,以便从 regex(mytopic.*) 应该捕获的主题之一中消费(在这种情况下,我使用了主题 mytopic-1):

/usr/bin/kafka-console-consumer.sh --bootstrap-server server1:9092 --topic mytopic-1 --property print.timestamp=true --consumer.config /data/scripts/kafka-consumer.properties  --from-begining

我的代码的其他部分有一个轮询循环,每 10m 超时一次。所以底线 - 问题是分区没有分配给 Java 使用者。监听器内部的打印永远不会发生,消费者也没有任何分区可以监听。

【问题讨论】:

  • 嗨 JeyJ。麻烦你用英文拼写检查器好吗? Stack Overflow 上的帖子旨在为未来的读者提供多年的利益,因此我们希望它们尽可能具有可读性。感谢您发布只是为了解决您的问题,但也许您可以将可读性视为在此处发布的廉价“价格”。
  • 您以前的很多帖子都已针对拼写进行了修改,并且这里期望当内容所有者看到编辑通知时,会记下应该进行的改进前进。
  • @halfer 当然,我可以做到。我不认为我的英语很差,也许是一封丢失的信或类似的东西......如果我找到了一个解决方案,即使没有人回答这个问题,我也总是试图留下一个答案所以你不能说我我不考虑未来的读者..
  • 是的,撇号尤其重要。它们仍然会标记为拼写错误(不幸的是,“wont”和“cant”除外,当省略撇号时,这些词具有完全意想不到的英语含义)。如果你能特别注意这些词,那就太好了。
  • @halfer 将在我的下一篇文章/asnwer 中进行:)

标签: java apache-kafka


【解决方案1】:

我的属性文件中似乎缺少 ssl 属性。如果您使用ssl,请不要忘记指定security.protocol=ssl。如果 Kafka 使用 ssl 并且您尝试在未配置 ssl 参数的情况下访问它,kafka-client API 似乎不会引发异常。

【讨论】:

    猜你喜欢
    • 2022-01-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-15
    • 2014-10-19
    • 1970-01-01
    相关资源
    最近更新 更多