【问题标题】:Stream of updates to the largest time window in FlinkFlink 中最大时间窗口的更新流
【发布时间】:2016-10-22 14:13:44
【问题描述】:

我想从一个时间窗口的键控流中获得迄今为止看到的最大窗口的流(就元素数量而言最大)。

目前我有以下代码:

source
  .keyBy(...)
  .timeWindow(...)
  .fold((DummyKey, 0)) { case ((_, current), key) => (key, current + 1) }
  .keyBy(_ => ())
  .maxBy(1)

fold 的结果是 (key, count) 元素的流 - 因此,我想从这个流中获取“具有最高计数的键”的更新流。

然后我键入一个常量(keyBy(_ => ()) - 因为这是一个全局操作),并使用 maxBy - 这几乎 有效:我得到了最高计数的流,但是每个元素都会发出当前的最高计数。

我认为我正在寻找的是某种带有先前值的过滤器,它只会在新值与先前值不同时发出元素。

目前在 Flink 中可以吗?

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    Flink 默认没有这样的过滤器,但你自己实现一个应该很容易。

    您可以使用与此类似的有状态 FlatMap 来执行此操作:

    val source: DataStream[Int] = ???
    
    source
      .keyBy(_: Int => _)
      .timeWindow(Time.minutes(10))
      .fold((1, 0)) { case ((_, current), key) => (key, current + 1) }
      // move everything to the same key
      .keyBy(_ => 0) 
      // use stateful flatmap to remember highest count and filter by that
      .flatMapWithState( (in, state: Option[Int]) => 
        // filter condition
        if (in._2 > state.getOrElse(-1)) 
          // emit new value and update max count
          (Seq(in), Some(in._2)) 
        else 
          // emit nothing (empty Seq()) and keep count
          (Seq(), state)
      ).setParallelism(1)
    

    如果非并行(单线程)过滤器运算符成为瓶颈,您可以通过添加带有随机键的keyBy 和具有更高并行度的有状态过滤器FlatMap 来添加并行预过滤器。

    【讨论】:

    • 啊!我在类路径中有 flink 1.0,它不包括 flatMapWithState 操作:) 谢谢!
    • 需要setParallelism(1) 吗?由于按相同键分组,并行度不会为 1 吗?
    • 分组是通过键选择器功能完成的。对于 Flink,这个函数是一个黑盒子,即不知道函数返回一个常量值。 Flink 会启动多个并行实例,但所有数据都只会发送到一个。原则上,可以分析函数以推断其行为,但这还没有完成。
    • 啊,所以这只是帮助 flink - 但不是获得正确答案的必要条件。再次感谢:)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-10-18
    • 2018-06-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多