【问题标题】:kafka flink timestamp Event time and watermarkkafka flink timestamp 事件时间和水印
【发布时间】:2020-08-30 14:16:22
【问题描述】:

我正在阅读《使用 Apache Flink 进行流处理》一书,其中指出“从 0.10.0 版开始,Kafka 支持消息时间戳。从 Kafka 0.10 或更高版本读取时,如果应用程序在事件时间模式下运行,消费者将自动提取消息时间戳作为事件时间时间戳*” 那么在processElement 函数中,调用context.timestamp() 默认会返回kafka 消息时间戳? 您能否提供一个简单的示例,说明如何实现基于消费的 kafka 消息时间戳提取(并构建水印)的 AssignerWithPeriodicWatermarks/AssignerWithPunctuatedWatermarks。

如果我使用TimeCharacteristic.ProcessingTime,ctx.timestamp() 会返回处理时间吗?在这种情况下,它会类似于context.timerService().currentProcessingTime()

谢谢。

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    Flink Kafka 消费者会为您处理这个问题,并将时间戳放在需要的位置。在 Flink 1.11 中你可以简单地依赖它,尽管你仍然需要注意提供一个 WatermarkStrategy 来指定无序(或断言时间戳是有序的):

    FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
    myConsumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.
            .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
    

    在早期版本的 Flink 中,您必须提供时间戳分配器的实现,如下所示:

    public long extractTimestamp(Long element, long previousElementTimestamp) {
        return previousElementTimestamp;
    }
    

    extractTimestamp 方法的这个版本将 StreamRecord 中存在的时间戳的当前值作为previousElementTimestamp 传递,在这种情况下,它将是 Flink Kafka 使用者放置在那里的时间戳。

    Flink 1.11 docs
    Flink 1.10 docs

    至于ctx.timestamp()在使用TimeCharacteristic.ProcessingTime时返回什么,这种情况下这个方法返回NULL。 (从语义上讲,是的,就好像时间戳是当前处理时间,但这不是它的实现方式。)

    【讨论】:

    • 有没有办法可以指定记录的时间戳而不是 Kakfa 记录的时间戳?例如,我希望 Kafka 消费者从客户端查看我的购物网站订单时间戳以推进水印。以下是我尝试过的代码
    • WatermarkStrategy.forBoundedOutOfOrderness[Order](Duration.ofMillis(100)).withTimestampAssigner(new SerializableTimestampAssigner[Order] { override def extractTimestamp(order: Order, recordTimestamp: Long): Long = { order.getTimestamp }})}
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-03-09
    • 2018-02-09
    • 1970-01-01
    • 2018-11-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多