【发布时间】:2018-10-05 21:30:12
【问题描述】:
我有一个 KStream,其中包含来自主题 to1 的数据,如下所示:
T1-KEY -> {T1}
T2-KEY -> {T2}
还有一个KTable,构造如下:
我正在使用 org.apache.kafka.streams.StreamsBuilder 从某个主题 to2 创建 KTable,如下所示:
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"},
{"B2", "Rel": "T1"}
]
}
..
该流随后被平面映射并按 Key s.t. 分组。生成的 KTable 如下所示:
T1 -> { ["B1", "B2"] }
稍后,现在主题 to2 中出现以下消息:
A1-KEY -> { "A1", "Set": [
{"B2", "Rel": "T1"}
]
}
现在我希望我的 KTable 能够反映这些变化并如下所示:
T1 -> { ["B2"] }
但它看起来像这样:
T1 -> { ["B1", "B2"] }
我注意到,在我的Aggregator<Tx-KEY, Bx, Set<Bx>> 中给出的最后一个参数是集合["B1", "B2"],即使当我在聚合之前偷看时我只得到一个匹配"B2"。
我是否理解聚合错误或这里发生了什么?
编辑
我想我缩小了范围:显然聚合的 Initializer 只在 very 第一次调用 - 之后聚合总是接收 last aggregate 作为最后一个参数,例如
@Override
public Set<Bx> apply(Tx-KEY, Bx value, Set<Bx> aggregate) {
}
Set<Bx> aggregate 在第一次调用时是 [](通过初始化程序创建),而在第二次调用时是 ["B1", "B2"]。
有什么想法吗?
编辑 2
public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {
@Override
public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
aggregate.add(value);
return aggregate;
}
}
编辑 3
我不能只使用平面地图,因为我必须组合多个 Ax 元素,例如
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
},
A2-KEY -> { "A2", "Set": [
{"B2", "Rel": "T1"}
]
},
...
然后我期待一些类似的分组
T1 -> { ["B1", "B2"] }
并且在下一次迭代中,当消息
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
}
如期而至
T1 -> { ["B1"] }
..
【问题讨论】:
-
请提供您的代码,尤其是您的聚合器的 apply 方法。您在编辑中写的内容是正确的。 Initializer,顾名思义,仅用于初始化,稍后将结果传递给第二个参数。
-
感谢您的回复,我在上面添加了申请功能。不过,我确实想在那里有一套新的。
标签: apache-kafka apache-kafka-streams