【问题标题】:Kafka Streams and Spring Cloud Stream - Processor EfficiencyKafka Streams 和 Spring Cloud Stream - 处理器效率
【发布时间】:2020-08-28 11:16:59
【问题描述】:

我想确认我对从一个 Kafka Stream 源读取多个处理器的效率的理解。如果我想根据谓词逻辑执行 2 个不同的过程,我相信示例 1 中的以下内容是最有效的。 Predicate 查看 Value 的内容(此处为 Notification 对象)。如果您在示例 1 中的以下每个处理器中都有一个断点,则它显示每个传入通知都会调用每个函数。而在示例 2 中,只有在满足谓词逻辑时才调用 process2 函数。

示例 1

@Bean
public Function<KStream<String, Notification>,KStream<String, Notification>> process1() {

    return input -> input
            .branch(PREDICATE_FOR_OUT_0, PREDICATE_FOR_OUT_1);
}

@Bean
public Function<KStream<String, Notification>,KStream<String, EnrichedNotification>> process2() {
    return input -> input
            .filter(PREDICATE_FOR_OUT_2);
            .map((key, value) ->.........; //different additional processing to map to EnrichedNotification type
}

不需要以下并尝试将一个处理器的输出路由到另一个处理器? (不确定是否可能)

示例 2(概念) 我可能是这样想的,因为我来自使用纯 Kafka。这里 process1 有一个 3 路分支。其中两个分支转到各自的流,然后转到主题,但第三个需要进一步处理才能路由到主题。

@Bean
public Function<KStream<String, Notification>,KStream<String, Notification>[]> process1() {

    return input -> input
            .branch(PREDICATE_FOR_OUT_0, PREDICATE_FOR_OUT_1, PREDICATE_FOR_OUT_2);
}

我们能否将 PREDICATE_FOR_OUT_2 的分支路由到 process2。这意味着只有满足 PREDICATE_FOR_OUT_2 时才会调用 process2

@Bean
public Function<KStream<String, Notification>,KStream<String, EnrichedNotification>> process2() {
    return input -> input
            .map((key, value) ->.........; //different additional processing to map to EnrichedNotification type
}

由于 Kafka Streams 提供的抽象和功能,我的想法是示例 2 是多余的(实际上也不可能)

【问题讨论】:

    标签: apache-kafka apache-kafka-streams spring-kafka spring-cloud-stream


    【解决方案1】:

    我认为您的示例的两种情况都可以完成工作,但存在一些差异。在第一个示例中,您有两个函数,都从同一个 Kafka 主题接收数据,第二个函数在路由到输出主题之前执行一些额外的逻辑。在第二个示例中,您再次拥有两个函数。在第一个函数中,您有 3 个分支,每个分支都将数据发送到 Kafka 主题(我假设它们是 3 个不同的主题)。然后在第二个函数中,您从第一个函数的第三个输出主题接收数据。在执行示例 2 的第二个函数中的逻辑之后,您将其发送到此分支的最终目的地。您正在为第二个示例引入一个额外的主题。我认为您的第一个示例更具可读性和简洁性。

    【讨论】:

    • 感谢您的回复。我想知道是否可以在没有第三个分支的中间主题的情况下做第二个示例?这会将处理器链接在一起。我不确定这是否可能。即使是,也可能不可取
    • 我不确定“Spring”,但在原生 Streams API 中,当然可以将第二个 Processor 直接链接到分支的输出。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-26
    • 2021-09-03
    • 1970-01-01
    • 2021-06-07
    • 1970-01-01
    • 2023-04-05
    相关资源
    最近更新 更多