【发布时间】:2021-03-09 18:23:30
【问题描述】:
我有一个类似于Apache Flink Tumbling Window delayed result 的问题。不同之处在于,我使用使用 kafka 连接的 SQL 从主题中读取记录。我定期获取记录,但不知何故,我没有得到输出中的最后几条记录。例如,Kafka 主题中的最后一条记录的时间戳为 2020-11-26T13:11:36.605Z,聚合值的最后一个时间戳为 2020-11-26T12:59:59.999。我不明白为什么我没有得到主题中最后一条记录的聚合。请帮忙。这是我的代码。
sourceSQL = "CREATE TABLE flink_read_kafka (clientId INT, orderId INT, contactTimeStamp, WATERMARK FOR contactTimeStamp AS contactTimeStamp - INTERVAL '5' SECOND with (kafka config) ";
sinkSQL = "CREATE TABLE flink_aggr_kafka (contactTimeStamp STRING, clientId INT, orderCount BIGINT) with (kafka config) ";
aggrSQL = "insert into flink_aggr_kafka SELECT TUMBLE_ROWTIME(contactTimeStamp, INTERVAL '5' MINUTE) as contactTimeStamp, clientId, COUNT(*) orderCount from flink_read_kafka GROUP BY clientId , TUMBLE(commsTimestamp, INTERVAL '5' MINUTE)";
blinkStreamTableEnv.executeSql(sourceSQL);
blinkStreamTableEnv.executeSql(sinkSQL);
blinkStreamTableEnv.executeSql(aggrSQL);
【问题讨论】:
标签: apache-kafka apache-flink flink-sql