【问题标题】:Apache Flink - SQL Kafka connector Watermark on event time doesn't pull recordsApache Flink - 事件时间上的 SQL Kafka 连接器水印不提取记录
【发布时间】:2021-03-09 18:23:30
【问题描述】:

我有一个类似于Apache Flink Tumbling Window delayed result 的问题。不同之处在于,我使用使用 kafka 连接的 SQL 从主题中读取记录。我定期获取记录,但不知何故,我没有得到输出中的最后几条记录。例如,Kafka 主题中的最后一条记录的时间戳为 2020-11-26T13:11:36.605Z,聚合值的最后一个时间戳为 2020-11-26T12:59:59.999。我不明白为什么我没有得到主题中最后一条记录的聚合。请帮忙。这是我的代码。

sourceSQL = "CREATE TABLE flink_read_kafka (clientId INT, orderId INT, contactTimeStamp, WATERMARK FOR contactTimeStamp AS contactTimeStamp - INTERVAL '5' SECOND with (kafka config) ";
sinkSQL = "CREATE TABLE flink_aggr_kafka (contactTimeStamp STRING, clientId INT, orderCount BIGINT) with (kafka config) ";
aggrSQL = "insert into flink_aggr_kafka SELECT TUMBLE_ROWTIME(contactTimeStamp, INTERVAL '5' MINUTE) as contactTimeStamp, clientId, COUNT(*) orderCount from flink_read_kafka GROUP BY clientId , TUMBLE(commsTimestamp, INTERVAL '5' MINUTE)";
blinkStreamTableEnv.executeSql(sourceSQL);
blinkStreamTableEnv.executeSql(sinkSQL);
blinkStreamTableEnv.executeSql(aggrSQL);

【问题讨论】:

    标签: apache-kafka apache-flink flink-sql


    【解决方案1】:

    首先,一些背景:翻滚窗口仅在水印超过窗口的最大时间戳时才会发出结果。水印向框架表明具有较低时间戳的所有记录都已到达,因此窗口已完成并且可以发出结果。

    水印只能根据传入记录的时间戳推进,因此如果没有更多记录进入,水印将不会推进,当前打开的窗口也不会关闭。因此,当没有数据流入时,预计最后一个窗口保持打开状态。

    在您的示例中,通常会假设行时间为 2020-11-26T13:04:59.999 和 26T13:09:59.999 的窗口也会被发出,因为最新记录应该将水印推到这些时间戳之外。

    我现在可以想到两个可能不是这种情况的原因:

    • 并非所有并行源实例的时间戳都高于 26T13:05:04.999,因此输出水印实际上并未超过该值。您可以通过运行并行度为 1 的作业来测试这一点,这将缓解问题,或者通过检查 Flink Web UI 中窗口运算符的水印来验证是否是这种情况。

    • 如果您在完全一次性模式下使用 Kafka 生产者并且只使用已提交的记录,则只有在窗口触发后检查点完成后,记录才会变得可见。

    【讨论】:

    • 感谢您的输入,现在主题中的最后一条记录的时间戳为2020-11-29T13:43:21.454Z,水印没有前进。这是基于观察的数据。 Watermark mills Watremark in date 文件中的总时间 1606655913666 2020-11-29 13:18:33.666 PM 2020-11-29 13:10:00.000 1606656068881 2020-11-29 13:21:08.881 PM 2020-11-2 :15:00.000 1606656130974 2020-1010 -11-29 13:43:16.454 下午 2020-11-29 13:30:00。
    • 对不起,我不太明白你的评论。当主题中的最后一条记录具有时间戳“2020-11-29T13:43:21.454Z”时,您在 web ui 中看到的窗口运算符的水印时间是多少?是否高于尚未发出的任何窗口的结束时间?
    • 抱歉格式化混乱,这是当前状态。水印没有前进,尽管卡夫卡有记录。如果我发送另一条具有当前时间戳的记录,它只能向前移动......所以我的观察是,如果 Kafak 中有 5 分钟的记录,即从 2020-11-29T21:30:46.737Z 到 2020-,就会发生聚合11-29T21:34:46.737Z。不知何故,聚合没有考虑剩余/最新的 Kafka 记录。当前水印 - 2020-11-29 21:30:41.737 PM,Kafka 中的最后记录 - 2020-11-29T21:30:46.737Z 和文件中的最后聚合 - 2020-11-29 13:40:00.000
    • 最新聚合和最新事件/水印之间有 8 小时的延迟?你的窗口长度是多少,你每秒/分钟看到多少条记录?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-03-07
    • 1970-01-01
    • 1970-01-01
    • 2019-04-18
    • 2018-11-30
    • 1970-01-01
    • 2020-10-05
    相关资源
    最近更新 更多