【发布时间】:2020-06-27 05:21:23
【问题描述】:
使用具有以下配置的 Kafka/Java:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
我有一个简单的轮询循环,例如:
consumer.poll(Duration.ofMillis(200));
我注意到一些奇怪的行为。持续时间为 0,它不返回任何结果。在本地,持续时间为 200 毫秒,我得到了一些结果,但在另一个生产环境中它永远不会返回结果,它至少需要 1 秒。
在我的理解中,poll 方法会等到至少找到一个结果。持续时间为0,至少应该返回已经到达的结果,而不应该总是不返回任何结果。
解释是什么?
【问题讨论】:
标签: java apache-kafka kafka-consumer-api