【问题标题】:Is there a way to do a Kafka poll using spring-kafka - which retrives the list of new messages?有没有办法使用 spring-kafka 进行 Kafka 民意调查 - 检索新消息列表?
【发布时间】:2018-10-04 13:05:26
【问题描述】:

我想知道spring-kafka 中是否有一些选项可以将所有新消息抓取到一个列表中。

例如,如果我正在侦听Message 对象,我想获得自上次轮询以来的List<Message>。类似的东西:

@KafkaListener(poll-interval=1000, topics = "${kafka.topic}", containerFactory = "objectListListenerContainerFactory", )
public void messageListener(List<Message> messages) {
    log.info("Count of new messages since last poll : {}", messages.size());
}

我已经通过Spring Kafka: Poll for new messages instead of being notified using `onMessage`。但对我来说不是很有用。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    使用普通的 Spring Kafka,您可以使用 ConsumerFactory 创建一个 KafkaConsumer,然后您可以在方便时自行从那里轮询记录。

    还要注意KafkaMessageListenerContainer可以在任意时刻暂停以停止轮询,但仍然连接到消费者组。

    在 Spring Integration Kafka 扩展中,有一个新的 KafkaMessageSource 用于此类任务:

    /**
     * Polled message source for kafka. Only one thread can poll for data (or
     * acknowledge a message) at a time.
     * <p>
     * NOTE: If the application acknowledges messages out of order, the acks
     * will be deferred until all messages prior to the offset are ack'd.
     * If multiple records are retrieved and an earlier offset is requeued, records
     * from the subsequent offsets will be redelivered - even if they were
     * processed successfully. Applications should therefore implement
     * idempotency.
     *
     * @param <K> the key type.
     * @param <V> the value type.
     *
     * @author Gary Russell
     * @author Mark Norkin
     * @author Artem Bilan
     *
     * @since 3.0.1
     *
     */
    public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object>
    implements DisposableBean, Lifecycle {
    

    我认为我们需要考虑在 Spring for Apache Kafka 参考手册中记录这一点:https://docs.spring.io/spring-kafka/docs/current/reference/html/_spring_integration.html#si-kafka。因此,请随时在 spring-kafka 项目中提出问题来解决这一差距。

    【讨论】:

    • 谢谢阿特姆。所以目前,我认为最好的方法是使用 KafkaConsumer。我在 github 中创建了一个问题。将来这可能是一个不错的功能。
    • 好的。这可能是KafkaTrmplate 中的poll 变体,但会看到...恕我直言,Spring Integration 已经为我们解决了问题...
    • 我还要检查 Spring Integration。 :)
    猜你喜欢
    • 2019-08-16
    • 2020-06-04
    • 1970-01-01
    • 1970-01-01
    • 2017-09-13
    • 2018-09-12
    • 1970-01-01
    • 1970-01-01
    • 2021-09-23
    相关资源
    最近更新 更多