【问题标题】:Using Flink to get Counts Within a Keyed Window使用 Flink 获取键控窗口内的计数
【发布时间】:2019-03-09 06:24:45
【问题描述】:

我通过 Scala 接口使用 Flink 进行一些数据处理。我有一些来自元组的用户数据:

(user1, "titanic")
(user1, "titanic")
(user1, "batman")
(user2, "star wars")
(user2, "star wars")
(user2, "batman")

我想由用户键入,创建一个窗口,然后计算用户在该窗口中观看特定电影的次数,这样我最终得到一个从每部电影到观看次数的 Map对于每个用户。例如,对于user1,正确的输出是Map("titanic" -> 2, "batman" -> 1)。 我知道我的代码的第一部分应该是这样的:

keyedStream.keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(10)))

但我不知道如何在窗口内进行进一步聚合,以便最终得到每个用户/窗口的视图计数图。我尝试编写自己的 AggregateFunction 将这些计数收集到可变 Map 中,但不幸的是可变 Map 不可序列化,因此它失败了。

我该怎么做?

【问题讨论】:

  • 我对你的终极问题有点困惑。
  • 我想知道如何聚合窗口中的元组,以便输出是从电影到用户观看该电影的次数的 Map。我无法用我自己的 AggregateFunction 做到这一点,因为 scala Map 不可序列化。有办法解决吗?还是有其他方法可以进行聚合?
  • 我想你想做一个keyby,然后是一个扩展RichMapFunctionmap。它将允许您保存MapState,您可以在其中跟踪您的计数。如果不想使用MapState,可以参考wordcount examplegroupby用户和电影,以及sum(1)。
  • 我会试一试并报告。感谢您的提示!

标签: scala apache-flink flink-streaming


【解决方案1】:

您应该可以使用AggregateFunction 解决问题:

source
  .keyBy(0)
  .timeWindow(Time.seconds(10L))
  .aggregate(new AggregateFunction[(String, String), (String, Map[String, Int]), (String, Map[String, Int])] {
    override def createAccumulator(): (String, Map[String, Int]) = ("", Map())

    override def add(value: (String, String), accumulator: (String, Map[String, Int])): (String, Map[String, Int]) = {
      val counter = accumulator._2.getOrElse(value._2, 0)
      (value._1, accumulator._2 + (value._2 -> (counter + 1)))
    }

    override def getResult(accumulator: (String, Map[String, Int])): (String, Map[String, Int]) = accumulator

    override def merge(a: (String, Map[String, Int]), b: (String, Map[String, Int])): (String, Map[String, Int]) = {
      (a._1, (a._2.keySet ++ b._2.keySet) map (k => k -> (a._2.getOrElse(k, 0) + b._2.getOrElse(k, 0))) toMap)
    }
  })

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-05-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多