【发布时间】:2020-08-28 11:16:59
【问题描述】:
我想确认我对从一个 Kafka Stream 源读取多个处理器的效率的理解。如果我想根据谓词逻辑执行 2 个不同的过程,我相信示例 1 中的以下内容是最有效的。 Predicate 查看 Value 的内容(此处为 Notification 对象)。如果您在示例 1 中的以下每个处理器中都有一个断点,则它显示每个传入通知都会调用每个函数。而在示例 2 中,只有在满足谓词逻辑时才调用 process2 函数。
示例 1
@Bean
public Function<KStream<String, Notification>,KStream<String, Notification>> process1() {
return input -> input
.branch(PREDICATE_FOR_OUT_0, PREDICATE_FOR_OUT_1);
}
@Bean
public Function<KStream<String, Notification>,KStream<String, EnrichedNotification>> process2() {
return input -> input
.filter(PREDICATE_FOR_OUT_2);
.map((key, value) ->.........; //different additional processing to map to EnrichedNotification type
}
不需要以下并尝试将一个处理器的输出路由到另一个处理器? (不确定是否可能)
示例 2(概念) 我可能是这样想的,因为我来自使用纯 Kafka。这里 process1 有一个 3 路分支。其中两个分支转到各自的流,然后转到主题,但第三个需要进一步处理才能路由到主题。
@Bean
public Function<KStream<String, Notification>,KStream<String, Notification>[]> process1() {
return input -> input
.branch(PREDICATE_FOR_OUT_0, PREDICATE_FOR_OUT_1, PREDICATE_FOR_OUT_2);
}
我们能否将 PREDICATE_FOR_OUT_2 的分支路由到 process2。这意味着只有满足 PREDICATE_FOR_OUT_2 时才会调用 process2
@Bean
public Function<KStream<String, Notification>,KStream<String, EnrichedNotification>> process2() {
return input -> input
.map((key, value) ->.........; //different additional processing to map to EnrichedNotification type
}
由于 Kafka Streams 提供的抽象和功能,我的想法是示例 2 是多余的(实际上也不可能)
【问题讨论】:
标签: apache-kafka apache-kafka-streams spring-kafka spring-cloud-stream