【问题标题】:Apache Flink: Windowed ReduceFunction is never executedApache Flink:Windowed ReduceFunction 永远不会执行
【发布时间】:2017-06-29 16:04:00
【问题描述】:

下面是代码 sn-p,我使用的是基于 Tumbling EventTime 的窗口

DataStream<OHLC> ohlcStream = stockStream.assignTimestampsAndWatermarks(new TimestampExtractor()).map(new mapStockToOhlc()).keyBy((KeySelector<OHLC, Long>) o -> o.getMinuteKey())
        .timeWindow(Time.seconds(60))
        .reduce(new myAggFunction());

不幸的是,它看起来从不执行 reduce 函数。如果使用上面没有窗口的代码,reduce 函数可以正常工作。下面是 TimestampExtractor 的代码。 30秒的水印延迟只是一个测试值,但一分钟的翻滚窗口是m

    public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<StockTrade> {
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - 30000);
    }

    @Override
    public long extractTimestamp(StockTrade stockTrade, long l) {
        BigDecimal bd = new BigDecimal(stockTrade.getTime());
        // bd contains miliseconds timestamp 1498658629.036
        return bd.longValue();
    }
}

bd.longValue() 返回秒时间戳 1498658629,因为我的窗口也是以秒为单位定义的。
当我使用返回分钟时间戳的 bd.longValue()/60 时,会调用 reduce 函数。我的输出文件包含每个reduce操作的所有记录

{time=1498717692.000, minuteTime=24978628, n=1, open=2248.0}
{time=1498717692.000, minuteTime=24978628, n=2, open=2248.0}
...
{time=1498717692.000, minuteTime=24978628, n=8, open=2248.0}

那么,谁能给我解释一下,发生了什么?非常感谢。

【问题讨论】:

    标签: java apache-flink flink-streaming


    【解决方案1】:

    通常,水印应与数据中的时间戳相关,而不应基于系统时钟。使用事件时间的一大好处是,同一个应用程序可用于重新处理历史数据或处理当前数据,但如果您将时间戳与系统时钟进行比较,就像您在此处所做的那样,这是不可能的。

    水印可以被认为是所有时间戳小于水印的数据已经到达的声明。或者换句话说,任何时间戳小于当前水印的数据都将被认为是迟到的。我的猜测是您没有看到任何结果,因为您的水印导致您的所有数据都被认为是迟到的,并且窗口运算符正在删除所有这些迟到的数据。

    我建议您改用BoundedOutOfOrdernessTimestampExtractor。它的工作原理是跟踪迄今为止在数据流中看到的最大时间戳,并从该最大时间戳中减去延迟,而不是系统时钟。 source code,如果你好奇的话。

    【讨论】:

    • 谢谢,我会调查的。实际上,我在整个互联网上都使用了示例,例如vishnuviswanath.com/flink_eventtime.html ...我的数据还包含到达集群的时间(通常比 EventTime 晚几毫秒),而且 Flink 进程似乎处理得很快。所以 90 秒应该足够了……
    • 我看了一下文档和源代码。我自己也用过类似的东西。即使时间可能足够,你是对的,这可以处理历史数据。实际上,我们不得不重启集群,所以 Kafka 主题仍然处于空闲状态,所以我验证了 Kafka 主题中保存的处理历史数据。 Reduce 函数被调用,它只保存最后一个(可能?-这可能吗?似乎合乎逻辑)导致定义的窗口 + 水印延迟。我必须进行更多调查并比较结果。一旦我验证结果的正确性,我会将主题标记为已解决。
    • 相对于系统时钟的水印看起来可以工作,但即使您只关心处理实时数据也是有问题的。这是因为如果作业失败,然后回滚到检查点并重新启动,则正在重放的数据可能会被视为延迟。
    • 是的,一个窗口化的reduce函数增量聚合窗口的内容,当窗口被触发时产生一个值。
    猜你喜欢
    • 1970-01-01
    • 2019-12-07
    • 1970-01-01
    • 2020-01-23
    • 2021-08-19
    • 2012-05-30
    • 2016-06-30
    • 2019-12-15
    相关资源
    最近更新 更多