【问题标题】:Spring-kafka offset commiting waysSpring-kafka 偏移提交方式
【发布时间】:2018-03-27 19:32:38
【问题描述】:

如果我不想使用自动提交模式 - sping 提供了另一种方法来做到这一点。

spring-kafkfa/#committing-offsets 向我们提供以下有关提交偏移的信息:

RECORD - 处理后监听返回时提交偏移量 记录。
BATCH - 当所有记录返回时提交偏移量 poll() 已被处理。
TIME - 当所有 poll() 返回的记录已被处理,只要 自上次提交以来的确认时间已超过。
COUNT - 提交 当 poll() 返回的所有记录都被处理完时的偏移量 只要自上次提交以来已收到 ackCount 记录。
COUNT_TIME - 类似于 TIME 和 COUNT 但如果执行提交 任一条件为真。
MANUAL - 消息侦听器负责 确认()确认;之后,相同的语义 应用 BATCH。
MANUAL_IMMEDIATE - 立即提交偏移量 当 Acknowledgement.acknowledge() 方法被 听众。

我有几个问题:

TIME 据我了解,spring 框架中的某处存在循环,该循环执行循环

while(true){
   data = consumer.poll();
   data.foreach(record->listener.listen(record))
}

民意调查多久进行一次?

时间是提交偏移的唯一标准吗? 假设 poll 返回了 100 条记录,而 ackTime 过期时 - 只处理了 60 条记录?

我没有发现MANUAL_IMMEDIATEMANUAL 之间的区别

请为我澄清这些问题。

附言

据我了解,Garry Russel 对 foreach 的回答如下所示:

while(true){
   data = consumer.poll();
   data.foreach(record->new Thread(()->listener.listen(record)).start());
}

【问题讨论】:

    标签: java spring kafka-consumer-api spring-kafka


    【解决方案1】:

    取决于版本;最近的 1.3 版本有一个更简单的线程模型,由KIP-62 推动。

    使用该版本,在调用者线程上调用侦听器;在所有当前记录被消耗之前,下一次轮询不会发生。除了RECORD(和MANUAL*)之外,提交的决定是在所有记录都发送到监听器之后确定的。

    MANUAL_IMMEDIATE 就是这个意思;当用户确认时立即提交偏移量;使用MANUAL,在所有记录都发送到侦听器后提交手动偏移量。

    较早的版本有点复杂;可能会获取一个或两个额外的批次,并且在每次轮询之前执行确认,因此可能会在第一批次中的所有记录都发送到侦听器之前提交偏移量。

    编辑

    在下面回答你的 cmets...

    是的;线程在 1.3 中发生了变化。在此之前,我们必须不断轮询消费者以避免代理重新平衡分区。 ConsumerRecords 通过深度为 1 的队列从侦听器线程移交。轮询器继续轮询,直到队列中不能再容纳 ConsumerRecords;在那个时候,它pauses 消费者(这样后续的轮询不会返回任何记录),但我们仍然必须继续调用poll 以避免重新平衡。当侦听器赶上来时,消费者是resumed 并且消息再次开始流动。

    因此,最坏的情况是容器包含 3 组记录——当前正在由侦听器处理的一组记录、队列中的一组记录和我们无法放入队列的一组记录。任何未完成的偏移提交(手动或其他)都会在每次轮询之前执行。

    轮询线程会等待处理器线程吗?

    没有;我们不能这样做,因为这会导致重新平衡 - 就像我们在消费者线程上调用监听器一样。

    KIP-62 实际上在 0.10.1.0 客户端中已修复,但我们直到 1.3 才更改线程;由于 KIP-62,这是一个重大的简化,我建议使用该版本。

    【讨论】:

    • @Gart,你能澄清一下“调用者线程”吗?这是什么?
    • MANUAL_IMMEDIATE 和 MANUAL:对于这两种情况,客户端代码都必须调用 Acknowledgment.acknowledge()。对于 MANUAL_IMMEDIATE - kafka 服务器将立即收到通知。对于 MANUAL - spring 将收集确认,而先前轮询消耗的所有记录都不会路由到侦听器。对吗?
    • 使用 poll(和commits),一个调用监听器;记录被移交给第二个线程。这是为了避免由于缓慢的侦听器而导致不希望的重新平衡;如果监听器很慢,消费者线程最终pause 成为消费者,直到监听器赶上。是的,正确,使用手动,我们收集所有手动确认并在下一次投票之前提交它们。 >consumed by previous poll will not routed to listener - 不知道你的意思;所有记录都发送给监听器。
    • 让我从另一边问关于手册的问题。我问是因为我不明白你的短语在所有记录都发送到侦听器后提交手动偏移量。 好的,有 2 个线程:poll thread处理器线程。轮询线程调用轮询并将数据传递给另一个线程?下一步是什么?轮询线程会等待处理器线程吗? processorThread.join() 然后才提交?下一次投票可以在之前的投票记录提交之前进行吗?
    • slow listener - 你能解释一下这个定义以及为什么 2 个线程有助于解决这个问题吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-28
    • 2018-11-23
    • 1970-01-01
    • 2020-10-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多