【发布时间】:2017-06-18 12:53:25
【问题描述】:
我想对这个流进行会话:1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,5, ...到这些会议:
1,1,1
2,2,2,2,2
3,3,3,3,3,3,3
0
3,3,3
5
我编写了 CustomTrigger 来检测流元素何时从 1 变为 2(2 变为 3、3 变为 0 等等),然后触发触发器。但这不是解决方案,因为当我处理 2 的第一个元素并触发触发器时,窗口将为 [1,1,1,2] 但我需要在 1 的最后一个元素上触发触发器。
这是我的自定义触发器类中的 onElement 函数的伪代码:
override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
if (prevState == element.value) {
prevState = element.value
TriggerResult.CONTINUE
} else {
prevState = element.value
TriggerResult.FIRE
}
}
我该如何解决这个问题?
【问题讨论】:
标签: scala apache-flink stream-processing