【问题标题】:How to change timestamp of records?如何更改记录的时间戳?
【发布时间】:2018-03-24 07:31:27
【问题描述】:

我正在使用 FluentD(v.12 最新稳定版本)向 Kafka 发送消息。但是 FluentD 使用的是旧的 KafkaProducer,因此记录时间戳始终设置为 -1。 因此,我必须使用 WallclockTimestampExtractor 将记录的时间戳设置为消息到达 kafka 时的时间点。

是否有特定于 Kafka Streams 的解决方案?


我真正感兴趣的时间戳是 fluentd 在消息中发送的:

"时间戳":"1507885936","主机":"V.X.Y.Z."

在 kafka 中记录表示:

offset = 0, timestamp= - 1, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}

我想在kafka里有这样的记录:

offset = 0, timestamp= 1507885936, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}

我的解决方法如下:

如果有的话,我更喜欢 KafkaStreams 解决方案。

【问题讨论】:

    标签: java apache-kafka-streams


    【解决方案1】:

    您可以编写一个非常简单的 Kafka Streams 应用程序,例如:

    KStreamBuilder builder = new KStreamBuilder();
    builder.stream("input-topic").to("output-topic");
    

    并使用自定义TimestampExtractor 配置应用程序,从记录中提取时间戳并返回它。

    Kafka Streams 将在将记录写回 Kafka 时使用返回的时间戳。

    注意:如果您有乱序数据——即时间戳没有严格排序——结果也将包含乱序时间戳。 Kafka Streams 使用返回的时间戳写回 Kafka(即,无论提取器返回什么,都用作记录元数据时间戳)。请注意,在写入时,当前处理的输入记录的时间戳将用于所有生成的输出记录——这适用于 1.0 版,但在未来的版本中可能会发生变化。)。

    更新:

    通常,您可以通过处理器 API 修改时间戳。调用context.forward()可以通过To.all().withTimestamp(...)设置输出记录时间戳作为forward()的参数。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2015-01-22
      • 1970-01-01
      • 1970-01-01
      • 2021-06-28
      • 2011-03-27
      • 2020-07-30
      • 2018-03-22
      相关资源
      最近更新 更多