【问题标题】:Kafka Streams does not increment offset by 1 when producing to topicKafka Streams 在生产主题时不会将偏移量增加 1
【发布时间】:2019-07-05 07:11:20
【问题描述】:

我已经实现了一个简单的 Kafka 死信记录处理器。

在使用控制台生产者生成的记录时,它可以完美运行。

但是我发现我们的 Kafka Streams 应用程序并不能保证为接收器主题生成记录,每生成一条记录,偏移量就会增加 1。

死信处理器背景:

我有一个场景,即在发布处理它所需的所有数据之前可能会收到记录。 当流应用程序不匹配记录以进行处理时,它们将移动到死信主题,而不是继续向下流。当新数据发布时,我们将来自死信主题的最新消息转储回流应用程序的源主题,以便使用新数据进行重新处理。

死信处理器:

  • 在运行应用程序开始时记录每个分区的结束偏移量
  • 结束偏移量标记停止处理给定死信主题记录的点,以避免在重新处理的记录返回死信主题时出现无限循环。
  • 应用程序从上次通过消费者组运行产生的最后偏移量恢复。
  • 应用程序正在使用事务和KafkaProducer#sendOffsetsToTransaction 提交最后生成的偏移量。

为了跟踪我的范围内的所有记录何时为主题的分区处理,我的服务将其最后生成的从生产者的偏移量与消费者保存的结束偏移量映射进行比较。当我们到达结束偏移量时,消费者通过KafkaConsumer#pause 暂停该分区,当所有分区都暂停时(意味着它们达到保存的结束偏移量)然后调用它退出。

Kafka Consumer API 州:

偏移量和消费者位置 Kafka 为分区中的每条记录维护一个数字偏移量。此偏移量充当该分区内记录的唯一标识符,并且还表示消费者在分区中的位置。例如,位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。

Kafka Producer API 引用的下一个偏移量也始终为 +1。

向消费者组协调器发送指定偏移量列表,并将这些偏移量标记为当前事务的一部分。仅当事务成功提交时,这些偏移量才会被视为已提交。提交的偏移量应该是您的应用程序将使用的下一条消息,即 lastProcessedMessageOffset + 1。

但是您可以在我的调试器中清楚地看到,单个分区消耗的记录不是一次递增 1...

我认为这可能是 Kafka 配置问题,例如 max.message.bytes,但没有一个真正有意义。 然后我想也许是因为加入,但没有看到任何方式会改变制片人的运作方式。

不确定它是否相关,但我们所有的 Kafka 应用程序都在使用 Avro 和 Schema Registry...

无论生产方法如何,偏移量是否应该始终递增 1,或者使用 Kafka 流 API 是否可能无法提供与普通生产者消费者客户端相同的保证?

是不是我完全错过了什么?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api apache-kafka-streams kafka-producer-api


    【解决方案1】:

    消息偏移量加一并不是官方的 API 约定,即使 JavaDocs 表明了这一点(似乎应该更新 JavaDocs)。

    • 如果您不使用事务,您将获得至少一次语义或无法保证(有些人称之为最多一次语义)。对于至少一次,记录可能被写入两次,因此,两个连续消息的偏移量并没有真正增加一,因为重复写入“消耗”了两个偏移量。

    • 如果您使用事务,事务的每次提交(或中止)都会将提交(或中止)标记写入主题——这些事务标记也“消耗”一个偏移量(这是您观察到的)。

    因此,一般而言,您不应依赖连续的偏移量。您得到的唯一保证是,每个偏移量在分区内都是唯一的。

    【讨论】:

    • 在示例中,初始化时结束偏移量为 71495,主题中的最后一条记录的偏移量为 41491。似乎我应该知道我是否已经消费了某个范围内的每条记录(来自消费者的最后偏移量通过当前在分区中的最后一条记录进行分组)。但是结束偏移量实际上是下一条记录的偏移量,而当前记录在使用时不知道下一个偏移量是什么,也不知道偏移量将增加多少。那么唯一知道是否所有记录都已被消费的方法是等待下一条记录产生吗?除非我能知道我是和一个分区的结束......
    • stackoverflow.com/questions/54544074/… 我注意到了同样的行为。但是,就我而言,我使用与 kafka 的交易来保证一次性交付。您是否有机会使用交易?
    • @JR ibkr 是的,我也在使用事务并通过KafkaProducer#sendOffsetsToTransaction 调用提交消费者组偏移量。
    • 无论如何;生产者/消费者不应该担心偏移量。他们的工作是简单地从队列中推/拉。如果您想添加一些元数据,请使用标题cwiki.apache.org/confluence/display/KAFKA/…
    • @MatthiasJ.Sax 你能把For at-least...“consumers”这一行编辑成“consumes”吗?我必须阅读多行才能理解该语句。
    【解决方案2】:

    我知道知道消息的偏移量会很有用。但是,Kafka 只会保证 message-X 的偏移量大于最后一条消息(X-1)的偏移量。顺便说一句,理想的解决方案不应基于偏移计算。

    在后台,kafka 生产者可能会尝试重新发送消息。此外,如果经纪人倒闭,则可能会发生重新平衡。恰好一次语义可能会附加一条附加消息。因此,如果发生上述任何事件,您的消息偏移量可能会发生变化。

    Kafka 可能会出于内部目的向主题添加其他消息。但是 Kafka 的消费者 API 可能会丢弃这些内部消息。因此,您只能看到您的消息,并且消息的偏移量不一定会增加 1。

    【讨论】:

    • 我们有很多用例,我们关心从一系列偏移中获取记录。此示例因为目标主题位于上游,并且可能会导致消息在没有它的情况下多次循环通过此过程。我们的其他用例,例如当消息通过 RESTful API 发送时可能会在不同的 Kafka 集群之间丢失,并且我们希望“重放”一系列未收到的记录。这种情况不同的原因是最后一个偏移量是由结束偏移量推断的,这实际上是产生的下一条消息的偏移量,而不是一个范围内的最后一条
    • 嗯,底线是不要依赖偏移量,它可能不是你所期望的。使用元数据或键来标记和使用它们为您带来优势。如果您使用的是 Streams API,那么您也可以使用 kafka 提供的本地存储。 kafka.apache.org/20/documentation/streams/developer-guide/…
    • 您应该使用键来识别您要发送的每条消息,即如果您想从第 100 条消息中读取所有消息。从第 100 个偏移量读取直到 message.key
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-03-16
    • 1970-01-01
    • 2021-04-10
    • 1970-01-01
    • 2019-06-14
    • 2019-01-06
    • 1970-01-01
    相关资源
    最近更新 更多