【发布时间】:2020-03-06 14:12:01
【问题描述】:
我有一个KStream<String, X>,我基本上想将其转换为KTable<String, Y>
我能找到使用 DSL 实现此目的的唯一方法是使用映射、分组然后归约。
val stream: KStream<String, X> = ...
val table: KTable<String, Y> = stream
.mapValues({ value -> toYOrNull(value)})
.groupByKey(Grouped.with(Serdes.String(), ySerde))
.reduce(
{old: Y?, updated: Y? -> updated},
Materialized.`as`<String, Y, KeyValueStore<Bytes, ByteArray>>("y-store")
.withKeySerde(Serdes.String()
.withValueSerde(ySerde)
)
我希望这可以处理reduce 中updated 的值为null 的情况,但是当我使用TopologyTestDriver 检查商店时,它似乎仍然具有旧版本。我做错了什么?
这是我的测试:
@Test
fun shouldDeleteFromTableWhenNull() {
val store = testDriver.getKeyValueStore<String, Y?>("y-store")
store.put("key", Y())
inputTopic.pipeInput("key", anXThatMapsToANullY)
assertThat(store.get("key")).isNull() // Fails as the old entry is still there
}
【问题讨论】:
标签: java kotlin apache-kafka-streams