【发布时间】:2018-10-15 13:17:13
【问题描述】:
如果我有以下记录
{"A",7}
{"B",10}
{"C",10}
那么聚合应该是
{"sum_ABC",27}
那么当A变为10时 即添加新消息
{"A",10}
现在应该计算为
{"sum_ABC",30}
val record: KTable[String, JsonNode] = builder.table("logs",m_consumed)
val aggVal: KTable[String, Double] = record.toStream().groupByKey()
.reduce(new Reducer[Double]() {
def apply(val1: Double, val2: Double): Double =
{
println(val1)
val1 + val2
}
})
这不起作用 它继续添加值 如果重新启动它直接添加 0 + 新值。 直到现在我已经明白我需要使用聚合而不是减少。 请指导如何做。任何链接或任何教程?
【问题讨论】:
标签: scala apache-kafka kafka-consumer-api apache-kafka-streams