【发布时间】:2018-03-30 17:44:10
【问题描述】:
我有一个以 byte[] 为键的主题,我想对其进行重新分区并通过消息正文字段中的另一个键来处理该主题。
我发现有KGroupedStream 和groupby 函数。但它要求聚合函数转换为 KTable/KStream。我不需要聚合。我只想重新分区并处理输出。
【问题讨论】:
标签: apache-kafka apache-kafka-streams
我有一个以 byte[] 为键的主题,我想对其进行重新分区并通过消息正文字段中的另一个键来处理该主题。
我发现有KGroupedStream 和groupby 函数。但它要求聚合函数转换为 KTable/KStream。我不需要聚合。我只想重新分区并处理输出。
【问题讨论】:
标签: apache-kafka apache-kafka-streams
(Kafka Streams 2.5.x 或更早版本)
不确定这是否完全符合 kosher 规定,但它可以工作,并且会自动创建重新分区主题,并使用正确数量的分区 wrtstream。
KTable emptyTable = someTable.filter((k, v) -> false);
KStream stream = ...
KStream repartionedStream = stream.selectKey(...)
.leftJoin(emptyTable, (v, Null) -> v, ...);
编辑
2020 年 8 月,当 Kafka Streams 2.6.0 被引入并且 KStream.repartition() 出现时,这种方法显然变成了一个复杂的可憎之物,值得大量反对和鞭挞。
所以对于流版本 2.6.x+ 你必须使用
KStream stream = ...
KStream repartionedStream = stream.selectKey(...)
.repartition();
【讨论】:
repartition()...(或在订单版本中使用through();through() 需要您手动创建使用的主题...)
repartition() 是后来才添加的,也应该使用through()。
through() 没有做 OP 需要做的事情,这就是我的回答被接受的原因。它不会自动创建具有正确分区数的主题。顺便说一句,它也没有为我做这件事,因此解决方案可能很奇怪。除非在 v2.6 之前有类似的解决方案,否则关于复杂性的整个论点似乎并不相关。不使用through()。
是的,你可以。您设置一个新键,然后通过另一个主题管道传输数据。
// repartition() will create the required topic automatically for your,
// with the same number of partitions as your input topic;
//
// it's also possible to set the number of partitions explicitly to scale in/out
// via `repartitioned(Repartitioned.numberOfPartitions(...))`
KStream stream = ...
KStream repartionedStream = stream.selectKey(...)
.repartition();
// older versions:
//
// using `through()` you need to create the use topic manually,
// before you start your application
KStream stream = ...
KStream repartionedStream = stream.selectKey(...)
.through("topic-name");
请注意,您需要先创建您在through() 中使用的主题,然后才能使用所需的分区数启动应用程序。
【讨论】:
.selectKey() 之后存在像 groupBy() 或 join() 这样的键相关操作时。
through() 并在之后执行 RPC。