【问题标题】:Is there a way to get offset for each message consumed in kafka streams?有没有办法为 kafka 流中消耗的每条消息获取偏移量?
【发布时间】:2017-07-06 09:35:12
【问题描述】:

为了避免读取已处理但在 KAFKA STREAMS 被杀死时错过提交的消息,我想获取每条消息的偏移量以及键和值,以便我可以将其存储在某处并使用它以避免重新处理已处理的消息。

【问题讨论】:

    标签: apache-kafka-streams


    【解决方案1】:

    是的,这是可能的。请参阅http://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information 上的常见问题解答条目。

    我将复制粘贴下面的关键信息:

    访问记录元数据,例如主题、分区和偏移信息?

    可通过Processor API 访问记录元数据。 它也可以通过DSL 间接访问,这要归功于它 Processor API integration.

    使用处理器 API,您可以通过 ProcessorContext。您可以将上下文的引用存储在 Processor#init() 期间处理器的实例字段,然后 查询Processor#process() 中的处理器上下文,例如 (Transformer 相同)。上下文会自动更新以匹配 当前正在处理的记录,这意味着方法 比如ProcessorContext#partition()总是返回当前 记录的元数据。调用处理器时有一些注意事项 punctuate() 中的上下文,有关详细信息,请参阅 Javadocs。

    如果您使用结合自定义Transformer的DSL,例如, 您可以将输入记录的值转换为还包括分区 和偏移元数据,以及后续的 DSL 操作,例如 map 或 然后filter 可以利用此信息。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-07-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-09-13
      • 1970-01-01
      相关资源
      最近更新 更多