【发布时间】:2017-06-29 16:04:00
【问题描述】:
下面是代码 sn-p,我使用的是基于 Tumbling EventTime 的窗口
DataStream<OHLC> ohlcStream = stockStream.assignTimestampsAndWatermarks(new TimestampExtractor()).map(new mapStockToOhlc()).keyBy((KeySelector<OHLC, Long>) o -> o.getMinuteKey())
.timeWindow(Time.seconds(60))
.reduce(new myAggFunction());
不幸的是,它看起来从不执行 reduce 函数。如果使用上面没有窗口的代码,reduce 函数可以正常工作。下面是 TimestampExtractor 的代码。 30秒的水印延迟只是一个测试值,但一分钟的翻滚窗口是m
public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<StockTrade> {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis() - 30000);
}
@Override
public long extractTimestamp(StockTrade stockTrade, long l) {
BigDecimal bd = new BigDecimal(stockTrade.getTime());
// bd contains miliseconds timestamp 1498658629.036
return bd.longValue();
}
}
bd.longValue() 返回秒时间戳 1498658629,因为我的窗口也是以秒为单位定义的。
当我使用返回分钟时间戳的 bd.longValue()/60 时,会调用 reduce 函数。我的输出文件包含每个reduce操作的所有记录
{time=1498717692.000, minuteTime=24978628, n=1, open=2248.0}
{time=1498717692.000, minuteTime=24978628, n=2, open=2248.0}
...
{time=1498717692.000, minuteTime=24978628, n=8, open=2248.0}
那么,谁能给我解释一下,发生了什么?非常感谢。
【问题讨论】:
标签: java apache-flink flink-streaming