【发布时间】:2021-05-18 08:25:02
【问题描述】:
我有一个主题,它接收带有可能 部分 数据的 JSON 记录。我想合并这些数据,所以我尝试在最终数据记录中收集尽可能多的信息。
t1: { id: '1234', attribute1: 'foo' }
t2: { id: '1234', attribute2: 'bar' }
合并记录值后的期望流:
t1: { id: '1234', attribute1: 'foo' }
t2: { id: '1234', attribute1: 'bar', attribute2: 'bar' }
为此我尝试了:
//key of the topic is id
KStream<String, MyObject> input = ...
return input.groupByKey().reduce((current, newEvent) -> return newEvent.merge(current)).toStream();
但这只会产生一个条目,因为 groupy/reduce 会产生一个 KTable。有没有可能做到这一点?
编辑:流定义是正确的,reduce 似乎默认情况下不会向下游发送所有消息,而是在这样做之前缓存它们。要禁用此行为,请使用配置属性:
cache.max.bytes.buffering: 0
必须设置。
【问题讨论】:
标签: java apache-kafka-streams spring-kafka