【发布时间】:2020-05-21 19:59:41
【问题描述】:
我不想使用@KafkaListener 或@StreamListener,但我想手动轮询kafka。我正在使用 spring-cloud-starter-stream-kafka 库,并且我有以下 Kafka Producer
@Autowired
private KafkaTemplate<byte[], byte[]> template;
public void sendMessages() {
IntStream.range(2)
.forEach(val -> {
template.send("kafka-topic", "hello".getBytes());
});
}
我想使用 spring-kafka 手动轮询相同的 kafka 主题。我尝试了以下消费者
@Autowired
private ConsumerFactory consumerFactory;
public void processKafkaRecords() throws InterruptedException {
Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer("0", "consumer-1");
consumer.subscribe(Arrays.asList("kafka-topic"));
ConsumerRecords<byte[], byte[]> poll = consumer.poll(Duration.ofMillis(1000));
poll.forEach(record -> {
log.info("record {}", record);
});
}
application.properties
spring.cloud.stream.bindings.pollableInput.destination=kafka-topic
spring.cloud.stream.bindings.pollableInput.group=kafka-topic
spring.cloud.stream.bindings.pollableInput.consumer.batch-mode=true
spring.cloud.stream.bindings.pollableInput.consumer.header-mode=none
spring.cloud.stream.bindings.pollableInput.consumer.use-native-decoding=true
spring.cloud.stream.kafka.bindings.pollableInput.consumer.autoCommitOffset=false
但是,消费者永远不会得到生产者发送的任何记录。任何想法如何手动轮询 kafka 主题?
【问题讨论】:
-
你需要展示你的消费者工厂配置和application.yml/properties。
-
如何触发消费者?为什么你使用 Steam,然后使用普通产品发布并由消费者投票?
-
@GaryRussell 我没有消费者工厂配置,我假设它会使用默认的,我只是将它绑定到 kafka 主题,这还不够吗?有没有一个简单的例子,我可以看到一个简单的设置?谢谢
-
@daniu 消费者是spring bean,我手动触发。我使用 spring stream 因为我一直在使用 streamListener 但现在我想手动开始轮询
-
嗯,你从某个地方得到了一个消费者工厂;否则
@Autowired将无法工作。如果您的 Kafka 在本地主机上;你可能会得到 Boot 的默认值。您需要添加spring.kafka.consumer.auto-offset-reset=earliest- 有关更多信息,请参阅 Spring Boot Kafka 文档章节。您还需要等待通过RebalanceListener进行订阅(或使用consumer.assign()而不是subscribe())。
标签: java spring apache-kafka spring-kafka spring-cloud-stream