【发布时间】:2020-04-20 05:04:47
【问题描述】:
我想使用 Spring Kafka 支持的 Kafka Streams 从两个 Kafka 主题中消费。
主题具有不同的键和值。我想通过.merge(KStream<X,Y> otherStream)方法将第二个主题和merge与第一个主题的键和值映射起来。
这是一个例子:
// Block 1
KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
"second-topic",
consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
).flatMap(
(key, value) -> {
List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
// Do stuff an fill out the list
return list;
});
// Block 2
KStream<MyKey, MyValue>[] branches = stream
.merge(stream2)
... business stuff
通过这个解决方案,我得到一个ClassCastException,原因是MyKey 不能转换为MyKey。原因是,它们是由不同的模块和类加载器提供的。错误发生在序列化中,在合并块中。使用transform(..) 我得到了同样的行为。
如果我附加命令.through("tmp-topic") 一切正常。它表明主题的具体化返回一个有效的可序列化对象而不是flatMap(...)。
我在groupByKey 中找到了以下 API 文档:
... 如果在此操作之前使用了键更改运算符(例如,selectKey(KeyValueMapper)、map(KeyValueMapper)、flatMap(KeyValueMapper) 或 transform(TransformerSupplier, String...)),并且之后没有发生数据重新分配(例如,通过through(String)) 将在 Kafka 中创建一个内部重新分区主题。该主题将命名为“${applicationId}-XXX-repartition”,其中“applicationId”是用户在 StreamsConfig 中通过参数 APPLICATION_ID_CONFIG 指定的,“XXX”是内部生成的名称,“-repartition”是固定后缀。您可以通过 Topology.describe() 检索所有生成的内部主题名称。 对于这种情况,该流的所有数据将通过重新分区主题重新分配,方法是向其中写入所有记录,并从其中重新读取所有记录,以便生成的 KGroupedStream 在其键上正确分区...
如您所见,更改操作(如flatMap(...))后的最佳实践似乎是将其写入主题,因为序列化和重新分区。
您如何看待使用through("topic") 来实现它?
有谁知道,是否有可能在 flatMap(...) 之后实现而不写主题?
版本
Spring Kafka 版本:2.2.5.RELEASE
Apache Kafka 客户端:2.0.1
Apache Kafka 流:2.0.1
【问题讨论】:
标签: java apache-kafka apache-kafka-streams spring-kafka