【问题标题】:Aggregate KStream in Scala在 Scala 中聚合 KStream
【发布时间】:2018-10-15 13:17:13
【问题描述】:

如果我有以下记录

{"A",7}
{"B",10}
{"C",10}

那么聚合应该是

{"sum_ABC",27}

那么当A变为10时 即添加新消息

{"A",10}

现在应该计算为

{"sum_ABC",30}


val record: KTable[String, JsonNode] = builder.table("logs",m_consumed)
val aggVal: KTable[String, Double] = record.toStream().groupByKey()
        .reduce(new Reducer[Double]() {
        def apply(val1: Double, val2: Double): Double =
          {
            println(val1)
            val1 + val2
          }
      })

这不起作用 它继续添加值 如果重新启动它直接添加 0 + 新值。 直到现在我已经明白我需要使用聚合而不是减少。 请指导如何做。任何链接或任何教程?

【问题讨论】:

    标签: scala apache-kafka kafka-consumer-api apache-kafka-streams


    【解决方案1】:

    聚合总是基于键。因此,如果您想聚合具有不同键的消息(在您的示例中为 A,B,C),您需要为所有消息设置一个新键,以将它们映射到单个键。

    此外,要获得更新语义,您需要在 KTable 上应用聚合。在您的示例中,将KTable 转换为KStream(通过toStream()),从而将语义从“更新”更改为“事实”。

    你应该使用:

    KTable record = ...
    KTable result = record.groupBy(/* map records that you want to aggregate to the same key*/)
                          .reduce(...).
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-10-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多