【问题标题】:Kafka setting consumer offset before committed offsetKafka在提交偏移之前设置消费者偏移
【发布时间】:2021-11-10 09:51:26
【问题描述】:

我正在运行一个程序,该程序从主题中的消息开始,使用它,处理它,提交下一个偏移量,并将新消息发布到同一主题,所有这些都以事务方式进行。我有以下(简化的)跟踪:

Fetch READ_COMMITTED at offset 20 for partition test-topic-0
processing message at offset 20
Committed offset 21 for partition test-topic-0
Sending PRODUCE
COMMITTING_TRANSACTION
Fetch READ_COMMITTED at offset 22 for partition test-topic-0
processing message at offset 22 <==== first time
...rebalance...
Setting offset for partition test-topic-0 to the committed offset FetchPosition{offset=21
Committed offset 23 for partition test-topic-0
Sending PRODUCE
COMMITTING_TRANSACTION
Fetch READ_COMMITTED at offset 24 for partition test-topic-0
stale fetch response for partition test-topic-0 since its offset 24 does not match the expected offset FetchPosition{offset=21
Fetch READ_COMMITTED at offset 21 for partition test-topic-0
processing message at offset 22 <==== second time

因此,我处理了消息“22”两次。是否预计 kafka 只是将消费者偏移量回退到提交的偏移量之前?日志的顺序看起来正确吗?如有必要,我可以使用完整日志更新问题,但我认为那里没有任何用处。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api message kafka-producer-api kafka-transactions-api


    【解决方案1】:

    看起来在生产者完成交易之前发生了重新平衡。查看您正在使用的代码/配置/Kafka 版本会很有帮助。

    事务性consume-process-produce 要求生产者做几件不同的事情。处理一批记录时:

    • producer.beginTransaction() - 此方法保证从调用它到事务中止/提交期间产生的所有内容都成为单个事务的一部分。

    • producer.send(producerRecord) - 对于您在批处理中处理的每条消息。

    • producer.sendOffsetsToTransaction( Map&lt;TopicPartition, OffsetAndMetadata&gt; offsetsToCommit, consumer.groupMetadata() ) - 一旦你完成了批处理,它将作为事务的一部分提交偏移量。请注意,以任何其他方式提交偏移量都不会提供事务保证。

    一旦生成了批次中的所有记录并且您将偏移量作为交易的一部分提交,您最终会提交交易并完成交易 - producer.commitTransaction()

    话虽如此,这应该可以解释为什么它拒绝了消息 24 并重新处理了消息 22。我相信消息 23 没有到达最后一个生产者步骤,但需要查看代码才能确定。来自 Kafka 权威指南:

    为了保证消息按顺序阅读, read_committed 模式不会返回产生的消息 在第一个仍处于打开状态的事务开始之后(称为 最后稳定偏移量,或 LSO)。这些消息将被保留,直到 该事务由生产者提交或中止,或直到 到达 transaction.timeout.ms(默认 15 分钟)并被 经纪人。

    两个主要错误(对于事务)是假设只有一次保证适用于除了生产到 Kafka 之外的操作,并且消费者总是读取整个事务并拥有有关事务边界的信息。

    【讨论】:

    • 谢谢,虽然您的回答没有完全解释为什么会发生这种情况,但我看到您假设我使用了 sendOffsetsToTransaction,但我没有:我正在像往常一样提交偏移量。
    • 日志显示您从未提交过偏移量 22。在其他不使用事务生产者的情况下,您不必重新处理偏移量 22,因为提交了偏移量 23。所以不,不希望 Kafka 将消费者偏移量倒回到提交的偏移量之前。但是,由于您使用的是事务方法,因此事务日志(与事务一起引入)记录了包含偏移量 22 的事务仍处于打开状态并且看不到中止标记。因此,kafka 将尝试完成事务或中止(取决于您的代码)
    • 运气好/能回答你的问题吗?
    • 我还在学习有关 kafka 的知识,我会更加仔细地重新阅读您的评论并尝试看看是否有意义
    猜你喜欢
    • 2017-08-22
    • 1970-01-01
    • 2022-11-13
    • 1970-01-01
    • 2020-06-11
    • 2019-04-04
    • 1970-01-01
    • 2022-11-11
    • 2020-03-05
    相关资源
    最近更新 更多