【发布时间】:2018-03-24 07:31:27
【问题描述】:
我正在使用 FluentD(v.12 最新稳定版本)向 Kafka 发送消息。但是 FluentD 使用的是旧的 KafkaProducer,因此记录时间戳始终设置为 -1。 因此,我必须使用 WallclockTimestampExtractor 将记录的时间戳设置为消息到达 kafka 时的时间点。
是否有特定于 Kafka Streams 的解决方案?
我真正感兴趣的时间戳是 fluentd 在消息中发送的:
"时间戳":"1507885936","主机":"V.X.Y.Z."
在 kafka 中记录表示:
offset = 0, timestamp= - 1, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
我想在kafka里有这样的记录:
offset = 0, timestamp= 1507885936, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
我的解决方法如下:
-
编写消费者提取时间戳 (https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
-
写一个生产者来生产一个新的记录,时间戳集(ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
如果有的话,我更喜欢 KafkaStreams 解决方案。
【问题讨论】: