【发布时间】:2020-04-05 19:08:35
【问题描述】:
我有一个类似于以下模式的 kafka 消息:
{ user: 'someUser', value: 'SomeValue' , timestamp:000000000}
使用 Flink 流计算对这些项目执行一些计数操作。
现在我想声明一个会话,在X秒范围内收集相同的用户+值作为一个单一的,具有最新的时间戳,然后它将被转发到下一个流一次
所以我写了这样的东西:
data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Data>() {
.....
})
.keyBy(new KeySelector<Data, String>(){
.......
})
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.aggregate(new AggregateFunction<Data, Data, Data>() {
@Override
public Data createAccumulator() {
return null;
}
@Override
public Data add(Data value, Data accumulator) {
if(accumulator == null) {
accumulator = value;
}
return accumulator;
}
@Override
public Data getResult(Data accumulator) {
return accumulator;
}
@Override
public Data merge(Data a, Data b) {
return a;
}
});
但问题是 getResult 函数是在每个元素上调用的,而不仅仅是在窗口的末尾。
我的问题是如何在窗口结束之前不将聚合结果转发到下一个流。据我所知,当没有更多元素时,即使窗口没有结束,流程流结果也会向前移动
有什么建议吗?
谢谢
【问题讨论】:
标签: apache-flink flink-streaming