【问题标题】:Spring kafka - kafka streams topology set upSpring kafka - kafka 流拓扑设置
【发布时间】:2021-09-28 23:39:02
【问题描述】:

我们有一个kafka流spring boot应用程序(使用spring-kafka),这个应用程序当前从上游主题读取消息应用一些转换,并将它们写入下游主题,它不做任何聚合或连接或任何高级kafka 流功能。

目前的代码与此类似

@Bean 
fun topology(streamsBuilder: StreamsBuilder): KStream<String, SomeObject> {
  val stream = streamsBuilder.stream<String, SomeObject>(inputTopicName)
  val branches: Array<KStream<String, SomeObject>> = stream.branch(
    { _, value -> isValidRawData(value)},
    { _, failedValue -> true}
  )
        
  branches[0].map { _, value -> transform(value) }.to(outputTopicName)
  branches[1].foreach { _, value -> s3Service.uploadEvent(value) }
}

这很好用,但是现在我们需要扩展此代码以使用来自第二个上游主题的不同模式的消息并应用稍微不同的转换,然后将它们写入相同的下游主题(具有相似的模式)如上面的拓扑。

为了实现这一点,我们有 2 个选项;

  1. 创建第二个@Bean 工厂方法,除了它的拓扑从一个单独的主题消费并应用不同的转换之外,几乎与上面的类似。

  2. 修改上面的拓扑以消费两个主题,为来自第二个主题的消息创建第三个分支,如下所示

@Bean 
fun topology(streamsBuilder: StreamsBuilder): KStream<String, SpecificRecord> {
  val topics = listOf("topic1", "topic2")
  val stream = streamsBuilder.stream<String, SpecificRecord>(topics)
  val branches: Array<KStream<String,SpecificRecord>> = stream.branch(
    { _, value -> isRecordFromTopic1(value)},
    { _, value -> isRecordFromTopic2(value)},
    { _, failedValue -> true}
  )
        
  branches[0].map { _, value -> transformTopic1Record(value) }.to(outputTopicName)
  branches[1].map { _, value -> transformTopic2Record(value) }.to(outputTopicName)
  branches[2].foreach { _, value -> s3Service.uploadEvent(value) }
}

推荐哪种方法?从 kafka 流资源管理或性能的角度来看,我们需要考虑哪些事情?

感谢您的建议。

【问题讨论】:

    标签: kotlin apache-kafka apache-kafka-streams spring-kafka


    【解决方案1】:

    由于存在您在第二个代码中显示的主题 API 集合,我会说这两个变体都是有效且有意义的。其他一切都只是个人喜好。我会选择第一个,因为从技术上讲,最终一切都将在同一个 Streams 引擎上运行。第一种解决方案在将来引入第三种记录类型时更容易支持,依此类推。或者您可能对特定流有额外的逻辑。您可能有一个公共流来读取所有主题并通过该条件和分支分发它们。您可以通过他们自己的中间主题在他们的个人流中执行其余逻辑。但仍然:只是我的意见......

    【讨论】:

    • 感谢 Artem 我将根据第一个解决方案拆分它们,我最初的假设是第一个解决方案默认会引入第二个流线程,所以我不知道从性能角度来看这意味着什么。感谢您的回答
    • 所以出于好奇,如果我使用解决方案 1,并且我的 2 个上游主题每个都有 2 个分区,这是否意味着我将默认拥有一个 StreamThread,并且该线程将有 4 个任务哪个 2 将执行 topic1 的拓扑,而另外 2 个任务执行 topic 2 的拓扑?是这样的吗?
    • 抱歉,我暂时不知道那些 Kafka Streams 内部情况:明天再调查
    猜你喜欢
    • 1970-01-01
    • 2019-10-31
    • 1970-01-01
    • 2017-06-09
    • 1970-01-01
    • 2018-01-19
    • 2016-08-11
    • 1970-01-01
    • 2019-10-23
    相关资源
    最近更新 更多