【发布时间】:2021-09-28 23:39:02
【问题描述】:
我们有一个kafka流spring boot应用程序(使用spring-kafka),这个应用程序当前从上游主题读取消息应用一些转换,并将它们写入下游主题,它不做任何聚合或连接或任何高级kafka 流功能。
目前的代码与此类似
@Bean
fun topology(streamsBuilder: StreamsBuilder): KStream<String, SomeObject> {
val stream = streamsBuilder.stream<String, SomeObject>(inputTopicName)
val branches: Array<KStream<String, SomeObject>> = stream.branch(
{ _, value -> isValidRawData(value)},
{ _, failedValue -> true}
)
branches[0].map { _, value -> transform(value) }.to(outputTopicName)
branches[1].foreach { _, value -> s3Service.uploadEvent(value) }
}
这很好用,但是现在我们需要扩展此代码以使用来自第二个上游主题的不同模式的消息并应用稍微不同的转换,然后将它们写入相同的下游主题(具有相似的模式)如上面的拓扑。
为了实现这一点,我们有 2 个选项;
-
创建第二个
@Bean工厂方法,除了它的拓扑从一个单独的主题消费并应用不同的转换之外,几乎与上面的类似。 -
修改上面的拓扑以消费两个主题,为来自第二个主题的消息创建第三个分支,如下所示
@Bean
fun topology(streamsBuilder: StreamsBuilder): KStream<String, SpecificRecord> {
val topics = listOf("topic1", "topic2")
val stream = streamsBuilder.stream<String, SpecificRecord>(topics)
val branches: Array<KStream<String,SpecificRecord>> = stream.branch(
{ _, value -> isRecordFromTopic1(value)},
{ _, value -> isRecordFromTopic2(value)},
{ _, failedValue -> true}
)
branches[0].map { _, value -> transformTopic1Record(value) }.to(outputTopicName)
branches[1].map { _, value -> transformTopic2Record(value) }.to(outputTopicName)
branches[2].foreach { _, value -> s3Service.uploadEvent(value) }
}
推荐哪种方法?从 kafka 流资源管理或性能的角度来看,我们需要考虑哪些事情?
感谢您的建议。
【问题讨论】:
标签: kotlin apache-kafka apache-kafka-streams spring-kafka