【发布时间】:2018-09-11 21:09:17
【问题描述】:
我正在使用 spring-kafka 来轮询消息,当我为消费者使用注释并将偏移量设置为 0 时,它将最早看到所有消息。但是当我尝试使用注入的 ConsumerFactory 自己创建消费者时,poll 只会返回几条消息或根本没有消息。为了能够提取消息,我还需要其他一些配置吗?轮询超时已设置为 10 秒。
@Component
public class GenericConsumer {
private static final Logger logger = LoggerFactory.getLogger(GenericConsumer.class);
@Autowired
ConsumerFactory<String, Record> consumerFactory;
public ConsumerRecords<String, Record> poll(String topic, String group){
logger.info("---------- Polling kafka recrods from topic " + topic + " group" + group);
Consumer<String, Record> consumer = consumerFactory.createConsumer(group, "");
consumer.subscribe(Arrays.asList(topic));
// need to make a dummy poll before we can seek
consumer.poll(1000);
consumer.seekToBeginning(consumer.assignment());
ConsumerRecords<String, Record> records;
records = consumer.poll(10000);
logger.info("------------ Total " + records.count() + " records polled");
consumer.close();
return records;
}
}
【问题讨论】:
-
max.poll.records 设置为什么?您可能需要循环轮询。
-
我尝试了 max.poll.records 500 和 5000,我确实在循环中尝试过。但没什么区别。