【发布时间】:2017-07-06 09:35:12
【问题描述】:
为了避免读取已处理但在 KAFKA STREAMS 被杀死时错过提交的消息,我想获取每条消息的偏移量以及键和值,以便我可以将其存储在某处并使用它以避免重新处理已处理的消息。
【问题讨论】:
为了避免读取已处理但在 KAFKA STREAMS 被杀死时错过提交的消息,我想获取每条消息的偏移量以及键和值,以便我可以将其存储在某处并使用它以避免重新处理已处理的消息。
【问题讨论】:
是的,这是可能的。请参阅http://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information 上的常见问题解答条目。
我将复制粘贴下面的关键信息:
访问记录元数据,例如主题、分区和偏移信息?
可通过Processor API 访问记录元数据。 它也可以通过DSL 间接访问,这要归功于它 Processor API integration.
使用处理器 API,您可以通过
ProcessorContext。您可以将上下文的引用存储在Processor#init()期间处理器的实例字段,然后 查询Processor#process()中的处理器上下文,例如 (Transformer相同)。上下文会自动更新以匹配 当前正在处理的记录,这意味着方法 比如ProcessorContext#partition()总是返回当前 记录的元数据。调用处理器时有一些注意事项punctuate()中的上下文,有关详细信息,请参阅 Javadocs。如果您使用结合自定义
Transformer的DSL,例如, 您可以将输入记录的值转换为还包括分区 和偏移元数据,以及后续的 DSL 操作,例如map或 然后filter可以利用此信息。
【讨论】: