【问题标题】:How to supress windowed aggregation result in Spring Cloud Kafka Streams?如何抑制 Spring Cloud Kafka Streams 中的窗口聚合结果?
【发布时间】:2020-03-24 21:04:49
【问题描述】:

我在 Spring Cloud 项目中使用 Kafka-streams-binder。 Kafka 流应用程序使用 6 分钟的滑动窗口来聚合结果并分析模式。但问题是聚合操作会产生重复的结果。

我想抑制中间结果并仅在应用程序中的窗口结束后发布。 这可以通过 Kafka 2.1.1 中的 Kafka .supress() 操作来实现。但是 Spring Cloud 版本没有最新的 kafka 来使用该能力。

项目使用的依赖项

<spring-boot.version>2.1.9.RELEASE</spring-boot.version>
<spring-cloud.version>Greenwich.SR3</spring-cloud.version>

抑制中间结果的任何替代方法都会有所帮助。

【问题讨论】:

  • 如果以下答案之一解决了您的问题,请将其标记为已接受。

标签: spring-boot apache-kafka spring-cloud apache-kafka-streams spring-cloud-stream


【解决方案1】:

抑制中间结果的任何替代方法都会有所帮助。

在早期版本的 Kafka Streams 中没有提供与 recently introduced Suppress feature 相同的行为的等效功能。

您可以获得的最接近的是configure your Kafka Streams application 的记录缓存(设置如cache.max.bytes.buffering)和commit.interval.ms减少您将看到的“中间”更新的数量。但与新的抑制功能不同,这不会完全删除任何此类更新。

【讨论】:

    【解决方案2】:

    您可以覆盖 kafka-clients 和 kafka-streams 版本,如appendix to the Spring for Apache Kafka 参考手册中所述。

    如果您没有在测试中使用嵌入式 kafka 代理,则只需覆盖 kafka 客户端和流。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.1</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.1.1</version>
    </dependency>
    

    【讨论】:

    • 从 Spring Boot 方面看,这不是一种反模式吗?
    • 一般来说,是的,但是 Kafka 的发布节奏比 Boot 更频繁,所以我们(Apache Kafka 的 Spring)尽最大努力保持兼容性,除非有破坏性的 API。 IIRC,我认为您不能使用 Boot 2.1 升级到 2.2.x 或更高版本,但您应该可以使用 2.1.1。另一方面,您可以升级到 Boot 2.2.x 和 Hoxton.RELEASE 以获得最新最好的 (kafka 2.3)。
    猜你喜欢
    • 1970-01-01
    • 2016-12-20
    • 1970-01-01
    • 1970-01-01
    • 2019-10-15
    • 1970-01-01
    • 1970-01-01
    • 2022-10-25
    相关资源
    最近更新 更多