【问题标题】:Use the same topic as a source more than once with Kafka Streams DSL使用 Kafka Streams DSL 多次将同一主题用作源
【发布时间】:2018-09-20 13:50:56
【问题描述】:

在使用 Kafka Streams DSL 时,有没有办法将同一主题用作两个不同处理例程的源?

StreamsBuilder streamsBuilder = new StreamsBuilder();

// use the topic as a stream
streamsBuilder.stream("topic")...

// use the same topic as a source for KTable
streamsBuilder.table("topic")...

return streamsBuilder.build();

上面的简单实现在运行时抛出TopologyException无效拓扑:主题主题已被另一个来源注册。如果我们深入了解底层处理器 API,这是完全有效的。使用它是唯一的出路吗?

更新: 到目前为止我发现的最接近的替代方案:

StreamsBuilder streamsBuilder = new StreamsBuilder();

final KStream<Object, Object> stream = streamsBuilder.stream("topic");

// use the topic as a stream
stream...

// create a KTable from the KStream
stream.groupByKey().reduce((oldValue, newValue) -> newValue)...

return streamsBuilder.build();

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    阅读与流和表格相同的主题在语义上是有问题的恕我直言。 Streams 对不可变的事实进行建模,而用于读取 KTable 模型的更改日志主题会更新。

    如果您想在多个流中使用单个主题,您可以多次重复使用相同的KStream 对象(在语义上类似于广播):

    KStream stream = ...
    stream.filter();
    stream.map();
    

    还比较:https://issues.apache.org/jira/browse/KAFKA-6687(有计划取消此限制。我怀疑,我们将允许同时使用一个主题作为KStreamKTable - 比较我上面的评论)。

    【讨论】:

    • 谢谢,马蒂亚斯!我了解两者背后的语义差异,我只是希望使用一些稍微肮脏的技巧来使其按照我描述的方式工作。您提到的 JIRA 问题实际上是对我一直在寻找的答案的补充:这种技巧不受欢迎,但它们的可行性可能会在以后介绍。我会接受这个答案,谢谢!
    【解决方案2】:

    是的,你可以,但为此你需要有多个StreamsBuilder

    StreamsBuilder streamsBuilder1 = new StreamsBuilder();
    streamsBuilder1.stream("topic");
    
    StreamsBuilder streamsBuilder2 = new StreamsBuilder();
    streamsBuilder2.table("topic");
    
    Topology topology1 = streamsBuilder1.build();
    Topology topology2 = streamsBuilder2.build();
    
    KafkaStreams kafkaStreams1 = new KafkaStreams(topology1, streamsConfig1);
    KafkaStreams kafkaStreams2 = new KafkaStreams(topology2, streamsConfig2);
    

    还要确保每个StreamsConfig 具有不同的application.id

    【讨论】:

    • 当然,这是一种解决方法,但我想知道在同一个 Topology 内是否有可能(出于我自己的原因:在我们的案例中,在应用程序中支持一个 KafkaStreams 对象要容易得多)。我的意思是,可以通过处理器 API 来实现,对吧。
    • 好的,如果您可以从KStream 创建KTable,那么您也可以尝试以下操作:KStream&lt;Object, Object&gt; kStream = streamsBuilder.stream("topic"); kStream.foreach(...); // process KStream; KTable kTable = kStream.groupByKey().reduce(...);
    • 谢谢,Vasiliy,以前一定错过了您的评论,抱歉!如您所见,我有相同的解决方案(请参阅更新部分),所以我相信这是目前唯一的方法,不幸的是不是我一直在寻找的方法。没关系,我们可以忍受,对吧?
    • 对,可以使用你在更新中发布的流
    • @VasiliySarzhynskyi 如果不是因为您丢失了跨拓扑的数据局部性,您的解决方案将会很有吸引力。即,如果有多个消费者实例,则分区最终位于不同的 JVM 中。想法?
    猜你喜欢
    • 2020-01-07
    • 2020-01-08
    • 1970-01-01
    • 2018-06-08
    • 1970-01-01
    • 2017-04-09
    • 2019-07-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多