【问题标题】:Kafka Streams - Processor API - Forward to different topicsKafka Streams - 处理器 API - 转发到不同的主题
【发布时间】:2018-05-15 01:01:46
【问题描述】:

我有一个处理器-API 处理器,它在内部转发到几个单独的接收器(想想一个事件分类器,尽管它在事件之间也有状态逻辑)。我正在考虑稍后在其中两个主题之间加入。加入后,我会将元素的更新(丰富)版本转发到我实际加入的主题。

如果在处理器 API 代码中转发到多个接收器(接收器 1、接收器 2),然后再发送到主题,你将如何混合 DSL?

我猜你可以创建单独的流,比如

val stream1 = builder.stream(outputTopic) 
val stream2 = builder.stream(outputTopic2)

然后从那里构建?然而,这会产生更多的子拓扑——这意味着什么?

另一种可能性是在处理器 API 中拥有自己的状态存储并在同一个处理器中管理它(我实际上正在这样做)。它增加了代码的复杂性,但它不是更有效吗?例如,您可以删除不再使用的数据(一旦进行连接,您可以将新连接的数据转发到接收器,它不再符合连接条件)。还有其他效率问题吗?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams kafka-streams-scala


    【解决方案1】:

    最简单的方法可能是将处理器 API 与 DSL 混合使用,以 StreamsBuilder 开头并使用 transform()

    StreamsBuilder builder = new StreamsBuilder()
    KStream[] streams = builder.stream("input-topic")
                               .transform(/* put your processor API code here */)
                               .branch(...);
    
    KStream joined = streams[0].join(streams[1], ...);
    

    也可以先将中间流写入主题并将它们读回。您获得更多子拓扑这一事实应该是无关紧要的。

    通过状态手动连接是可能的,但很难正确编码。如果可能的话,我建议使用 DSL 提供的连接运算符。

    【讨论】:

    • 对于这个,transform的输出应该是同类型的吧?现在,使用 process() 我可以创建每种类型的对象并将其发送到它们的接收器(例如,一个“pageview”事件将创建一个 Pageview 对象以及创建/更新一个 Session 对象)另外,是否有任何示例ProcesorAPI-made join?我正在使用一些可变商店。我也想,如果你可以改变你的商店,你也可以优化空间——例如。从 KV 存储中删除不再需要的数据。
    • 需要相同的类型——如果你有不同的类型,你会用 POJO 包装所有这些类型,尽管它总是只有一个成员集......我不知道连接示例。但是,您可以使用 DSL 连接作为示例(DSL 内部编译为 PAPI:github.com/apache/kafka/blob/trunk/streams/src/main/java/org/…
    猜你喜欢
    • 2018-09-30
    • 1970-01-01
    • 2018-08-03
    • 1970-01-01
    • 1970-01-01
    • 2017-10-07
    • 2019-03-15
    • 2019-04-15
    • 1970-01-01
    相关资源
    最近更新 更多