【问题标题】:Kafka - How to use filter and filternot at the same time?Kafka - 如何同时使用过滤器和过滤器?
【发布时间】:2016-12-01 18:44:15
【问题描述】:

我有一个 Kafka 流,它从一个主题获取数据,并且需要将该信息过滤到两个不同的主题。

KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic");
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic");
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic");

但是,当我这样做时,它会从主题中读取数据两次——不确定随着数据变大是否会对性能产生任何影响。有没有办法只过滤一次并将其推送到两个主题?

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    您的方法是正确的,并且数据没有从主题中读取两次,并且也没有进行内部数据复制。您的方法的唯一缺点是,对于每条记录都会评估两个过滤谓词 - 但是,这很便宜,不应该是性能问题。

    但是,您仍然可以通过使用KStream#branch() 来提高性能,该KStream#branch() 确实采用多个谓词并逐个评估所有谓词并为每个谓词返回一个输入流。如果记录与谓词匹配,则将其放入相应的输出流中并停止评估(即,不会为该单个记录评估进一步的谓词 - 这确保每条记录都被添加到最大一个输出流中;或者在以下情况下被删除没有谓词匹配)。

    因此,您可以只为branch() 提供两个谓词:第一个谓词与原始filter() 谓词相同,第二个谓词始终返回true

    KStream<String, Model> stream = builder.stream(
        Serdes.String(),
        specificAvroSerde,
        "not-filtered-topic"
    );
    KStream[] splitStreams = stream.branch(
        (key, value) -> new Processor().test(key,value),
        (key, value) -> true
    );
    splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic");
    splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic");
    

    不确定此代码是否比您的原始版本更具可读性。我想这是一个品味问题,我个人更喜欢你的原始代码,因为它确实更好地表达了语义。

    我添加的版本应该稍微提高 CPU 效率,因为对于所有满足谓词的记录,它只评估一次。对于所有不满足结果的记录,将返回一个简单的true(即没有第二个谓词评估)。

    如果您知道大多数记录将以splitStream[1] 结尾,您还可以反转谓词(并将splitStream[0] 用作“坏流”)以减少对第二个true-returning 的调用次数谓词。但这些只是微优化,应该无关紧要。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-06-19
      • 2020-09-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多