【问题标题】:How to sessionize stream with Apache Flink?如何使用 Apache Flink 会话化流?
【发布时间】:2017-06-18 12:53:25
【问题描述】:

我想对这个流进行会话:1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,5, ...到这些会议:

1,1,1
2,2,2,2,2
3,3,3,3,3,3,3
0
3,3,3
5

我编写了 CustomTrigger 来检测流元素何时从 1 变为 2(2 变为 3、3 变为 0 等等),然后触发触发器。但这不是解决方案,因为当我处理 2 的第一个元素并触发触发器时,窗口将为 [1,1,1,2] 但我需要在 1 的最后一个元素上触发触发器。

这是我的自定义触发器类中的 onElement 函数的伪代码:

override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (prevState == element.value) {
      prevState = element.value
      TriggerResult.CONTINUE
    } else {
      prevState = element.value
      TriggerResult.FIRE
    }
}

我该如何解决这个问题?

【问题讨论】:

    标签: scala apache-flink stream-processing


    【解决方案1】:

    我认为FlatMapFunctionListState 是实现此用例的最简单方法。

    当一个新元素到达时(即,flatMap() 方法被调用),你检查值是否改变。如果值未更改,则将元素附加到状态。如果值更改,则将当前列表状态作为会话发出,清除列表并将新元素作为第一个插入到列表状态中。

    但是,您应该记住,这是假设元素的顺序保持不变。 Flink 确保在一个分区内,即只要元素没有被打乱并且所有操作符都以相同的并行度运行。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-02-19
      • 1970-01-01
      • 2020-10-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多